diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2023-04-10 14:54:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-20 16:07:41 +0000 |
commit | d3baa192c0048b643e9f664ced15a0fe1cd49e08 (patch) | |
tree | 4c56c2d589060dc599886e0a8b6d52b40c4a2bab | |
parent | 44b65708ef058734d9847d41d499efba888e298e (diff) | |
download | mongo-d3baa192c0048b643e9f664ced15a0fe1cd49e08.tar.gz |
SERVER-74380 Make the router loop not invariant on unexpected namespaces r6.0.6-rc0
27 files changed, 219 insertions, 127 deletions
diff --git a/jstests/sharding/mongos_validate_writes.js b/jstests/sharding/mongos_validate_writes.js index 0852dccf763..fc6d4dccab3 100644 --- a/jstests/sharding/mongos_validate_writes.js +++ b/jstests/sharding/mongos_validate_writes.js @@ -22,8 +22,8 @@ st.ensurePrimaryShard(coll.getDB().getName(), st.shard1.shardName); coll.createIndex({a: 1}); // Shard the collection on {a: 1} and move one chunk to another shard. Updates need to be across -// two shards to trigger an error, otherwise they are versioned and will succeed after raising -// a StaleConfigException. +// two shards to trigger an error, otherwise they are versioned and will succeed after raising a +// StaleConfig error. st.shardColl(coll, {a: 1}, {a: 0}, {a: 1}, coll.getDB(), true); // Let the stale mongos see the collection state diff --git a/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js b/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js index c3924654a74..1bc05f59555 100644 --- a/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js +++ b/jstests/sharding/query/lookup_graph_lookup_foreign_becomes_sharded.js @@ -225,14 +225,14 @@ for (let testCase of testCases) { const newStaleConfigErrorCount = assert.commandWorked(primaryDB.runCommand({serverStatus: 1})) .shardingStatistics.countStaleConfigErrors; -// ... and a single StaleConfig exception for the foreign namespace. Note that the 'ns' field of the +// ... and a single StaleConfig error for the foreign namespace. Note that the 'ns' field of the // profiler entry is the source collection in both cases, because the $lookup's parent aggregation // produces the profiler entry, and it is always running on the source collection. // TODO SERVER-60018: When the feature flag is removed, remove the check and ensure the results are // expected. if (!isShardedLookupEnabled) { - // Both in the classic lookup and the SBE lookup, 'StaleConfig' error happens. 
In the classic - // lookup, profiling can properly report the 'StaleConfig' of the foreign collection. + // Both in the classic lookup and the SBE lookup, StaleConfig error happens. In the classic + // lookup, profiling can properly report the StaleConfig of the foreign collection. // // On the other hand, in the SBE lookup, profiling fails to report the error because the // profiling level is not set up properly according to the configured profiling level in case of diff --git a/jstests/sharding/query/lookup_mongod_unaware.js b/jstests/sharding/query/lookup_mongod_unaware.js index 41e5f23047b..35cc9d920cb 100644 --- a/jstests/sharding/query/lookup_mongod_unaware.js +++ b/jstests/sharding/query/lookup_mongod_unaware.js @@ -6,8 +6,9 @@ * We restart a mongod to cause it to forget that a collection was sharded. When restarted, we * expect it to still have all the previous data. * + * // TODO (SERVER-74380): Remove requires_fcv_70 once SERVER-74380 has been backported to v6.0 * @tags: [ - * requires_persistence + * requires_persistence, * ] * */ @@ -18,7 +19,7 @@ load("jstests/noPassthrough/libs/server_parameter_helpers.js"); // For setParam load("jstests/libs/discover_topology.js"); // For findDataBearingNodes. // Restarts the primary shard and ensures that it believes both collections are unsharded. -function restartPrimaryShard(rs, localColl, foreignColl) { +function restartPrimaryShard(rs, ...expectedCollections) { // Returns true if the shard is aware that the collection is sharded. 
function hasRoutingInfoForNs(shardConn, coll) { const res = shardConn.adminCommand({getShardVersion: coll, fullMetadata: true}); @@ -28,8 +29,11 @@ function restartPrimaryShard(rs, localColl, foreignColl) { rs.restart(0); rs.awaitSecondaryNodes(); - assert(!hasRoutingInfoForNs(rs.getPrimary(), localColl.getFullName())); - assert(!hasRoutingInfoForNs(rs.getPrimary(), foreignColl.getFullName())); + + expectedCollections.forEach(function(coll) { + assert(!hasRoutingInfoForNs(rs.getPrimary(), coll.getFullName()), + 'Shard role not cleared for ' + coll.getFullName()); + }); } // Disable checking for index consistency to ensure that the config server doesn't trigger a @@ -170,5 +174,55 @@ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); +// +// Test two-level $lookup with a stale shard handles the shard role recovery case. +// +jsTest.log("Running two-level $lookup with a shard that needs recovery"); + +assert.commandWorked(st.s0.adminCommand({enableSharding: 'D'})); +st.ensurePrimaryShard('D', st.shard0.shardName); + +const D = st.s0.getDB('D'); + +assert.commandWorked(D.A.insert({Key: 1, Value: 1})); +assert.commandWorked(D.B.insert({Key: 1, Value: 1})); +assert.commandWorked(D.C.insert({Key: 1, Value: 1})); +assert.commandWorked(D.D.insert({Key: 1, Value: 1})); + +const aggPipeline = [{ + $lookup: { + from: 'B', + localField: 'Key', + foreignField: 'Value', + as: 'Joined', + pipeline: [ + { + $lookup: { + from: 'C', + localField: 'Key', + foreignField: 'Value', + as: 'Joined', + } + }, + { + $lookup: { + from: 'D', + localField: 'Key', + foreignField: 'Value', + as: 'Joined', + } + }, + ], + } +}]; + +const resultBefore = D.A.aggregate(aggPipeline).toArray(); + +// Restarting the shard primary in order for the shard role's cache to be cleared +restartPrimaryShard(st.rs0, D.A, D.B, D.C, D.D); + +const 
resultAfter = D.A.aggregate(aggPipeline).toArray(); +assert.eq(resultBefore, resultAfter, "Before and after results do not match"); + st.stop(); })(); diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index 8c75b5cfaa8..3ec877b7d4c 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -168,7 +168,7 @@ BSONObj CommandHelpers::runCommandDirectly(OperationContext* opCtx, const OpMsgR invocation->run(opCtx, &replyBuilder); auto body = replyBuilder.getBodyBuilder(); CommandHelpers::extractOrAppendOk(body); - } catch (const StaleConfigException&) { + } catch (const ExceptionFor<ErrorCodes::StaleConfig>&) { // These exceptions are intended to be handled at a higher level. throw; } catch (const DBException& ex) { diff --git a/src/mongo/db/commands/dbcommands_d.cpp b/src/mongo/db/commands/dbcommands_d.cpp index 86f88497d88..339d442bcaf 100644 --- a/src/mongo/db/commands/dbcommands_d.cpp +++ b/src/mongo/db/commands/dbcommands_d.cpp @@ -382,7 +382,7 @@ public: try { // RELOCKED ctx.reset(new AutoGetCollectionForReadCommand(opCtx, nss)); - } catch (const StaleConfigException&) { + } catch (const ExceptionFor<ErrorCodes::StaleConfig>&) { LOGV2_DEBUG( 20453, 1, diff --git a/src/mongo/db/exec/write_stage_common.h b/src/mongo/db/exec/write_stage_common.h index 5628822efff..489afa62f2f 100644 --- a/src/mongo/db/exec/write_stage_common.h +++ b/src/mongo/db/exec/write_stage_common.h @@ -29,8 +29,6 @@ #pragma once -#include "mongo/platform/basic.h" - #include "mongo/db/exec/shard_filterer.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/namespace_string.h" diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp index 41b8f731ecc..ac8924a13ed 100644 --- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -30,7 +30,7 @@ #include "mongo/db/pipeline/aggregation_request_helper.h" #include 
"mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/s/query/sharded_agg_test_fixture.h" -#include "mongo/s/router.h" +#include "mongo/s/router_role.h" namespace mongo { namespace { diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 69b5a111e2b..2c37e108c82 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" #include <fmt/format.h> @@ -51,7 +49,7 @@ #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/cluster_write.h" #include "mongo/s/query/document_source_merge_cursors.h" -#include "mongo/s/router.h" +#include "mongo/s/router_role.h" #include "mongo/s/stale_shard_version_helpers.h" namespace mongo { @@ -76,14 +74,11 @@ void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( nss, targetCollectionVersion, shardId); - const auto routingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(expCtx->opCtx, nss)); - - const auto foundVersion = - routingInfo.isSharded() ? routingInfo.getVersion() : ChunkVersion::UNSHARDED(); + const auto cm = uassertStatusOK(catalogCache->getCollectionRoutingInfo(expCtx->opCtx, nss)); + auto foundVersion = cm.isSharded() ? 
cm.getVersion() : ChunkVersion::UNSHARDED(); - uassert(StaleEpochInfo(nss), - str::stream() << "Could not act as router for " << nss.ns() << ", wanted " + uassert(StaleEpochInfo(nss, targetCollectionVersion, foundVersion), + str::stream() << "Could not act as router for " << nss.ns() << ", received " << targetCollectionVersion.toString() << ", but found " << foundVersion.toString(), foundVersion.isSameCollection(targetCollectionVersion)); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 157de75a2b7..42e8ae24d32 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -61,7 +61,7 @@ #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" -#include "mongo/s/router.h" +#include "mongo/s/router_role.h" #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" @@ -1177,7 +1177,7 @@ DispatchShardPipelineResults dispatchShardPipeline( targetedCommand, ReadPreferenceSetting::get(opCtx)); - } catch (const StaleConfigException& e) { + } catch (const ExceptionFor<ErrorCodes::StaleConfig>& e) { // Check to see if the command failed because of a stale shard version or something // else. auto staleInfo = e.extraInfo<StaleConfigInfo>(); diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 575ccb85f2f..7684847cfbc 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -98,7 +98,7 @@ public: /** * If the shard currently doesn't know whether the collection is sharded or not, it will throw a - * StaleConfig exception. + * StaleConfig error. * * If the request doesn't have a shard version all collections will be treated as UNSHARDED. 
* @@ -112,7 +112,7 @@ public: * * If the shard currently doesn't know whether the collection is sharded or not, or if the * expected shard version doesn't match with the one in the OperationShardingState, it will - * throw a StaleConfig exception. + * throw a StaleConfig error. * * If the operation context contains an 'atClusterTime', the returned filtering object will be * tied to a specific point in time. Otherwise, it will reference the latest cluster time @@ -143,7 +143,7 @@ public: /** * Checks whether the shard version in the operation context is compatible with the shard - * version of the collection and if not, throws StaleConfigException populated with the received + * version of the collection and if not, throws StaleConfig error populated with the received * and wanted versions. * * If the request is not versioned all collections will be treated as UNSHARDED. diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index d93c701f3d5..a75e68c4474 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -97,8 +97,8 @@ public: * * May throw any exception. Known exceptions are: * - InvalidOptions if the operation context is missing shard version - * - StaleConfigException if the expected collection version does not match what we find it - * to be after acquiring the distributed lock. + * - StaleConfig if the expected collection version does not match what we find it to be after + * acquiring the distributed lock. */ MigrationSourceManager(OperationContext* opCtx, ShardsvrMoveRange&& request, diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h index 45bc26a6b7a..42586f9d102 100644 --- a/src/mongo/db/s/move_primary_source_manager.h +++ b/src/mongo/db/s/move_primary_source_manager.h @@ -70,10 +70,10 @@ public: * Instantiates a new movePrimary source manager. 
Must be called with the distributed lock * acquired in advance (not asserted). * - * May throw any exception. Known exceptions (TODO) are: + * May throw any exception. Known exceptions are: * - InvalidOptions if the operation context is missing database version - * - StaleConfigException if the expected database version does not match what we find it - * to be after acquiring the distributed lock. + * - StaleConfig if the expected database version does not match what we find it to be after + * acquiring the distributed lock */ MovePrimarySourceManager(OperationContext* opCtx, diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 88ac85eed34..e51d2e2ed1f 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding -#include "mongo/platform/basic.h" - #include "mongo/db/s/resharding/resharding_collection_cloner.h" #include <utility> @@ -276,7 +274,7 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p // ReshardingOpObserver depends on the collection metadata being known when processing writes to // the temporary resharding collection. We attach shard version IGNORED to the insert operations - // and retry once on a StaleConfig exception to allow the collection metadata information to be + // and retry once on a StaleConfig error to allow the collection metadata information to be // recovered. 
ScopedSetShardRole scopedSetShardRole(opCtx, _outputNss, diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h index 0858fa2d0e4..b51cfc250a5 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.h +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h @@ -149,8 +149,8 @@ void updateSessionRecord(OperationContext* opCtx, /** * Calls and returns the value from the supplied lambda function. * - * If a StaleConfig exception is thrown during its execution, then this function will attempt to - * refresh the collection and invoke the supplied lambda function a second time. + * If a StaleConfig error is thrown during its execution, then this function will attempt to refresh + * the collection and invoke the supplied lambda function a second time. */ template <typename Callable> auto withOneStaleConfigRetry(OperationContext* opCtx, Callable&& callable) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index b774095616b..42c5a1543e5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding -#include "mongo/platform/basic.h" - #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/concurrency/exception_util.h" @@ -79,8 +77,8 @@ void runWithTransaction(OperationContext* opCtx, // ReshardingOpObserver depends on the collection metadata being known when processing writes to // the temporary resharding collection. We attach shard version IGNORED to the write operations - // and leave it to ReshardingOplogBatchApplier::applyBatch() to retry on a StaleConfig exception - // to allow the collection metadata information to be recovered. 
+ // and leave it to ReshardingOplogBatchApplier::applyBatch() to retry on a StaleConfig error to + // allow the collection metadata information to be recovered. ScopedSetShardRole scopedSetShardRole(asr.opCtx(), nss, ChunkVersion::IGNORED() /* shardVersion */, diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp index 4ff29b42d30..c7efa6d33e4 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp @@ -29,12 +29,8 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding -#include "mongo/platform/basic.h" - #include "mongo/db/s/resharding/resharding_oplog_batch_applier.h" -#include <memory> - #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_future_util.h" @@ -83,8 +79,8 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch( // ReshardingOpObserver depends on the collection metadata being known // when processing writes to the temporary resharding collection. We // attach shard version IGNORED to the write operations and retry once - // on a StaleConfig exception to allow the collection metadata - // information to be recovered. + // on a StaleConfig error to allow the collection metadata information to + // be recovered. 
ScopedSetShardRole scopedSetShardRole( opCtx.get(), _crudApplication.getOutputNss(), diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h index 0a714ee2b61..fd3a8b8ca6d 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.h +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h @@ -39,11 +39,12 @@ namespace mongo { class OperationContext; /** - * Must be invoked whenever code, which is executing on a shard encounters a StaleConfig exception - * and should be passed the 'version received' from the exception. If the shard's current version is - * behind 'shardVersionReceived', causes the shard's filtering metadata to be refreshed from the - * config server, otherwise does nothing and immediately returns. If there are other threads - * currently performing refresh, blocks so that only one of them hits the config server. + * Must be invoked whenever code, which is executing on a shard encounters a StaleConfig error and + * should be passed the placement version from the 'version received' in the exception. If the + * shard's current placement version is behind 'chunkVersionReceived', causes the shard's filtering + * metadata to be refreshed from the config server, otherwise does nothing and immediately returns. + * If there are other threads currently performing refresh, blocks so that only one of them hits the + * config server. * * If refresh fails for any reason (most commonly ExceededTimeLimit), returns a failed status. 
* diff --git a/src/mongo/rpc/get_status_from_command_result.cpp b/src/mongo/rpc/get_status_from_command_result.cpp index 2607ff15e3c..315d4ef3135 100644 --- a/src/mongo/rpc/get_status_from_command_result.cpp +++ b/src/mongo/rpc/get_status_from_command_result.cpp @@ -50,7 +50,7 @@ Status getStatusFromCommandResult(const BSONObj& result) { BSONElement codeElement = result["code"]; BSONElement errmsgElement = result["errmsg"]; - // StaleConfigException doesn't pass "ok" in legacy servers + // StaleConfig doesn't pass "ok" in legacy servers BSONElement dollarErrElement = result["$err"]; if (okElement.eoo() && dollarErrElement.eoo()) { diff --git a/src/mongo/rpc/legacy_reply_builder.h b/src/mongo/rpc/legacy_reply_builder.h index 45e986ab9ed..22a9c4566f6 100644 --- a/src/mongo/rpc/legacy_reply_builder.h +++ b/src/mongo/rpc/legacy_reply_builder.h @@ -49,7 +49,7 @@ public: LegacyReplyBuilder(Message&&); ~LegacyReplyBuilder() final; - // Override of setCommandReply specifically used to handle StaleConfigException. 
+ // Override of setCommandReply specifically used to handle StaleConfig errors LegacyReplyBuilder& setCommandReply(Status nonOKStatus, BSONObj extraErrorInfo) final; LegacyReplyBuilder& setRawCommandReply(const BSONObj& commandReply) final; diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 452b1c12b47..4ab4c1946ec 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -49,13 +49,13 @@ env.Library( 'cluster_commands_helpers.cpp', 'collection_uuid_mismatch.cpp', 'multi_statement_transaction_requests_sender.cpp', + 'router_role.cpp', 'router_transactions_metrics.cpp', 'router_transactions_stats.idl', - 'router.cpp', 'session_catalog_router.cpp', 'stale_shard_version_helpers.cpp', - 'transaction_router_resource_yielder.cpp', 'transaction_router.cpp', + 'transaction_router_resource_yielder.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/txn_cmd_request', diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 6d4a423332a..086c00b03f8 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -86,7 +86,8 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTarg * Dispatches all the specified requests in parallel and waits until all complete, returning a * vector of the same size and positions as that of 'requests'. * - * Throws StaleConfigException if any remote returns a stale shardVersion error. + * Throws StaleConfig if any of the remotes returns that error, regardless of what the other errors + * are. */ std::vector<AsyncRequestsSender::Response> gatherResponses( OperationContext* opCtx, @@ -166,7 +167,7 @@ std::vector<AsyncRequestsSender::Response> scatterGatherUnversionedTargetAllShar * Utility for dispatching versioned commands on a namespace, deciding which shards to * target by applying the passed-in query and collation to the local routing table cache. * - * Does not retry on StaleConfigException. 
+ * Does not retry on StaleConfig errors. */ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRoutingTable( OperationContext* opCtx, @@ -180,6 +181,7 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting const BSONObj& collation, const boost::optional<BSONObj>& letParameters, const boost::optional<LegacyRuntimeConstants>& runtimeConstants); + /** * This overload is for callers which already have a fully initialized 'ExpressionContext' (e.g. * callers from the aggregation framework). Most callers should prefer the overload above. @@ -201,7 +203,7 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting * * Callers can specify shards to skip, even if these shards would be otherwise targeted. * - * Allows StaleConfigException errors to append to the response list. + * Allows StaleConfig errors to append to the response list. */ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( @@ -234,7 +236,7 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( * Utility for dispatching commands against the shard with the MinKey chunk for the namespace and * attaching the appropriate shard version. * - * Does not retry on StaleConfigException. + * Does not retry on StaleConfig errors. */ AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h index 1e04130f657..73916229f7f 100644 --- a/src/mongo/s/commands/strategy.h +++ b/src/mongo/s/commands/strategy.h @@ -41,8 +41,8 @@ public: /** * Executes a command from either OP_QUERY or OP_MSG wire protocols. * - * Catches StaleConfigException errors and retries the command automatically after refreshing - * the metadata for the failing namespace. 
+ * Catches StaleConfig errors and retries the command automatically after refreshing the + * metadata for the failing namespace. */ static Future<DbResponse> clientCommand(std::shared_ptr<RequestExecutionContext> rec); }; diff --git a/src/mongo/s/router.cpp b/src/mongo/s/router_role.cpp index dba40ad1137..96ee6f2228d 100644 --- a/src/mongo/s/router.cpp +++ b/src/mongo/s/router_role.cpp @@ -29,7 +29,7 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding -#include "mongo/s/router.h" +#include "mongo/s/router_role.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" @@ -69,32 +69,34 @@ CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) con } void DBPrimaryRouter::_onException(RouteContext* context, Status s) { - if (++context->numAttempts > kMaxNumStaleVersionRetries) { - uassertStatusOKWithContext( - s, - str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries - << " retries attempting \'" << context->comment << "\'"); - } else { - LOGV2_DEBUG(637590, - 3, - "Retrying {description}. Got error: {status}", - "description"_attr = context->comment, - "status"_attr = s); - } - auto catalogCache = Grid::get(_service)->catalogCache(); if (s == ErrorCodes::StaleDbVersion) { auto si = s.extraInfo<StaleDbRoutingVersion>(); - invariant(si); - invariant(si->getDb() == _db, - str::stream() << "StaleDbVersion on unexpected database. Expected " << _db - << ", received " << si->getDb()); + tassert(6375900, "StaleDbVersion must have extraInfo", si); + tassert(6375901, + str::stream() << "StaleDbVersion on unexpected database. 
Expected " << _db + << ", received " << si->getDb(), + si->getDb() == _db); catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); } else { uassertStatusOK(s); } + + if (++context->numAttempts > kMaxNumStaleVersionRetries) { + uassertStatusOKWithContext( + s, + str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries + << " retries attempting \'" << context->comment << "\'"); + } else { + LOGV2_DEBUG(6375902, + 3, + "Retrying database primary routing operation", + "attempt"_attr = context->numAttempts, + "comment"_attr = context->comment, + "status"_attr = s); + } } CollectionRouter::CollectionRouter(ServiceContext* service, NamespaceString nss) @@ -122,42 +124,39 @@ ChunkManager CollectionRouter::_getRoutingInfo(OperationContext* opCtx) const { } void CollectionRouter::_onException(RouteContext* context, Status s) { + auto catalogCache = Grid::get(_service)->catalogCache(); + + if (s == ErrorCodes::StaleDbVersion) { + auto si = s.extraInfo<StaleDbRoutingVersion>(); + tassert(6375903, "StaleDbVersion must have extraInfo", si); + catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); + } else if (s == ErrorCodes::StaleConfig) { + auto si = s.extraInfo<StaleConfigInfo>(); + tassert(6375904, "StaleConfig must have extraInfo", si); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + si->getNss(), si->getVersionWanted(), si->getShardId()); + } else if (s == ErrorCodes::StaleEpoch) { + auto si = s.extraInfo<StaleEpochInfo>(); + tassert(6375905, "StaleEpoch must have extra info", si); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + si->getNss(), si->getVersionWanted(), ShardId()); + } else { + uassertStatusOK(s); + } + if (++context->numAttempts > kMaxNumStaleVersionRetries) { uassertStatusOKWithContext( s, str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries << " retries attempting \'" << context->comment << "\'"); } else { - 
LOGV2_DEBUG(637591, + LOGV2_DEBUG(6375906, 3, - "Retrying {description}. Got error: {status}", - "description"_attr = context->comment, + "Retrying collection routing operation", + "attempt"_attr = context->numAttempts, + "comment"_attr = context->comment, "status"_attr = s); } - - auto catalogCache = Grid::get(_service)->catalogCache(); - - if (s.isA<ErrorCategory::StaleShardVersionError>()) { - if (auto si = s.extraInfo<StaleConfigInfo>()) { - invariant(si->getNss() == _nss, - str::stream() << "StaleConfig on unexpected namespace. Expected " << _nss - << ", received " << si->getNss()); - catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( - _nss, si->getVersionWanted(), si->getShardId()); - } else { - catalogCache->invalidateCollectionEntry_LINEARIZABLE(_nss); - } - } else if (s == ErrorCodes::StaleDbVersion) { - auto si = s.extraInfo<StaleDbRoutingVersion>(); - invariant(si); - invariant(si->getDb() == _nss.db(), - str::stream() << "StaleDbVersion on unexpected database. 
Expected " << _nss.db() - << ", received " << si->getDb()); - - catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); - } else { - uassertStatusOK(s); - } } } // namespace router diff --git a/src/mongo/s/router.h b/src/mongo/s/router_role.h index 6cfd1d9e546..6cfd1d9e546 100644 --- a/src/mongo/s/router.h +++ b/src/mongo/s/router_role.h diff --git a/src/mongo/s/stale_exception.cpp b/src/mongo/s/stale_exception.cpp index ac026a45ea2..a04c6c27937 100644 --- a/src/mongo/s/stale_exception.cpp +++ b/src/mongo/s/stale_exception.cpp @@ -39,6 +39,18 @@ MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(StaleConfigInfo); MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(StaleEpochInfo); MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(StaleDbRoutingVersion); +boost::optional<ChunkVersion> extractOptionalChunkVersion(const BSONObj& obj, StringData field) { + try { + return ChunkVersion::fromBSONLegacyOrNewerFormat(obj, field); + } catch (const DBException& ex) { + auto status = ex.toStatus(); + if (status != ErrorCodes::NoSuchKey) { + throw; + } + } + return boost::none; +} + } // namespace void StaleConfigInfo::serialize(BSONObjBuilder* bob) const { @@ -56,31 +68,35 @@ std::shared_ptr<const ErrorExtraInfo> StaleConfigInfo::parse(const BSONObj& obj) const auto shardId = obj["shardId"].String(); uassert(ErrorCodes::NoSuchKey, "The shardId field is missing", !shardId.empty()); - auto extractOptionalChunkVersion = [&obj](StringData field) -> boost::optional<ChunkVersion> { - try { - return ChunkVersion::fromBSONLegacyOrNewerFormat(obj, field); - } catch (const DBException& ex) { - auto status = ex.toStatus(); - if (status != ErrorCodes::NoSuchKey) { - throw; - } - } - return boost::none; - }; - return std::make_shared<StaleConfigInfo>( NamespaceString(obj["ns"].String()), ChunkVersion::fromBSONLegacyOrNewerFormat(obj, "vReceived"), - extractOptionalChunkVersion("vWanted"), + extractOptionalChunkVersion(obj, "vWanted"), ShardId(shardId)); } void StaleEpochInfo::serialize(BSONObjBuilder* bob) const 
{ bob->append("ns", _nss.ns()); + if (_received) + _received->appendLegacyWithField(bob, "vReceived"); + if (_wanted) + _wanted->appendLegacyWithField(bob, "vWanted"); } std::shared_ptr<const ErrorExtraInfo> StaleEpochInfo::parse(const BSONObj& obj) { - return std::make_shared<StaleEpochInfo>(NamespaceString(obj["ns"].String())); + auto received = extractOptionalChunkVersion(obj, "vReceived"); + auto wanted = extractOptionalChunkVersion(obj, "vWanted"); + + uassert(6375907, + str::stream() << "Either both vReceived (" << received << ")" + << " and vWanted (" << wanted << ") must be present or none", + received.is_initialized() == wanted.is_initialized()); + + if (received) + return std::make_shared<StaleEpochInfo>( + NamespaceString(obj["ns"].String()), *received, *wanted); + else + return std::make_shared<StaleEpochInfo>(NamespaceString(obj["ns"].String())); } void StaleDbRoutingVersion::serialize(BSONObjBuilder* bob) const { diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index d82b18bab52..1778fe689a4 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -75,7 +75,7 @@ public: void serialize(BSONObjBuilder* bob) const; static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj& obj); -protected: +private: NamespaceString _nss; ChunkVersion _received; boost::optional<ChunkVersion> _wanted; @@ -85,24 +85,41 @@ protected: boost::optional<SharedSemiFuture<void>> _criticalSectionSignal; }; +// TODO (SERVER-74380): Rename the StaleEpoch code to StaleDownstreamRouter and the info to +// StaleDownstreamRouterInfo class StaleEpochInfo final : public ErrorExtraInfo { public: static constexpr auto code = ErrorCodes::StaleEpoch; + StaleEpochInfo(NamespaceString nss, ChunkVersion received, ChunkVersion wanted) + : _nss(std::move(nss)), _received(received), _wanted(wanted) {} + + // TODO (SERVER-74380): Remove this constructor StaleEpochInfo(NamespaceString nss) : _nss(std::move(nss)) {} const auto& getNss() const { 
return _nss; } + const auto& getVersionReceived() const { + return _received; + } + + const auto& getVersionWanted() const { + return _wanted; + } + void serialize(BSONObjBuilder* bob) const; static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj& obj); private: NamespaceString _nss; -}; -using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>; + // TODO (SERVER-74380): These two fields are boost::optional for backwards compatibility. Either + // both of them are boost::none or both are set. + boost::optional<ChunkVersion> _received; + boost::optional<ChunkVersion> _wanted; +}; class StaleDbRoutingVersion final : public ErrorExtraInfo { public: diff --git a/src/mongo/s/stale_exception_test.cpp b/src/mongo/s/stale_exception_test.cpp index 57cb2b89062..98a2b897af5 100644 --- a/src/mongo/s/stale_exception_test.cpp +++ b/src/mongo/s/stale_exception_test.cpp @@ -55,7 +55,7 @@ TEST(StaleExceptionTest, StaleConfigInfoSerializationTest) { ASSERT_EQUALS(deserializedInfo->getShardId(), kShardId); } -TEST(StaleExceptionTest, StaleEpochInfoSerializationTest) { +TEST(StaleExceptionTest, StaleEpochInfoLegacySerializationTest) { StaleEpochInfo info(kNss); // Serialize @@ -67,6 +67,24 @@ TEST(StaleExceptionTest, StaleEpochInfoSerializationTest) { std::static_pointer_cast<const StaleEpochInfo>(StaleEpochInfo::parse(bob.obj())); ASSERT_EQUALS(deserializedInfo->getNss(), kNss); + ASSERT(!deserializedInfo->getVersionReceived()); + ASSERT(!deserializedInfo->getVersionWanted()); +} + +TEST(StaleExceptionTest, StaleEpochInfoSerializationTest) { + StaleEpochInfo info(kNss, ChunkVersion::UNSHARDED(), ChunkVersion::UNSHARDED()); + + // Serialize + BSONObjBuilder bob; + info.serialize(&bob); + + // Deserialize + auto deserializedInfo = + std::static_pointer_cast<const StaleEpochInfo>(StaleEpochInfo::parse(bob.obj())); + + ASSERT_EQ(deserializedInfo->getNss(), kNss); + ASSERT_EQ(*deserializedInfo->getVersionReceived(), ChunkVersion::UNSHARDED()); + 
ASSERT_EQ(*deserializedInfo->getVersionWanted(), ChunkVersion::UNSHARDED()); } } // namespace |