diff options
author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2020-10-29 16:34:03 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-11 18:15:09 +0000 |
commit | 3eabee611446687824c2560ab78dc51e57b91d75 (patch) | |
tree | 1dbac66311e705ffc7dbafabc7a088ccb8038309 | |
parent | ae52fb0d0ccdf33a626404fb1ac8a2ba4ea5d08b (diff) | |
download | mongo-3eabee611446687824c2560ab78dc51e57b91d75.tar.gz |
SERVER-28943 Make shards retry non-write commands on stale version exceptions
10 files changed, 104 insertions, 113 deletions
diff --git a/jstests/sharding/query/agg_shard_targeting.js b/jstests/sharding/query/agg_shard_targeting.js index 59f40a7f284..6ef9cc60cb8 100644 --- a/jstests/sharding/query/agg_shard_targeting.js +++ b/jstests/sharding/query/agg_shard_targeting.js @@ -226,18 +226,6 @@ function runAggShardTargetTest({splitPoint}) { } }); - // - One aggregation on st.shard1.shardName with a shard version exception (indicating that - // the shard was stale). - profilerHasSingleMatchingEntryOrThrow({ - profileDB: shard1DB, - filter: { - "command.aggregate": mongosColl.getName(), - "command.comment": testName, - "command.pipeline.$mergeCursors": {$exists: false}, - errCode: {$in: shardExceptions} - } - }); - // - At most two aggregations on st.shard0.shardName with no stale config exceptions. The // first, if present, is an aborted cursor created if the command reaches // st.shard0.shardName before st.shard1.shardName throws its stale config exception during @@ -325,18 +313,6 @@ function runAggShardTargetTest({splitPoint}) { } }); - // - One aggregation on st.shard0.shardName with a shard version exception (indicating that - // the shard was stale). - profilerHasSingleMatchingEntryOrThrow({ - profileDB: shard0DB, - filter: { - "command.aggregate": mongosColl.getName(), - "command.comment": testName, - "command.pipeline.$mergeCursors": {$exists: false}, - errCode: {$in: shardExceptions} - } - }); - // - At most two aggregations on st.shard0.shardName with no stale config exceptions. 
The // first, if present, is an aborted cursor created if the command reaches // st.shard0.shardName before st.shard1.shardName throws its stale config exception during diff --git a/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js b/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js index f455ec20228..4bfc3e3e7a5 100644 --- a/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js +++ b/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js @@ -161,16 +161,6 @@ for (let testCase of testCases) { assert.eq(aggCmdRes.cursor.firstBatch.length, batchSize); } -// Confirm that the profiler shows a single StaleConfig exception for the source namespace... -profilerHasSingleMatchingEntryOrThrow({ - profileDB: primaryDB, - filter: { - ns: sourceCollection.getFullName(), - errCode: ErrorCodes.StaleConfig, - errMsg: {$regex: `${sourceCollection.getFullName()} is not currently known`} - } -}); - // ... and a single StaleConfig exception for the foreign namespace. Note that the 'ns' field of the // profiler entry is the source collection in both cases, because the $lookup's parent aggregation // produces the profiler entry, and it is always running on the source collection. diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index cef8487c207..0859b10e41a 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -554,20 +554,6 @@ for (let command of commands) { }); // Check that the recipient shard secondary received the request with local read concern - // and also returned stale shardVersion once, even though the mongos is fresh, because - // the secondary was stale. 
- profilerHasSingleMatchingEntryOrThrow({ - profileDB: recipientShardSecondary.getDB(db), - filter: Object.extend({ - "command.shardVersion": {"$exists": true}, - "command.$readPreference": {"mode": "secondary"}, - "command.readConcern": {"level": "local"}, - "errCode": ErrorCodes.StaleConfig - }, - commandProfile) - }); - - // Check that the recipient shard secondary received the request with local read concern // again and finally returned success. profilerHasSingleMatchingEntryOrThrow({ profileDB: recipientShardSecondary.getDB(db), diff --git a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js index f6389079c81..59cb577849e 100644 --- a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js +++ b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js @@ -81,21 +81,6 @@ profilerHasSingleMatchingEntryOrThrow({ } }); -// The recipient shard will then return a stale shard version error because it needs to refresh -// its own routing table. -profilerHasSingleMatchingEntryOrThrow({ - profileDB: recipientShardSecondary.getDB(dbName), - filter: { - "ns": ns, - "command.count": collName, - "command.query": {x: 1}, - "command.shardVersion": {"$exists": true}, - "command.$readPreference": {"mode": "secondary"}, - "command.readConcern.afterClusterTime": {"$exists": true}, - "errCode": ErrorCodes.StaleConfig - } -}); - // Finally, the command is retried on the recipient shard and succeeds. 
profilerHasSingleMatchingEntryOrThrow({ profileDB: recipientShardSecondary.getDB(dbName), diff --git a/jstests/sharding/transactions_stale_shard_version_errors.js b/jstests/sharding/transactions_stale_shard_version_errors.js index 2ff76e94b0c..12debac39ab 100644 --- a/jstests/sharding/transactions_stale_shard_version_errors.js +++ b/jstests/sharding/transactions_stale_shard_version_errors.js @@ -1,10 +1,15 @@ // Tests mongos behavior on stale shard version errors received in a transaction. // -// @tags: [requires_sharding, uses_transactions, uses_multi_shard_transaction] +// @tags: [ +// requires_sharding, +// uses_transactions, +// uses_multi_shard_transaction, +// ] (function() { "use strict"; load("jstests/sharding/libs/sharded_transactions_helpers.js"); +load("jstests/multiVersion/libs/verify_versions.js"); function expectChunks(st, ns, chunks) { for (let i = 0; i < chunks.length; i++) { @@ -208,13 +213,24 @@ session.startTransaction(); assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: -4}]})); // Targets Shard2, which is stale. -res = assert.commandFailedWithCode(sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}), - ErrorCodes.StaleConfig); -assert.eq(res.errorLabels, ["TransientTransactionError"]); - -// The transaction should have been implicitly aborted on all shards. 
-assertNoSuchTransactionOnAllShards(st, session.getSessionId(), session.getTxnNumber_forTesting()); -assert.commandFailedWithCode(session.abortTransaction_forTesting(), ErrorCodes.NoSuchTransaction); +let shard2Version = st.shard2.getBinVersion(); +jsTest.log("Binary version of shard2: " + MongoRunner.getBinVersionFor(shard2Version)); +if (MongoRunner.compareBinVersions(shard2Version, "4.9") < 0) { + // TODO SERVER-52782 remove this if branch when 5.0 becomes last-lts + res = assert.commandFailedWithCode( + sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}), ErrorCodes.StaleConfig); + assert.eq(res.errorLabels, ["TransientTransactionError"]); + + // The transaction should have been implicitly aborted on all shards. + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); +} else { + assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: 7}]})); + + assert.commandWorked(session.abortTransaction_forTesting()); +} // // The final StaleConfig error should be returned if the router exhausts its retries. diff --git a/jstests/sharding/union_with_read_preference.js b/jstests/sharding/union_with_read_preference.js index acb15533300..74f6bf26d79 100644 --- a/jstests/sharding/union_with_read_preference.js +++ b/jstests/sharding/union_with_read_preference.js @@ -54,7 +54,7 @@ assert.eq(mongosColl // Test that the union's sub-pipelines go to the primary. for (let rs of [st.rs0, st.rs1]) { const primaryDB = rs.getPrimary().getDB(dbName); - profilerHasSingleMatchingEntryOrThrow({ + profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: primaryDB, filter: { ns: unionedColl.getFullName(), @@ -79,7 +79,7 @@ assert.eq(mongosColl // Test that the union's sub-pipelines go to the secondary. 
for (let rs of [st.rs0, st.rs1]) { const secondaryDB = rs.getSecondary().getDB(dbName); - profilerHasSingleMatchingEntryOrThrow({ + profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: secondaryDB, filter: { ns: unionedColl.getFullName(), @@ -136,7 +136,7 @@ assert.eq(runAgg(), [{_id: -1, docNum: [0, 2, 4]}, {_id: 1, docNum: [1, 3, 5]}]) for (let rs of [st.rs0, st.rs1]) { jsTestLog(`Testing profile on shard ${rs.getURL()}`); const secondaryDB = rs.getSecondary().getDB(dbName); - profilerHasSingleMatchingEntryOrThrow({ + profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: secondaryDB, filter: { ns: unionedColl.getFullName(), @@ -148,7 +148,7 @@ for (let rs of [st.rs0, st.rs1]) { errCode: {$ne: ErrorCodes.StaleConfig} } }); - profilerHasSingleMatchingEntryOrThrow({ + profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: secondaryDB, filter: { ns: secondTargetColl.getFullName(), diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index ab449e2b9a7..1ce882c43cb 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -666,6 +666,8 @@ private: boost::optional<ResourceConsumption::ScopedMetricsCollector> _scopedMetrics; boost::optional<ImpersonationSessionGuard> _impersonationSessionGuard; std::unique_ptr<PolymorphicScoped> _scoped; + bool _refreshedDatabase = false; + bool _refreshedCollection = false; }; class RunCommandImpl : public std::enable_shared_from_this<RunCommandImpl> { @@ -1556,15 +1558,60 @@ Future<void> ExecCommandDatabase::_initiateCommand() try { rpc::TrackingMetadata::get(opCtx).setIsLogged(true); } - _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request); - _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get()); return Status::OK(); } catch (const DBException& ex) { return ex.toStatus(); } Future<void> ExecCommandDatabase::_commandExec() { - return 
RunCommandImpl::run(shared_from_this()); + auto opCtx = _execContext->getOpCtx(); + auto& request = _execContext->getRequest(); + + _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request); + _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get()); + _execContext->getReplyBuilder()->reset(); + + return RunCommandImpl::run(shared_from_this()) + .onError<ErrorCodes::StaleDbVersion>( + [this, anchor = shared_from_this()](Status s) -> Future<void> { + auto opCtx = _execContext->getOpCtx(); + + if (!opCtx->getClient()->isInDirectClient() && + serverGlobalParams.clusterRole != ClusterRole::ConfigServer && + !_refreshedDatabase) { + auto sce = s.extraInfo<StaleDbRoutingVersion>(); + invariant(sce); + // TODO SERVER-52784 refresh only if wantedVersion is empty or less than + // received + const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce); + if (refreshed) { + _refreshedDatabase = true; + return _commandExec(); + } + } + + return s; + }) + .onErrorCategory<ErrorCategory::StaleShardVersionError>([this, anchor = shared_from_this()]( + Status s) -> Future<void> { + auto opCtx = _execContext->getOpCtx(); + + if (!opCtx->getClient()->isInDirectClient() && + serverGlobalParams.clusterRole != ClusterRole::ConfigServer && + !_refreshedCollection) { + if (auto sce = s.extraInfo<StaleConfigInfo>()) { + // TODO SERVER-52784 refresh only if wantedVersion is empty or less than + // received + const auto refreshed = _execContext->behaviors->refreshCollection(opCtx, *sce); + if (refreshed) { + _refreshedCollection = true; + return _commandExec(); + } + } + } + + return s; + }); } void ExecCommandDatabase::_handleFailure(Status status) { @@ -1578,8 +1625,6 @@ void ExecCommandDatabase::_handleFailure(Status status) { auto replyBuilder = _execContext->getReplyBuilder(); const auto& behaviors = *_execContext->behaviors; - behaviors.handleException(status, opCtx); - // Append the error labels for transient
transaction errors. auto response = _extraFieldsBuilder.asTempObj(); boost::optional<ErrorCodes::Error> wcCode; diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h index dbf95b165a2..d3edbacae12 100644 --- a/src/mongo/db/service_entry_point_common.h +++ b/src/mongo/db/service_entry_point_common.h @@ -84,7 +84,11 @@ struct ServiceEntryPointCommon { virtual void attachCurOpErrInfo(OperationContext* opCtx, const BSONObj& replyObj) const = 0; - virtual void handleException(const Status& status, OperationContext* opCtx) const = 0; + virtual bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const + noexcept = 0; + + virtual bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const + noexcept = 0; virtual void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const = 0; diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 4347ffa8ac4..324885da92c 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -182,37 +182,6 @@ public: CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj); } - void handleException(const Status& status, OperationContext* opCtx) const override { - // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = status.extraInfo<StaleConfigInfo>()) { - // A config server acting as a router may return a StaleConfig exception, but a config - // server won't contain data for a sharded collection, so skip handling the exception. - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return; - } - - if (sce->getCriticalSectionSignal()) { - // Set migration critical section on operation sharding state: operation will wait - // for the migration to finish before returning. 
- auto& oss = OperationShardingState::get(opCtx); - oss.setMigrationCriticalSectionSignal(sce->getCriticalSectionSignal()); - } - - if (!opCtx->getClient()->isInDirectClient()) { - // We already have the StaleConfig exception, so just swallow any errors due to - // refresh - onShardVersionMismatchNoExcept(opCtx, sce->getNss(), sce->getVersionReceived()) - .ignore(); - } - } else if (auto sce = status.extraInfo<StaleDbRoutingVersion>()) { - if (!opCtx->getClient()->isInDirectClient()) { - onDbVersionMismatchNoExcept( - opCtx, sce->getDb(), sce->getVersionReceived(), sce->getVersionWanted()) - .ignore(); - } - } - } - // Called from the error contexts where request may not be available. void appendReplyMetadataOnError(OperationContext* opCtx, BSONObjBuilder* metadataBob) const override { @@ -258,6 +227,18 @@ public: } } + bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const + noexcept override { + return onDbVersionMismatchNoExcept( + opCtx, se.getDb(), se.getVersionReceived(), se.getVersionWanted()) + .isOK(); + } + + bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const + noexcept override { + return onShardVersionMismatchNoExcept(opCtx, se.getNss(), se.getVersionReceived()).isOK(); + } + void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override { // Handle config optime information that may have been sent along with the command. 
rpc::advanceConfigOpTimeFromRequestMetadata(opCtx); diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp index 158b55a10e6..efc8926edb0 100644 --- a/src/mongo/embedded/service_entry_point_embedded.cpp +++ b/src/mongo/embedded/service_entry_point_embedded.cpp @@ -93,7 +93,15 @@ public: void attachCurOpErrInfo(OperationContext*, const BSONObj&) const override {} - void handleException(const Status& status, OperationContext* opCtx) const override {} + bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const + noexcept override { + return false; + } + + bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const + noexcept override { + return false; + } void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override {} |