author    Tommaso Tocci <tommaso.tocci@mongodb.com>    2020-10-29 16:34:03 +0100
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-11-11 18:15:09 +0000
commit    3eabee611446687824c2560ab78dc51e57b91d75
tree      1dbac66311e705ffc7dbafabc7a088ccb8038309
parent    ae52fb0d0ccdf33a626404fb1ac8a2ba4ea5d08b
SERVER-28943 Make shards retry non-write commands on stale version exceptions
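
The shard's service entry point now intercepts StaleDbVersion errors and errors in the
StaleShardVersionError category, refreshes the stale database or collection metadata once
per level, and re-runs the command instead of returning the error to the router. The old
handleException hook, which only kicked off the refresh while the error was already
propagating, is replaced by refreshDatabase/refreshCollection hooks whose boolean result
drives the retry. Test assertions that expected the first, stale attempt to surface in
the profiler are dropped or relaxed to "at least one entry" accordingly. A simplified
sketch of the retry discipline follows the service_entry_point_common.cpp diff below.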
 jstests/sharding/query/agg_shard_targeting.js                                    | 24
 jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js            | 10
 jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js | 14
 jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js     | 15
 jstests/sharding/transactions_stale_shard_version_errors.js                      | 32
 jstests/sharding/union_with_read_preference.js                                   |  8
 src/mongo/db/service_entry_point_common.cpp                                      | 55
 src/mongo/db/service_entry_point_common.h                                        |  6
 src/mongo/db/service_entry_point_mongod.cpp                                      | 43
 src/mongo/embedded/service_entry_point_embedded.cpp                              | 10
 10 files changed, 104 insertions(+), 113 deletions(-)
diff --git a/jstests/sharding/query/agg_shard_targeting.js b/jstests/sharding/query/agg_shard_targeting.js
index 59f40a7f284..6ef9cc60cb8 100644
--- a/jstests/sharding/query/agg_shard_targeting.js
+++ b/jstests/sharding/query/agg_shard_targeting.js
@@ -226,18 +226,6 @@ function runAggShardTargetTest({splitPoint}) {
}
});
- // - One aggregation on st.shard1.shardName with a shard version exception (indicating that
- // the shard was stale).
- profilerHasSingleMatchingEntryOrThrow({
- profileDB: shard1DB,
- filter: {
- "command.aggregate": mongosColl.getName(),
- "command.comment": testName,
- "command.pipeline.$mergeCursors": {$exists: false},
- errCode: {$in: shardExceptions}
- }
- });
-
// - At most two aggregations on st.shard0.shardName with no stale config exceptions. The
// first, if present, is an aborted cursor created if the command reaches
// st.shard0.shardName before st.shard1.shardName throws its stale config exception during
@@ -325,18 +313,6 @@ function runAggShardTargetTest({splitPoint}) {
}
});
- // - One aggregation on st.shard0.shardName with a shard version exception (indicating that
- // the shard was stale).
- profilerHasSingleMatchingEntryOrThrow({
- profileDB: shard0DB,
- filter: {
- "command.aggregate": mongosColl.getName(),
- "command.comment": testName,
- "command.pipeline.$mergeCursors": {$exists: false},
- errCode: {$in: shardExceptions}
- }
- });
-
// - At most two aggregations on st.shard0.shardName with no stale config exceptions. The
// first, if present, is an aborted cursor created if the command reaches
// st.shard0.shardName before st.shard1.shardName throws its stale config exception during
diff --git a/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js b/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js
index f455ec20228..4bfc3e3e7a5 100644
--- a/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js
+++ b/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js
@@ -161,16 +161,6 @@ for (let testCase of testCases) {
assert.eq(aggCmdRes.cursor.firstBatch.length, batchSize);
}
-// Confirm that the profiler shows a single StaleConfig exception for the source namespace...
-profilerHasSingleMatchingEntryOrThrow({
- profileDB: primaryDB,
- filter: {
- ns: sourceCollection.getFullName(),
- errCode: ErrorCodes.StaleConfig,
- errMsg: {$regex: `${sourceCollection.getFullName()} is not currently known`}
- }
-});
-
// ... and a single StaleConfig exception for the foreign namespace. Note that the 'ns' field of the
// profiler entry is the source collection in both cases, because the $lookup's parent aggregation
// produces the profiler entry, and it is always running on the source collection.
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index cef8487c207..0859b10e41a 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -554,20 +554,6 @@ for (let command of commands) {
});
// Check that the recipient shard secondary received the request with local read concern
- // and also returned stale shardVersion once, even though the mongos is fresh, because
- // the secondary was stale.
- profilerHasSingleMatchingEntryOrThrow({
- profileDB: recipientShardSecondary.getDB(db),
- filter: Object.extend({
- "command.shardVersion": {"$exists": true},
- "command.$readPreference": {"mode": "secondary"},
- "command.readConcern": {"level": "local"},
- "errCode": ErrorCodes.StaleConfig
- },
- commandProfile)
- });
-
- // Check that the recipient shard secondary received the request with local read concern
// again and finally returned success.
profilerHasSingleMatchingEntryOrThrow({
profileDB: recipientShardSecondary.getDB(db),
diff --git a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js
index f6389079c81..59cb577849e 100644
--- a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js
+++ b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js
@@ -81,21 +81,6 @@ profilerHasSingleMatchingEntryOrThrow({
}
});
-// The recipient shard will then return a stale shard version error because it needs to refresh
-// its own routing table.
-profilerHasSingleMatchingEntryOrThrow({
- profileDB: recipientShardSecondary.getDB(dbName),
- filter: {
- "ns": ns,
- "command.count": collName,
- "command.query": {x: 1},
- "command.shardVersion": {"$exists": true},
- "command.$readPreference": {"mode": "secondary"},
- "command.readConcern.afterClusterTime": {"$exists": true},
- "errCode": ErrorCodes.StaleConfig
- }
-});
-
// Finally, the command is retried on the recipient shard and succeeds.
profilerHasSingleMatchingEntryOrThrow({
profileDB: recipientShardSecondary.getDB(dbName),
diff --git a/jstests/sharding/transactions_stale_shard_version_errors.js b/jstests/sharding/transactions_stale_shard_version_errors.js
index 2ff76e94b0c..12debac39ab 100644
--- a/jstests/sharding/transactions_stale_shard_version_errors.js
+++ b/jstests/sharding/transactions_stale_shard_version_errors.js
@@ -1,10 +1,15 @@
// Tests mongos behavior on stale shard version errors received in a transaction.
//
-// @tags: [requires_sharding, uses_transactions, uses_multi_shard_transaction]
+// @tags: [
+// requires_sharding,
+// uses_transactions,
+// uses_multi_shard_transaction,
+// ]
(function() {
"use strict";
load("jstests/sharding/libs/sharded_transactions_helpers.js");
+load("jstests/multiVersion/libs/verify_versions.js");
function expectChunks(st, ns, chunks) {
for (let i = 0; i < chunks.length; i++) {
@@ -208,13 +213,24 @@ session.startTransaction();
assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: -4}]}));
// Targets Shard2, which is stale.
-res = assert.commandFailedWithCode(sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}),
- ErrorCodes.StaleConfig);
-assert.eq(res.errorLabels, ["TransientTransactionError"]);
-
-// The transaction should have been implicitly aborted on all shards.
-assertNoSuchTransactionOnAllShards(st, session.getSessionId(), session.getTxnNumber_forTesting());
-assert.commandFailedWithCode(session.abortTransaction_forTesting(), ErrorCodes.NoSuchTransaction);
+let shard2Version = st.shard2.getBinVersion();
+jsTest.log("Binary version of shard2: " + MongoRunner.getBinVersionFor(shard2Version));
+if (MongoRunner.compareBinVersions(shard2Version, "4.9") < 0) {
+ // TODO SERVER-52782 remove this if branch when 5.0 becomes last-lts
+ res = assert.commandFailedWithCode(
+ sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}), ErrorCodes.StaleConfig);
+ assert.eq(res.errorLabels, ["TransientTransactionError"]);
+
+ // The transaction should have been implicitly aborted on all shards.
+ assertNoSuchTransactionOnAllShards(
+ st, session.getSessionId(), session.getTxnNumber_forTesting());
+ assert.commandFailedWithCode(session.abortTransaction_forTesting(),
+ ErrorCodes.NoSuchTransaction);
+} else {
+ assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}));
+
+ assert.commandWorked(session.abortTransaction_forTesting());
+}
//
// The final StaleConfig error should be returned if the router exhausts its retries.
diff --git a/jstests/sharding/union_with_read_preference.js b/jstests/sharding/union_with_read_preference.js
index acb15533300..74f6bf26d79 100644
--- a/jstests/sharding/union_with_read_preference.js
+++ b/jstests/sharding/union_with_read_preference.js
@@ -54,7 +54,7 @@ assert.eq(mongosColl
// Test that the union's sub-pipelines go to the primary.
for (let rs of [st.rs0, st.rs1]) {
const primaryDB = rs.getPrimary().getDB(dbName);
- profilerHasSingleMatchingEntryOrThrow({
+ profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: primaryDB,
filter: {
ns: unionedColl.getFullName(),
@@ -79,7 +79,7 @@ assert.eq(mongosColl
// Test that the union's sub-pipelines go to the secondary.
for (let rs of [st.rs0, st.rs1]) {
const secondaryDB = rs.getSecondary().getDB(dbName);
- profilerHasSingleMatchingEntryOrThrow({
+ profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: secondaryDB,
filter: {
ns: unionedColl.getFullName(),
@@ -136,7 +136,7 @@ assert.eq(runAgg(), [{_id: -1, docNum: [0, 2, 4]}, {_id: 1, docNum: [1, 3, 5]}])
for (let rs of [st.rs0, st.rs1]) {
jsTestLog(`Testing profile on shard ${rs.getURL()}`);
const secondaryDB = rs.getSecondary().getDB(dbName);
- profilerHasSingleMatchingEntryOrThrow({
+ profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: secondaryDB,
filter: {
ns: unionedColl.getFullName(),
@@ -148,7 +148,7 @@ for (let rs of [st.rs0, st.rs1]) {
errCode: {$ne: ErrorCodes.StaleConfig}
}
});
- profilerHasSingleMatchingEntryOrThrow({
+ profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: secondaryDB,
filter: {
ns: secondTargetColl.getFullName(),
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index ab449e2b9a7..1ce882c43cb 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -666,6 +666,8 @@ private:
boost::optional<ResourceConsumption::ScopedMetricsCollector> _scopedMetrics;
boost::optional<ImpersonationSessionGuard> _impersonationSessionGuard;
std::unique_ptr<PolymorphicScoped> _scoped;
+ bool _refreshedDatabase = false;
+ bool _refreshedCollection = false;
};
class RunCommandImpl : public std::enable_shared_from_this<RunCommandImpl> {
@@ -1556,15 +1558,60 @@ Future<void> ExecCommandDatabase::_initiateCommand() try {
rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
}
- _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request);
- _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get());
return Status::OK();
} catch (const DBException& ex) {
return ex.toStatus();
}
Future<void> ExecCommandDatabase::_commandExec() {
- return RunCommandImpl::run(shared_from_this());
+ auto opCtx = _execContext->getOpCtx();
+ auto& request = _execContext->getRequest();
+
+ _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request);
+ _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get());
+ _execContext->getReplyBuilder()->reset();
+
+ return RunCommandImpl::run(shared_from_this())
+ .onError<ErrorCodes::StaleDbVersion>(
+ [this, anchor = shared_from_this()](Status s) -> Future<void> {
+ auto opCtx = _execContext->getOpCtx();
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ serverGlobalParams.clusterRole != ClusterRole::ConfigServer &&
+ !_refreshedDatabase) {
+ auto sce = s.extraInfo<StaleDbRoutingVersion>();
+ invariant(sce);
+ // TODO SERVER-52784 refresh only if wantedVersion is empty or less than
+ // received
+ const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce);
+ if (refreshed) {
+ _refreshedDatabase = true;
+ return _commandExec();
+ }
+ }
+
+ return s;
+ })
+ .onErrorCategory<ErrorCategory::StaleShardVersionError>([this, anchor = shared_from_this()](
+ Status s) -> Future<void> {
+ auto opCtx = _execContext->getOpCtx();
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ serverGlobalParams.clusterRole != ClusterRole::ConfigServer &&
+ !_refreshedCollection) {
+ if (auto sce = s.extraInfo<StaleConfigInfo>()) {
+ // TODO SERVER-52784 refresh only if wantedVersion is empty or less than
+ // received
+ const auto refreshed = _execContext->behaviors->refreshCollection(opCtx, *sce);
+ if (refreshed) {
+ _refreshedCollection = true;
+ return _commandExec();
+ }
+ }
+ }
+
+ return s;
+ });
}
void ExecCommandDatabase::_handleFailure(Status status) {
@@ -1578,8 +1625,6 @@ void ExecCommandDatabase::_handleFailure(Status status) {
auto replyBuilder = _execContext->getReplyBuilder();
const auto& behaviors = *_execContext->behaviors;
- behaviors.handleException(status, opCtx);
-
// Append the error labels for transient transaction errors.
auto response = _extraFieldsBuilder.asTempObj();
boost::optional<ErrorCodes::Error> wcCode;
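
The retry discipline above, reduced to a minimal standalone sketch. This is illustrative
only: plain exceptions and recursion stand in for Status/Future and the real code's
.onError/.onErrorCategory continuations, and the direct-client and config-server guards
are omitted. All names below are stand-ins, not the actual server types.

#include <iostream>
#include <stdexcept>

// Stand-ins for the error info the real code pulls out of Status extraInfo.
struct StaleDbVersion : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct StaleShardVersion : std::runtime_error {
    using std::runtime_error::runtime_error;
};

struct CommandRunner {
    bool refreshedDatabase = false;    // mirrors _refreshedDatabase
    bool refreshedCollection = false;  // mirrors _refreshedCollection
    int attempts = 0;

    // Hypothetical command body: stale on the first attempt only.
    void runOnce() {
        if (++attempts == 1)
            throw StaleShardVersion("shard version mismatch");
        std::cout << "command succeeded on attempt " << attempts << "\n";
    }

    bool refreshDatabase() { return true; }    // assume the refresh succeeds
    bool refreshCollection() { return true; }

    // At most one refresh-and-retry per staleness level; a second stale error
    // of the same kind (or a failed refresh) propagates to the caller.
    void exec() {
        try {
            runOnce();
        } catch (const StaleDbVersion&) {
            if (!refreshedDatabase && refreshDatabase()) {
                refreshedDatabase = true;
                return exec();
            }
            throw;
        } catch (const StaleShardVersion&) {
            if (!refreshedCollection && refreshCollection()) {
                refreshedCollection = true;
                return exec();
            }
            throw;
        }
    }
};

int main() {
    CommandRunner runner;
    runner.exec();  // first attempt is stale, the retry succeeds
}

As in the patch, the flags live on the per-command execution state, so a command retries
at most once for a stale database version and once for a stale shard version.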
diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h
index dbf95b165a2..d3edbacae12 100644
--- a/src/mongo/db/service_entry_point_common.h
+++ b/src/mongo/db/service_entry_point_common.h
@@ -84,7 +84,11 @@ struct ServiceEntryPointCommon {
virtual void attachCurOpErrInfo(OperationContext* opCtx, const BSONObj& replyObj) const = 0;
- virtual void handleException(const Status& status, OperationContext* opCtx) const = 0;
+ virtual bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const
+ noexcept = 0;
+
+ virtual bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const
+ noexcept = 0;
virtual void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const = 0;
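
Both new hooks are noexcept and report success as a bool: a refresh failure while a
stale-version error is already in flight must not throw, it simply disables the retry.
A sketch of that shape, with invented stand-in types and a hypothetical throwing helper:

#include <stdexcept>

struct OperationContext {};
struct StaleDbRoutingVersion {};

// Hypothetical throwing refresh, standing in for the real helper.
void refreshDatabaseOrThrow(OperationContext*, const StaleDbRoutingVersion&) {
    throw std::runtime_error("refresh failed");
}

// The shape the interface requires: swallow refresh failures and report
// success as a bool, so the caller skips the retry on `false`.
bool refreshDatabase(OperationContext* opCtx,
                     const StaleDbRoutingVersion& se) noexcept {
    try {
        refreshDatabaseOrThrow(opCtx, se);
        return true;
    } catch (...) {
        return false;
    }
}

int main() {
    OperationContext opCtx;
    StaleDbRoutingVersion se;
    return refreshDatabase(&opCtx, se) ? 0 : 1;  // exits 1: refresh failed, no retry
}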
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 4347ffa8ac4..324885da92c 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -182,37 +182,6 @@ public:
CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj);
}
- void handleException(const Status& status, OperationContext* opCtx) const override {
- // If we got a stale config, wait in case the operation is stuck in a critical section
- if (auto sce = status.extraInfo<StaleConfigInfo>()) {
- // A config server acting as a router may return a StaleConfig exception, but a config
- // server won't contain data for a sharded collection, so skip handling the exception.
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- return;
- }
-
- if (sce->getCriticalSectionSignal()) {
- // Set migration critical section on operation sharding state: operation will wait
- // for the migration to finish before returning.
- auto& oss = OperationShardingState::get(opCtx);
- oss.setMigrationCriticalSectionSignal(sce->getCriticalSectionSignal());
- }
-
- if (!opCtx->getClient()->isInDirectClient()) {
- // We already have the StaleConfig exception, so just swallow any errors due to
- // refresh
- onShardVersionMismatchNoExcept(opCtx, sce->getNss(), sce->getVersionReceived())
- .ignore();
- }
- } else if (auto sce = status.extraInfo<StaleDbRoutingVersion>()) {
- if (!opCtx->getClient()->isInDirectClient()) {
- onDbVersionMismatchNoExcept(
- opCtx, sce->getDb(), sce->getVersionReceived(), sce->getVersionWanted())
- .ignore();
- }
- }
- }
-
// Called from the error contexts where request may not be available.
void appendReplyMetadataOnError(OperationContext* opCtx,
BSONObjBuilder* metadataBob) const override {
@@ -258,6 +227,18 @@ public:
}
}
+ bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const
+ noexcept override {
+ return onDbVersionMismatchNoExcept(
+ opCtx, se.getDb(), se.getVersionReceived(), se.getVersionWanted())
+ .isOK();
+ }
+
+ bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const
+ noexcept override {
+ return onShardVersionMismatchNoExcept(opCtx, se.getNss(), se.getVersionReceived()).isOK();
+ }
+
void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override {
// Handle config optime information that may have been sent along with the command.
rpc::advanceConfigOpTimeFromRequestMetadata(opCtx);
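
Note that both overrides collapse the helper's Status to isOK(): if the refresh itself
fails, the command is not retried and the original stale-version error, not the refresh
error, is what reaches the client.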
diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp
index 158b55a10e6..efc8926edb0 100644
--- a/src/mongo/embedded/service_entry_point_embedded.cpp
+++ b/src/mongo/embedded/service_entry_point_embedded.cpp
@@ -93,7 +93,15 @@ public:
void attachCurOpErrInfo(OperationContext*, const BSONObj&) const override {}
- void handleException(const Status& status, OperationContext* opCtx) const override {}
+ bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const
+ noexcept override {
+ return false;
+ }
+
+ bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const
+ noexcept override {
+ return false;
+ }
void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override {}