summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: nandinibhartiyaMDB <nandini.bhartiya@mongodb.com> 2023-05-11 04:34:31 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2023-05-11 05:34:10 +0000
commit: bbb51d48561a47e6dc6e6aec9abf3463c1b6acc3 (patch)
tree: fc90d9d9761f800f1646c672b350708de64df5ea
parent: 83e593092544b7bce255f3fc25e2d8b0758ea962 (diff)
download: mongo-bbb51d48561a47e6dc6e6aec9abf3463c1b6acc3.tar.gz
SERVER-76790: Fix holding extra tickets during resharding oplog application
-rw-r--r-- jstests/sharding/libs/resharding_test_fixture.js | 16
-rw-r--r-- jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js | 50
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_application.cpp | 160
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_application.h | 12
4 files changed, 165 insertions, 73 deletions
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js
index e05e82fa1eb..35426bfb96a 100644
--- a/jstests/sharding/libs/resharding_test_fixture.js
+++ b/jstests/sharding/libs/resharding_test_fixture.js
@@ -38,6 +38,8 @@ var ReshardingTest = class {
maxNumberOfTransactionOperationsInSingleOplogEntry:
maxNumberOfTransactionOperationsInSingleOplogEntry = undefined,
configShard: configShard = false,
+ wiredTigerConcurrentWriteTransactions: wiredTigerConcurrentWriteTransactions = undefined,
+ reshardingOplogBatchTaskCount: reshardingOplogBatchTaskCount = undefined,
} = {}) {
// The @private JSDoc comments cause VS Code to not display the corresponding properties and
// methods in its autocomplete list. This makes it simpler for test authors to know what the
@@ -70,6 +72,8 @@ var ReshardingTest = class {
this._maxNumberOfTransactionOperationsInSingleOplogEntry =
maxNumberOfTransactionOperationsInSingleOplogEntry;
this._configShard = configShard || jsTestOptions().configShard;
+ this._wiredTigerConcurrentWriteTransactions = wiredTigerConcurrentWriteTransactions;
+ this._reshardingOplogBatchTaskCount = reshardingOplogBatchTaskCount;
// Properties set by setup().
/** @private */
@@ -186,6 +190,18 @@ var ReshardingTest = class {
Object.merge(configOptions.setParameter, rsOptions.setParameter);
}
+ if (this._wiredTigerConcurrentWriteTransactions !== undefined) {
+ rsOptions.setParameter.storageEngineConcurrencyAdjustmentAlgorithm =
+ "fixedConcurrentTransactions";
+ rsOptions.setParameter.wiredTigerConcurrentWriteTransactions =
+ this._wiredTigerConcurrentWriteTransactions;
+ }
+
+ if (this._reshardingOplogBatchTaskCount !== undefined) {
+ rsOptions.setParameter.reshardingOplogBatchTaskCount =
+ this._reshardingOplogBatchTaskCount;
+ }
+
this._st = new ShardingTest({
mongos: 1,
mongosOptions,
diff --git a/jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js b/jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js
new file mode 100644
index 00000000000..7b4d77c5291
--- /dev/null
+++ b/jstests/sharding/resharding_with_multi_deletes_reduced_ticket_pool_size.js
@@ -0,0 +1,50 @@
+/**
+ * Test the correctness of multiple deletes during resharding with a reduced ticket pool size.
+ *
+ * @tags: [
+ * requires_sharding,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/discover_topology.js");
+load("jstests/sharding/libs/resharding_test_fixture.js");
+
+const kNumWriteTickets = 5;
+const kReshardingOplogBatchTaskCount = 20;
+const reshardingTest = new ReshardingTest({
+ wiredTigerConcurrentWriteTransactions: kNumWriteTickets,
+ reshardingOplogBatchTaskCount: kReshardingOplogBatchTaskCount
+});
+
+reshardingTest.setup();
+
+const donorShardNames = reshardingTest.donorShardNames;
+const sourceCollection = reshardingTest.createShardedCollection({
+ ns: "reshardingDb.coll",
+ shardKeyPattern: {oldKey: 1},
+ chunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}],
+});
+for (let i = 0; i < 100; i++) {
+ assert.commandWorked(sourceCollection.insert([{x: 1}]));
+}
+assert.commandWorked(sourceCollection.insert([{x: 3}, {x: 3}]));
+const mongos = sourceCollection.getMongo();
+const topology = DiscoverTopology.findConnectedNodes(mongos);
+const coordinator = new Mongo(topology.configsvr.nodes[0]);
+const recipientShardNames = reshardingTest.recipientShardNames;
+reshardingTest.withReshardingInBackground(
+ {
+ newShardKeyPattern: {newKey: 1},
+ newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
+ },
+ () => {
+ // We wait until cloneTimestamp has been chosen to guarantee that any subsequent writes will
+ // be applied by the ReshardingOplogApplier.
+ reshardingTest.awaitCloneTimestampChosen();
+ assert.commandWorked(sourceCollection.remove({x: 1}, {justOne: false}));
+ });
+reshardingTest.teardown();
+})();
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index 154c6b457ba..2708c104105 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -148,75 +148,20 @@ Status ReshardingOplogApplicationRules::applyOperation(
return writeConflictRetry(opCtx, "applyOplogEntryCRUDOpResharding", op.getNss().ns(), [&] {
try {
- WriteUnitOfWork wuow(opCtx);
-
- const auto outputDb = AutoGetDb(opCtx, _outputNss.dbName(), MODE_IX);
-
- auto outputColl =
- opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] {
- return acquireCollection(
- opCtx,
- CollectionAcquisitionRequest::fromOpCtx(
- opCtx, _outputNss, AcquisitionPrerequisites::kWrite),
- MODE_IX);
- });
-
- uassert(
- ErrorCodes::NamespaceNotFound,
- str::stream() << "Failed to apply op during resharding due to missing collection "
- << _outputNss.toStringForErrorMsg(),
- outputColl.exists());
-
- auto stashColl =
- opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] {
- return acquireCollection(
- opCtx,
- CollectionAcquisitionRequest::fromOpCtx(
- opCtx, _myStashNss, AcquisitionPrerequisites::kWrite),
- MODE_IX);
- });
-
- uassert(
- ErrorCodes::NamespaceNotFound,
- str::stream() << "Failed to apply op during resharding due to missing collection "
- << _myStashNss.toStringForErrorMsg(),
- stashColl.exists());
-
auto opType = op.getOpType();
switch (opType) {
case repl::OpTypeEnum::kInsert:
- _applyInsert_inlock(opCtx, outputColl, stashColl, op);
- _applierMetrics->onInsertApplied();
-
- break;
case repl::OpTypeEnum::kUpdate:
- _applyUpdate_inlock(opCtx, outputColl, stashColl, op);
- _applierMetrics->onUpdateApplied();
+ _applyInsertOrUpdate(opCtx, sii, op);
break;
case repl::OpTypeEnum::kDelete: {
- _applyDelete_inlock(opCtx, outputColl, stashColl, sii, op);
+ _applyDelete(opCtx, sii, op);
_applierMetrics->onDeleteApplied();
break;
}
default:
MONGO_UNREACHABLE;
}
-
- if (opCtx->recoveryUnit()->isTimestamped()) {
- // Resharding oplog application does two kinds of writes:
- //
- // 1) The (obvious) write for applying oplog entries to documents being resharded.
- // 2) An unreplicated no-op write that on a document in the output collection to
- // ensure serialization of concurrent transactions.
- //
- // Some of the code paths can end up where only the second kind of write is made. In
- // that case, there is no timestamp associated with the write. This results in a
- // mixed-mode update chain within WT that is problematic with durable history. We
- // roll back those transactions by only committing the `WriteUnitOfWork` when there
- // is a timestamp set.
- wuow.commit();
- }
-
return Status::OK();
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::WriteConflict) {
@@ -232,6 +177,68 @@ Status ReshardingOplogApplicationRules::applyOperation(
});
}
+void ReshardingOplogApplicationRules::_applyInsertOrUpdate(
+ OperationContext* opCtx,
+ const boost::optional<ShardingIndexesCatalogCache>& sii,
+ const repl::OplogEntry& op) const {
+
+ WriteUnitOfWork wuow(opCtx);
+
+ auto outputColl = opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] {
+ return acquireCollection(opCtx,
+ CollectionAcquisitionRequest::fromOpCtx(
+ opCtx, _outputNss, AcquisitionPrerequisites::kWrite),
+ MODE_IX);
+ });
+
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to apply op during resharding due to missing collection "
+ << _outputNss.toStringForErrorMsg(),
+ outputColl.exists());
+
+ auto stashColl = opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] {
+ return acquireCollection(opCtx,
+ CollectionAcquisitionRequest::fromOpCtx(
+ opCtx, _myStashNss, AcquisitionPrerequisites::kWrite),
+ MODE_IX);
+ });
+
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to apply op during resharding due to missing collection "
+ << _myStashNss.toStringForErrorMsg(),
+ stashColl.exists());
+
+ auto opType = op.getOpType();
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert:
+ _applyInsert_inlock(opCtx, outputColl, stashColl, op);
+ _applierMetrics->onInsertApplied();
+ break;
+ case repl::OpTypeEnum::kUpdate:
+ _applyUpdate_inlock(opCtx, outputColl, stashColl, op);
+ _applierMetrics->onUpdateApplied();
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ if (opCtx->recoveryUnit()->isTimestamped()) {
+ // Resharding oplog application does two kinds of writes:
+ //
+ // 1) The (obvious) write for applying oplog entries to documents being resharded.
+ // 2) A find on a document in the output collection, which is transformed into an
+ // unreplicated no-op write on that same document so that concurrent oplog appliers
+ // reading the same document are serialized.
+ //
+ // Some of the code paths can end up where only the second kind of write is made. In
+ // that case, there is no timestamp associated with the write. This results in a
+ // mixed-mode update chain within WT that is problematic with durable history. We
+ // roll back those transactions by only committing the `WriteUnitOfWork` when there
+ // is a timestamp set.
+ wuow.commit();
+ }
+}
+
void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCtx,
ScopedCollectionAcquisition& outputColl,
ScopedCollectionAcquisition& stashColl,
@@ -416,10 +423,8 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt
invariant(ur.numMatched != 0);
}
-void ReshardingOplogApplicationRules::_applyDelete_inlock(
+void ReshardingOplogApplicationRules::_applyDelete(
OperationContext* opCtx,
- const ScopedCollectionAcquisition& outputColl,
- const ScopedCollectionAcquisition& stashColl,
const boost::optional<ShardingIndexesCatalogCache>& sii,
const repl::OplogEntry& op) const {
/**
@@ -450,17 +455,36 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(
BSONObj idQuery = idField.wrap();
const NamespaceString outputNss = op.getNss();
+ {
+ // First, query the conflict stash collection using [op _id] as the query. If a doc exists,
+ // apply rule #1 and delete the doc from the stash collection.
+ WriteUnitOfWork wuow(opCtx);
- // First, query the conflict stash collection using [op _id] as the query. If a doc exists,
- // apply rule #1 and delete the doc from the stash collection.
- auto stashCollDoc = _queryStashCollById(opCtx, stashColl.getCollectionPtr(), idQuery);
- if (!stashCollDoc.isEmpty()) {
- auto nDeleted = deleteObjects(opCtx, stashColl, idQuery, true /* justOne */);
- invariant(nDeleted != 0);
+ const auto stashColl =
+ opCtx->runWithDeadline(getDeadline(opCtx), opCtx->getTimeoutError(), [&] {
+ return acquireCollection(opCtx,
+ CollectionAcquisitionRequest::fromOpCtx(
+ opCtx, _myStashNss, AcquisitionPrerequisites::kWrite),
+ MODE_IX);
+ });
- _applierMetrics->onWriteToStashCollections();
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to apply op during resharding due to missing collection "
+ << _myStashNss.toStringForErrorMsg(),
+ stashColl.exists());
- return;
+ auto stashCollDoc = _queryStashCollById(opCtx, stashColl.getCollectionPtr(), idQuery);
+ if (!stashCollDoc.isEmpty()) {
+ auto nDeleted = deleteObjects(opCtx, stashColl, idQuery, true /* justOne */);
+ invariant(nDeleted != 0);
+
+ _applierMetrics->onWriteToStashCollections();
+
+ invariant(opCtx->recoveryUnit()->isTimestamped());
+ wuow.commit();
+
+ return;
+ }
}
// Now run 'findByIdAndNoopUpdate' to figure out which of rules #2, #3, and #4 we must apply.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
index 5d6c052ed0a..8ebaccfbc62 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -77,6 +77,10 @@ public:
const repl::OplogEntry& op) const;
private:
+ // Applies an insert or update operation
+ void _applyInsertOrUpdate(OperationContext* opCtx,
+ const boost::optional<ShardingIndexesCatalogCache>& gii,
+ const repl::OplogEntry& op) const;
// Applies an insert operation
void _applyInsert_inlock(OperationContext* opCtx,
ScopedCollectionAcquisition& outputColl,
@@ -90,11 +94,9 @@ private:
const repl::OplogEntry& op) const;
// Applies a delete operation
- void _applyDelete_inlock(OperationContext* opCtx,
- const ScopedCollectionAcquisition& outputColl,
- const ScopedCollectionAcquisition& stashColl,
- const boost::optional<ShardingIndexesCatalogCache>& gii,
- const repl::OplogEntry& op) const;
+ void _applyDelete(OperationContext* opCtx,
+ const boost::optional<ShardingIndexesCatalogCache>& gii,
+ const repl::OplogEntry& op) const;
// Queries '_stashNss' using 'idQuery'.
BSONObj _queryStashCollById(OperationContext* opCtx,