summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Taskov <alex.taskov@mongodb.com>2021-03-18 09:57:00 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-18 14:28:27 +0000
commit272954c26742e40884d5532366d7ec419d1b13d4 (patch)
tree5ad8828eb72abb74a74344b6d313e4bb19a21ff4
parent9d601c939bca2a4304dca2d3c8abd195c1f070af (diff)
downloadmongo-272954c26742e40884d5532366d7ec419d1b13d4.tar.gz
SERVER-54681 Delay start of txnCloners to prevent early write of kIncompleteHistoryStmtId during resharding operation
-rw-r--r--jstests/sharding/libs/resharding_test_fixture.js12
-rw-r--r--jstests/sharding/resharding_inplace_retryable_writes.js115
-rw-r--r--src/mongo/db/s/collection_metadata_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/recipient_document.idl2
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp8
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp31
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h1
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp1
-rw-r--r--src/mongo/s/resharding/type_collection_fields.idl2
-rw-r--r--src/mongo/shell/servers.js5
12 files changed, 165 insertions, 20 deletions
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js
index e1b9073af7e..580eb1bf4bb 100644
--- a/jstests/sharding/libs/resharding_test_fixture.js
+++ b/jstests/sharding/libs/resharding_test_fixture.js
@@ -27,6 +27,7 @@ var ReshardingTest = class {
numDonors: numDonors = 1,
numRecipients: numRecipients = 1,
reshardInPlace: reshardInPlace = false,
+ minimumOperationDurationMS: minimumOperationDurationMS = undefined,
} = {}) {
// The @private JSDoc comments cause VS Code to not display the corresponding properties and
// methods in its autocomplete list. This makes it simpler for test authors to know what the
@@ -41,6 +42,8 @@ var ReshardingTest = class {
/** @private */
this._numShards = this._reshardInPlace ? Math.max(this._numDonors, this._numRecipients)
: this._numDonors + this._numRecipients;
+ /** @private */
+ this._minimumOperationDurationMS = minimumOperationDurationMS;
// Properties set by setup().
/** @private */
@@ -74,11 +77,18 @@ var ReshardingTest = class {
}
setup() {
+ let config = {setParameter: {featureFlagResharding: true}};
+
+ if (this._minimumOperationDurationMS !== undefined) {
+ config.setParameter.reshardingMinimumOperationDurationMillis =
+ this._minimumOperationDurationMS;
+ }
+
this._st = new ShardingTest({
mongos: 1,
mongosOptions: {setParameter: {featureFlagResharding: true}},
config: 1,
- configOptions: {setParameter: {featureFlagResharding: true}},
+ configOptions: config,
shards: this._numShards,
rs: {nodes: 2},
rsOptions: {setParameter: {featureFlagResharding: true}},
diff --git a/jstests/sharding/resharding_inplace_retryable_writes.js b/jstests/sharding/resharding_inplace_retryable_writes.js
new file mode 100644
index 00000000000..9d9ccc84143
--- /dev/null
+++ b/jstests/sharding/resharding_inplace_retryable_writes.js
@@ -0,0 +1,115 @@
+/**
+ * Verify that the cloning phase of a resharding operation takes at least
+ * reshardingMinimumOperationDurationMillis to complete. This will indirectly verify that the
+ * txnCloners were not started until after waiting for reshardingMinimumOperationDurationMillis to
+ * elapse.
+ *
+ * @tags: [requires_fcv_49, uses_atclustertime]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/sharding/libs/resharding_test_fixture.js");
+
+const minimumOperationDurationMS = 30000;
+
+const reshardingTest = new ReshardingTest({
+ numDonors: 2,
+ numRecipients: 2,
+ reshardInPlace: true,
+ minimumOperationDurationMS: minimumOperationDurationMS
+});
+reshardingTest.setup();
+
+const donorShardNames = reshardingTest.donorShardNames;
+const sourceCollection = reshardingTest.createShardedCollection({
+ ns: "reshardingDb.coll",
+ shardKeyPattern: {oldKey: 1},
+ chunks: [
+ {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
+ {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
+ ],
+});
+
+assert.commandWorked(sourceCollection.insert([
+ {_id: "stays on shard0", oldKey: -10, newKey: -10, counter: 0},
+ {_id: "moves to shard0", oldKey: 10, newKey: -10, counter: 0},
+]));
+
+const mongos = sourceCollection.getMongo();
+const session = mongos.startSession({causalConsistency: false, retryWrites: false});
+const sessionCollection = session.getDatabase(sourceCollection.getDB().getName())
+ .getCollection(sourceCollection.getName());
+
+function runRetryableWrite(phase, expectedErrorCode = ErrorCodes.OK) {
+ const res = sessionCollection.runCommand("update", {
+ updates: [
+ {q: {_id: "stays on shard0"}, u: {$inc: {counter: 1}}},
+ {q: {_id: "moves to shard0"}, u: {$inc: {counter: 1}}},
+ ],
+ txnNumber: NumberLong(1)
+ });
+
+ if (expectedErrorCode === ErrorCodes.OK) {
+ assert.commandWorked(res);
+ } else {
+ assert.commandFailedWithCode(res, expectedErrorCode);
+ }
+
+ const docs = sourceCollection.find().toArray();
+ assert.eq(2, docs.length, {docs});
+
+ for (const doc of docs) {
+ assert.eq(1,
+ doc.counter,
+ {message: `retryable write executed more than once ${phase}`, id: doc._id, docs});
+ }
+}
+
+runRetryableWrite("before resharding");
+
+const recipientShardNames = reshardingTest.recipientShardNames;
+reshardingTest.withReshardingInBackground( //
+ {
+ newShardKeyPattern: {newKey: 1},
+ newChunks: [
+ {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]},
+ {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]},
+ ],
+ },
+ () => {
+ runRetryableWrite("during resharding");
+
+ assert.soon(() => {
+ const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({
+ ns: sourceCollection.getFullName()
+ });
+
+ return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined;
+ });
+
+ runRetryableWrite("during resharding after fetchTimestamp was chosen");
+
+ let startTime = Date.now();
+
+ assert.soon(() => {
+ const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({
+ ns: sourceCollection.getFullName()
+ });
+
+ return coordinatorDoc !== null && coordinatorDoc.state === "applying";
+ });
+
+ const epsilon = 5000;
+ const elapsed = Date.now() - startTime;
+ assert.gt(elapsed, minimumOperationDurationMS - epsilon);
+
+ runRetryableWrite("during resharding after collection cloning had finished",
+ ErrorCodes.IncompleteTransactionHistory);
+ });
+
+runRetryableWrite("after resharding", ErrorCodes.IncompleteTransactionHistory);
+
+reshardingTest.teardown();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 5bbf258a5ab..b5f9e77045d 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -118,7 +118,7 @@ protected:
if (state == CoordinatorStateEnum::kDecisionPersisted) {
TypeCollectionRecipientFields recipientFields{
- {kThisShard, kOtherShard}, existingUuid, kNss};
+ {kThisShard, kOtherShard}, existingUuid, kNss, 5000};
reshardingFields.setRecipientFields(std::move(recipientFields));
} else if (state == CoordinatorStateEnum::kBlockingWrites) {
TypeCollectionDonorFields donorFields{
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl
index 4c56423307e..5efe3d36deb 100644
--- a/src/mongo/db/s/resharding/recipient_document.idl
+++ b/src/mongo/db/s/resharding/recipient_document.idl
@@ -54,3 +54,5 @@ structs:
donorShards:
type: array<shard_id>
description: "The list of donor shards that report to this recipient."
+ minimumOperationDurationMillis:
+ type: long
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 88aa8f87930..7811e6db54a 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -197,10 +197,14 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
*/
TypeCollectionRecipientFields constructRecipientFields(
const ReshardingCoordinatorDocument& coordinatorDoc) {
- auto donorShardIds = resharding::extractShardIds(coordinatorDoc.getDonorShards());
TypeCollectionRecipientFields recipientFields(
- std::move(donorShardIds), coordinatorDoc.getSourceUUID(), coordinatorDoc.getSourceNss());
+ resharding::extractShardIds(coordinatorDoc.getDonorShards()),
+ coordinatorDoc.getSourceUUID(),
+ coordinatorDoc.getSourceNss(),
+ resharding::gReshardingMinimumOperationDurationMillis.load());
+
emplaceFetchTimestampIfExists(recipientFields, coordinatorDoc.getFetchTimestamp());
+
return recipientFields;
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 18d10d50547..4ac8df3cd22 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -333,7 +333,9 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp);
auto recipientDoc = ReshardingRecipientDocument{
- std::move(recipientCtx), reshardingFields.getRecipientFields()->getDonorShardIds()};
+ std::move(recipientCtx),
+ reshardingFields.getRecipientFields()->getDonorShardIds(),
+ reshardingFields.getRecipientFields()->getMinimumOperationDurationMillis()};
auto sourceNss = reshardingFields.getRecipientFields()->getSourceNss();
auto sourceUUID = reshardingFields.getRecipientFields()->getSourceUUID();
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index d3ecac2041b..b707a975444 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -146,7 +146,7 @@ protected:
const NamespaceString& originalNss,
const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
auto recipientFields =
- TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss);
+ TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss, 5000);
emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
fields.setRecipientFields(std::move(recipientFields));
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index d27bca9afec..dd1761a6742 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -226,6 +226,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
_metadata{recipientDoc.getCommonReshardingMetadata()},
_donorShardIds{recipientDoc.getDonorShards()},
+ _minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}},
_recipientCtx{recipientDoc.getMutableState()},
_fetchTimestamp{recipientDoc.getFetchTimestamp()} {}
@@ -433,9 +434,7 @@ void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner(
catalogCache->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss());
std::set<ShardId> shardList;
- const auto myShardId = ShardingState::get(opCtx)->shardId();
routingInfo.getAllShardIds(&shardList);
- shardList.erase(myShardId);
for (const auto& shard : shardList) {
_txnCloners.push_back(std::make_unique<ReshardingTxnCloner>(
@@ -509,21 +508,25 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}));
}
- return _collectionCloner->run(**executor, abortToken)
- .then([this, executor, abortToken] {
- if (_txnCloners.empty()) {
- return SemiFuture<void>::makeReady();
- }
+ return whenAllSucceed(_collectionCloner->run(**executor, abortToken),
+ (*executor)
+ ->sleepFor(_minimumOperationDuration, abortToken)
+ .then([this, executor, abortToken] {
+ if (_txnCloners.empty()) {
+ return SemiFuture<void>::makeReady();
+ }
- auto serviceContext = Client::getCurrent()->getServiceContext();
+ auto serviceContext = Client::getCurrent()->getServiceContext();
- std::vector<ExecutorFuture<void>> txnClonerFutures;
- for (auto&& txnCloner : _txnCloners) {
- txnClonerFutures.push_back(txnCloner->run(serviceContext, **executor, abortToken));
- }
+ std::vector<ExecutorFuture<void>> txnClonerFutures;
+ for (auto&& txnCloner : _txnCloners) {
+ txnClonerFutures.push_back(
+ txnCloner->run(serviceContext, **executor, abortToken));
+ }
- return whenAllSucceed(std::move(txnClonerFutures));
- })
+ return whenAllSucceed(std::move(txnClonerFutures));
+ }))
+ .thenRunOn(**executor)
.then([this] {
// ReshardingTxnCloners must complete before the recipient transitions to kApplying to
// avoid errors caused by donor shards unpinning the fetchTimestamp.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 1e667da7195..8460178d087 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -205,6 +205,7 @@ private:
// config.localReshardingOperations.recipient.
const CommonReshardingMetadata _metadata;
const std::vector<ShardId> _donorShardIds;
+ const Milliseconds _minimumOperationDuration;
// The in-memory representation of the mutable portion of the document in
// config.localReshardingOperations.recipient.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 5d73cd21ddd..cc725f3b3d6 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -154,6 +154,7 @@ public:
// Populating the set of donor shard ids isn't necessary to test the functionality of
// creating the temporary resharding collection.
recipientFields.setDonorShardIds({});
+ recipientFields.setMinimumOperationDurationMillis(5000);
reshardingFields.setRecipientFields(recipientFields);
coll.setReshardingFields(reshardingFields);
diff --git a/src/mongo/s/resharding/type_collection_fields.idl b/src/mongo/s/resharding/type_collection_fields.idl
index d335500a5c4..97519598445 100644
--- a/src/mongo/s/resharding/type_collection_fields.idl
+++ b/src/mongo/s/resharding/type_collection_fields.idl
@@ -73,6 +73,8 @@ structs:
cpp_name: sourceNss
type: namespacestring
description: "The namespace of the collection being resharded."
+ minimumOperationDurationMillis:
+ type: long
TypeCollectionReshardingFields:
description: "Resharding-related fields meant to be stored in a config.collections
diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js
index fb0cea95612..1a886bd11d8 100644
--- a/src/mongo/shell/servers.js
+++ b/src/mongo/shell/servers.js
@@ -1386,6 +1386,11 @@ function appendSetParameterArgs(argArray) {
continue;
}
+ if (paramName === "reshardingMinimumOperationDurationMillis" &&
+ argArrayContains("reshardingMinimumOperationDurationMillis")) {
+ continue;
+ }
+
const paramVal = ((param) => {
if (typeof param === "object") {
return JSON.stringify(param);