diff options
author | nandinibhartiyaMDB <nandini.bhartiya@mongodb.com> | 2023-05-11 04:34:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-11 05:34:10 +0000 |
commit | bbb51d48561a47e6dc6e6aec9abf3463c1b6acc3 (patch) | |
tree | fc90d9d9761f800f1646c672b350708de64df5ea | |
parent | 83e593092544b7bce255f3fc25e2d8b0758ea962 (diff) | |
download | mongo-bbb51d48561a47e6dc6e6aec9abf3463c1b6acc3.tar.gz |
SERVER-76790: Fix holding extra tickets during resharding oplog application
4 files changed, 165 insertions, 73 deletions
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index e05e82fa1eb..35426bfb96a 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -38,6 +38,8 @@ var ReshardingTest = class { maxNumberOfTransactionOperationsInSingleOplogEntry: maxNumberOfTransactionOperationsInSingleOplogEntry = undefined, configShard: configShard = false, + wiredTigerConcurrentWriteTransactions: wiredTigerConcurrentWriteTransactions = undefined, + reshardingOplogBatchTaskCount: reshardingOplogBatchTaskCount = undefined, } = {}) { // The @private JSDoc comments cause VS Code to not display the corresponding properties and // methods in its autocomplete list. This makes it simpler for test authors to know what the @@ -70,6 +72,8 @@ var ReshardingTest = class { this._maxNumberOfTransactionOperationsInSingleOplogEntry = maxNumberOfTransactionOperationsInSingleOplogEntry; this._configShard = configShard || jsTestOptions().configShard; + this._wiredTigerConcurrentWriteTransactions = wiredTigerConcurrentWriteTransactions; + this._reshardingOplogBatchTaskCount = reshardingOplogBatchTaskCount; // Properties set by setup(). /** @private */ @@ -186,6 +190,18 @@ var ReshardingTest = class { Object.merge(configOptions.setParameter, rsOptions.setParameter); } + if (this._wiredTigerConcurrentWriteTransactions !== undefined) { + rsOptions.setParameter.storageEngineConcurrencyAdjustmentAlgorithm = + "fixedConcurrentTransactions"; + rsOptions.setParameter.wiredTigerConcurrentWriteTransactions = + this._wiredTigerConcurrentWriteTransactions; + } + + if (this._reshardingOplogBatchTaskCount !== undefined) { + rsOptions.setParameter.reshardingOplogBatchTaskCount = + this._reshardingOplogBatchTaskCount; + } + this._st = new ShardingTest({ mongos: 1, mongosOptions, diff --git a/jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js b/jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js new file mode 100644 index 00000000000..7b4d77c5291 --- /dev/null +++ b/jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js @@ -0,0 +1,50 @@ +/** + * Test the correctness of multiple deletes during resharding with a reduced ticket pool size. + * + * @tags: [ + * requires_sharding, + * ] + */ + +(function() { +"use strict"; + +load("jstests/libs/discover_topology.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); + +const kNumWriteTickets = 5; +const kReshardingOplogBatchTaskCount = 20; +const reshardingTest = new ReshardingTest({ + wiredTigerConcurrentWriteTransactions: kNumWriteTickets, + reshardingOplogBatchTaskCount: kReshardingOplogBatchTaskCount +}); + +reshardingTest.setup(); + +const donorShardNames = reshardingTest.donorShardNames; +const sourceCollection = reshardingTest.createShardedCollection({ + ns: "reshardingDb.coll", + shardKeyPattern: {oldKey: 1}, + chunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}], +}); +for (let i = 0; i < 100; i++) { + assert.commandWorked(sourceCollection.insert([{x: 1}])); +} +assert.commandWorked(sourceCollection.insert([{x: 3}, {x: 3}])); +const mongos = sourceCollection.getMongo(); +const topology = DiscoverTopology.findConnectedNodes(mongos); +const coordinator = new Mongo(topology.configsvr.nodes[0]); +const recipientShardNames = reshardingTest.recipientShardNames; +reshardingTest.withReshardingInBackground( + { + newShardKeyPattern: {newKey: 1}, + newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}], + }, + () => { + // We wait until cloneTimestamp has been chosen to guarantee that any subsequent writes will + // be applied by the ReshardingOplogApplier. + reshardingTest.awaitCloneTimestampChosen(); + assert.commandWorked(sourceCollection.remove({x: 1}, {justOne: false})); + }); +reshardingTest.teardown(); +})(); diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 154c6b457ba..2708c104105 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -148,75 +148,20 @@ Status ReshardingOplogApplicationRules::applyOperation( return writeConflictRetry(opCtx, "applyOplogEntryCRUDOpResharding", op.getNss().ns(), [&] { try { - WriteUnitOfWork wuow(opCtx); - - const auto outputDb = AutoGetDb(opCtx, _outputNss.dbName(), MODE_IX); - - auto outputColl = - opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] { - return acquireCollection( - opCtx, - CollectionAcquisitionRequest::fromOpCtx( - opCtx, _outputNss, AcquisitionPrerequisites::kWrite), - MODE_IX); - }); - - uassert( - ErrorCodes::NamespaceNotFound, - str::stream() << "Failed to apply op during resharding due to missing collection " - << _outputNss.toStringForErrorMsg(), - outputColl.exists()); - - auto stashColl = - opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] { - return acquireCollection( - opCtx, - CollectionAcquisitionRequest::fromOpCtx( - opCtx, _myStashNss, AcquisitionPrerequisites::kWrite), - MODE_IX); - }); - - uassert( - ErrorCodes::NamespaceNotFound, - str::stream() << "Failed to apply op during resharding due to missing collection " - << _myStashNss.toStringForErrorMsg(), - stashColl.exists()); - auto opType = op.getOpType(); switch (opType) { case repl::OpTypeEnum::kInsert: - _applyInsert_inlock(opCtx, outputColl, stashColl, op); - _applierMetrics->onInsertApplied(); - - break; case repl::OpTypeEnum::kUpdate: - _applyUpdate_inlock(opCtx, outputColl, stashColl, op); - _applierMetrics->onUpdateApplied(); + _applyInsertOrUpdate(opCtx, sii, op); break; case repl::OpTypeEnum::kDelete: { - _applyDelete_inlock(opCtx, outputColl, stashColl, sii, op); + _applyDelete(opCtx, sii, op); _applierMetrics->onDeleteApplied(); break; } default: MONGO_UNREACHABLE; } - - if (opCtx->recoveryUnit()->isTimestamped()) { - // Resharding oplog application does two kinds of writes: - // - // 1) The (obvious) write for applying oplog entries to documents being resharded. - // 2) An unreplicated no-op write that on a document in the output collection to - // ensure serialization of concurrent transactions. - // - // Some of the code paths can end up where only the second kind of write is made. In - // that case, there is no timestamp associated with the write. This results in a - // mixed-mode update chain within WT that is problematic with durable history. We - // roll back those transactions by only committing the `WriteUnitOfWork` when there - // is a timestamp set. - wuow.commit(); - } - return Status::OK(); } catch (const DBException& ex) { if (ex.code() == ErrorCodes::WriteConflict) { @@ -232,6 +177,68 @@ Status ReshardingOplogApplicationRules::applyOperation( }); } +void ReshardingOplogApplicationRules::_applyInsertOrUpdate( + OperationContext* opCtx, + const boost::optional<ShardingIndexesCatalogCache>& sii, + const repl::OplogEntry& op) const { + + WriteUnitOfWork wuow(opCtx); + + auto outputColl = opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] { + return acquireCollection(opCtx, + CollectionAcquisitionRequest::fromOpCtx( + opCtx, _outputNss, AcquisitionPrerequisites::kWrite), + MODE_IX); + }); + + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply op during resharding due to missing collection " + << _outputNss.toStringForErrorMsg(), + outputColl.exists()); + + auto stashColl = opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] { + return acquireCollection(opCtx, + CollectionAcquisitionRequest::fromOpCtx( + opCtx, _myStashNss, AcquisitionPrerequisites::kWrite), + MODE_IX); + }); + + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply op during resharding due to missing collection " + << _myStashNss.toStringForErrorMsg(), + stashColl.exists()); + + auto opType = op.getOpType(); + switch (opType) { + case repl::OpTypeEnum::kInsert: + _applyInsert_inlock(opCtx, outputColl, stashColl, op); + _applierMetrics->onInsertApplied(); + break; + case repl::OpTypeEnum::kUpdate: + _applyUpdate_inlock(opCtx, outputColl, stashColl, op); + _applierMetrics->onUpdateApplied(); + break; + default: + MONGO_UNREACHABLE; + } + + if (opCtx->recoveryUnit()->isTimestamped()) { + // Resharding oplog application does two kinds of writes: + // + // 1) The (obvious) write for applying oplog entries to documents being resharded. + // 2) A find on document in the output collection transformed into an unreplicated no-op + // write on the same document to ensure serialization of concurrent oplog appliers reading + // on the same doc. + // + // Some of the code paths can end up where only the second kind of write is made. In + // that case, there is no timestamp associated with the write. This results in a + // mixed-mode update chain within WT that is problematic with durable history. We + // roll back those transactions by only committing the `WriteUnitOfWork` when there + // is a timestamp set. + wuow.commit(); + } +} + void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCtx, ScopedCollectionAcquisition& outputColl, ScopedCollectionAcquisition& stashColl, @@ -416,10 +423,8 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt invariant(ur.numMatched != 0); } -void ReshardingOplogApplicationRules::_applyDelete_inlock( +void ReshardingOplogApplicationRules::_applyDelete( OperationContext* opCtx, - const ScopedCollectionAcquisition& outputColl, - const ScopedCollectionAcquisition& stashColl, const boost::optional<ShardingIndexesCatalogCache>& sii, const repl::OplogEntry& op) const { /** @@ -450,17 +455,36 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock( BSONObj idQuery = idField.wrap(); const NamespaceString outputNss = op.getNss(); + { + // First, query the conflict stash collection using [op _id] as the query. If a doc exists, + // apply rule #1 and delete the doc from the stash collection. + WriteUnitOfWork wuow(opCtx); - // First, query the conflict stash collection using [op _id] as the query. If a doc exists, - // apply rule #1 and delete the doc from the stash collection. - auto stashCollDoc = _queryStashCollById(opCtx, stashColl.getCollectionPtr(), idQuery); - if (!stashCollDoc.isEmpty()) { - auto nDeleted = deleteObjects(opCtx, stashColl, idQuery, true /* justOne */); - invariant(nDeleted != 0); + const auto stashColl = + opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] { + return acquireCollection(opCtx, + CollectionAcquisitionRequest::fromOpCtx( + opCtx, _myStashNss, AcquisitionPrerequisites::kWrite), + MODE_IX); + }); - _applierMetrics->onWriteToStashCollections(); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply op during resharding due to missing collection " + << _myStashNss.toStringForErrorMsg(), + stashColl.exists()); - return; + auto stashCollDoc = _queryStashCollById(opCtx, stashColl.getCollectionPtr(), idQuery); + if (!stashCollDoc.isEmpty()) { + auto nDeleted = deleteObjects(opCtx, stashColl, idQuery, true /* justOne */); + invariant(nDeleted != 0); + + _applierMetrics->onWriteToStashCollections(); + + invariant(opCtx->recoveryUnit()->isTimestamped()); + wuow.commit(); + + return; + } } // Now run 'findByIdAndNoopUpdate' to figure out which of rules #2, #3, and #4 we must apply. diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h index 5d6c052ed0a..8ebaccfbc62 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.h +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -77,6 +77,10 @@ public: const repl::OplogEntry& op) const; private: + // Applies an insert or update operation + void _applyInsertOrUpdate(OperationContext* opCtx, + const boost::optional<ShardingIndexesCatalogCache>& gii, + const repl::OplogEntry& op) const; // Applies an insert operation void _applyInsert_inlock(OperationContext* opCtx, ScopedCollectionAcquisition& outputColl, @@ -90,11 +94,9 @@ private: const repl::OplogEntry& op) const; // Applies a delete operation - void _applyDelete_inlock(OperationContext* opCtx, - const ScopedCollectionAcquisition& outputColl, - const ScopedCollectionAcquisition& stashColl, - const boost::optional<ShardingIndexesCatalogCache>& gii, - const repl::OplogEntry& op) const; + void _applyDelete(OperationContext* opCtx, + const boost::optional<ShardingIndexesCatalogCache>& gii, + const repl::OplogEntry& op) const; // Queries '_stashNss' using 'idQuery'. BSONObj _queryStashCollById(OperationContext* opCtx, |