diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-03-15 16:28:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-15 17:30:57 +0000 |
commit | 83cbc410d85ff65a38abb2b6e60ad9af3107f455 (patch) | |
tree | bd76768f11778e8f827663dc6e9a6a5561c3fd8b /src/mongo | |
parent | c51987df9a8b60a98e9f91ff0328208df60427f8 (diff) | |
download | mongo-83cbc410d85ff65a38abb2b6e60ad9af3107f455.tar.gz |
SERVER-64459 Use ScopedSetShardRole in ShardvrProcessInterface
Diffstat (limited to 'src/mongo')
12 files changed, 71 insertions, 157 deletions
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index e77137c922c..8634d4d86bd 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -229,12 +229,17 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { long long depth = 0; bool shouldPerformAnotherQuery; do { + std::unique_ptr<MongoProcessInterface::ScopedExpectUnshardedCollection> + expectUnshardedCollectionInScope; + const auto allowForeignSharded = foreignShardedGraphLookupAllowed(); if (!allowForeignSharded) { // Enforce that the foreign collection must be unsharded for $graphLookup. - _fromExpCtx->mongoProcessInterface->setExpectedShardVersion( - _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED()); + expectUnshardedCollectionInScope = + _fromExpCtx->mongoProcessInterface->expectUnshardedCollectionInScope( + _fromExpCtx->opCtx, _fromExpCtx->ns, boost::none); } + shouldPerformAnotherQuery = false; // Check whether each key in the frontier exists in the cache or needs to be queried. diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 95fd6cfc291..718c823e269 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -28,13 +28,8 @@ */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery -#include "mongo/db/commands/feature_compatibility_version_parser.h" -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/document_source_lookup.h" -#include <memory> - #include "mongo/base/init.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/value.h" @@ -56,10 +51,6 @@ #include "mongo/util/fail_point.h" namespace mongo { - -using boost::intrusive_ptr; -using std::vector; - namespace { /** @@ -287,7 +278,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceLookUp::clone() const { return make_intrusive<DocumentSourceLookUp>(*this); } -void validateLookupCollectionlessPipeline(const vector<BSONObj>& pipeline) { +void validateLookupCollectionlessPipeline(const std::vector<BSONObj>& pipeline) { uassert(ErrorCodes::FailedToParse, "$lookup stage without explicit collection must have a pipeline with $documents as " "first stage", @@ -552,11 +543,15 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( // Resolve the 'let' variables to values per the given input document. resolveLetVariables(inputDoc, &_fromExpCtx->variables); + std::unique_ptr<MongoProcessInterface::ScopedExpectUnshardedCollection> + expectUnshardedCollectionInScope; + const auto allowForeignShardedColl = foreignShardedLookupAllowed(); if (!allowForeignShardedColl) { // Enforce that the foreign collection must be unsharded for lookup. - _fromExpCtx->mongoProcessInterface->setExpectedShardVersion( - _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED()); + expectUnshardedCollectionInScope = + _fromExpCtx->mongoProcessInterface->expectUnshardedCollectionInScope( + _fromExpCtx->opCtx, _fromExpCtx->ns, boost::none); } // If we don't have a cache, build and return the pipeline immediately. @@ -1169,7 +1164,7 @@ void DocumentSourceLookUp::reattachToOperationContext(OperationContext* opCtx) { } } -intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( +boost::intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(ErrorCodes::FailedToParse, "the $lookup specification must be an Object", @@ -1244,7 +1239,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( } uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty()); - intrusive_ptr<DocumentSourceLookUp> lookupStage = nullptr; + boost::intrusive_ptr<DocumentSourceLookUp> lookupStage = nullptr; if (hasPipeline) { if (localField.empty() && foreignField.empty()) { // $lookup specified with only pipeline syntax. diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index c8d74ddd550..c327385db78 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h" #include <algorithm> @@ -60,7 +58,6 @@ #include "mongo/db/query/collection_query_info.h" #include "mongo/db/query/sbe_plan_cache.h" #include "mongo/db/repl/primary_only_service.h" -#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/transaction_coordinator_curop.h" #include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" @@ -77,7 +74,6 @@ #include "mongo/s/query/document_source_merge_cursors.h" namespace mongo { - namespace { class MongoDResourceYielder : public ResourceYielder { diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 218f222a771..4bf20c07c68 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -434,32 +434,16 @@ public: ChunkVersion targetCollectionVersion) const = 0; /** - * Sets the expected shard version for the given namespace. Invariants if the caller attempts to - * change an existing shard version, or if the shard version for this namespace has already been - * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the - * constraint that the foreign collection must be unsharded if featureFlagShardedLookup is - * turned off. Also used to enforce that the catalog cache is up-to-date when doing a local - * read. + * Used to enforce the constraint that the foreign collection must be unsharded. */ - virtual void setExpectedShardVersion(OperationContext* opCtx, - const NamespaceString& nss, - boost::optional<ChunkVersion> chunkVersion) = 0; - - /** - * Sets the expected db version for the given namespace. Return true if the db version is set by - * this method. Throws an IllegalOperation if the caller attempts to change an existing db - * version. If the parent operation is unversioned, does nothing and returns false. Used to - * enforce that the catalog cache is up-to-date when doing a local read. - */ - virtual bool setExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss, - DatabaseVersion dbVersion) = 0; - - /** - * Unsets the expected db version for the given namespace. Used as a pair with - * 'setExpectedDbVersion()' to reset state after a local read is attempted. - */ - virtual void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) = 0; + class ScopedExpectUnshardedCollection { + public: + virtual ~ScopedExpectUnshardedCollection() = default; + }; + virtual std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<DatabaseVersion>& dbVersion) = 0; /** * Checks if this process is on the primary shard for db specified by the given namespace. diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 055a9f1533b..c467e616078 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -214,19 +214,10 @@ public: MONGO_UNREACHABLE; } - void setExpectedShardVersion(OperationContext* opCtx, - const NamespaceString& nss, - boost::optional<ChunkVersion> chunkVersion) override { - MONGO_UNREACHABLE; - } - - bool setExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss, - DatabaseVersion dbVersion) override { - MONGO_UNREACHABLE; - } - - void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override { + std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<DatabaseVersion>& dbVersion) override { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index b2b4f731f4e..ccbe90205c9 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -117,21 +117,16 @@ public: const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) override; - void setExpectedShardVersion(OperationContext* opCtx, - const NamespaceString& nss, - boost::optional<ChunkVersion> chunkVersion) override { - // Do nothing on a non-shardsvr mongoD. - } - - bool setExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss, - DatabaseVersion dbVersion) override { - // Do nothing on a non-shardsvr mongoD. - return false; - } + std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<DatabaseVersion>& dbVersion) override { + class ScopedExpectUnshardedCollectionNoop : public ScopedExpectUnshardedCollection { + public: + ScopedExpectUnshardedCollectionNoop() = default; + }; - void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override { - // Do nothing on a non-shardsvr mongoD. + return std::make_unique<ScopedExpectUnshardedCollectionNoop>(); } void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override { diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 204806d839a..69b5a111e2b 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -384,43 +384,28 @@ ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipelin ownedPipeline, shardTargetingPolicy, std::move(readConcern)); } -void ShardServerProcessInterface::unsetExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss) { - auto& oss = OperationShardingState::get(opCtx); - oss.unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(nss.db()); -} - -bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss, - DatabaseVersion dbVersion) { - auto& oss = OperationShardingState::get(opCtx); - - if (auto knownDBVersion = oss.getDbVersion(nss.db())) { - uassert(ErrorCodes::IllegalOperation, - "Expected db version must match known db version", - knownDBVersion == dbVersion); - } else { - OperationShardingState::setShardRole(opCtx, nss, boost::none /* shardVersion */, dbVersion); - } - - return false; -} - -void ShardServerProcessInterface::setExpectedShardVersion( +std::unique_ptr<MongoProcessInterface::ScopedExpectUnshardedCollection> +ShardServerProcessInterface::expectUnshardedCollectionInScope( OperationContext* opCtx, const NamespaceString& nss, - boost::optional<ChunkVersion> chunkVersion) { - auto& oss = OperationShardingState::get(opCtx); - if (oss.hasShardVersion(nss)) { - invariant(oss.getShardVersion(nss) == chunkVersion); - } else { - OperationShardingState::setShardRole( - opCtx, nss, chunkVersion, boost::none /* databaseVersion */); - } + const boost::optional<DatabaseVersion>& dbVersion) { + class ScopedExpectUnshardedCollectionImpl : public ScopedExpectUnshardedCollection { + public: + ScopedExpectUnshardedCollectionImpl(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<DatabaseVersion>& dbVersion) + : _expectUnsharded(opCtx, nss, ChunkVersion::UNSHARDED(), dbVersion) {} + + private: + ScopedSetShardRole _expectUnsharded; + }; + + return std::make_unique<ScopedExpectUnshardedCollectionImpl>(opCtx, nss, dbVersion); } void ShardServerProcessInterface::checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) { DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db()); } + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index d16ae72c444..f6026f6ef3a 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -128,15 +128,10 @@ public: ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, boost::optional<BSONObj> readConcern = boost::none) final; - void setExpectedShardVersion(OperationContext* opCtx, - const NamespaceString& nss, - boost::optional<ChunkVersion> chunkVersion) final; - - bool setExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss, - DatabaseVersion dbVersion) final; - - void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) final; + std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<DatabaseVersion>& dbVersion) override; void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final; }; diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index e3d83322111..fb30be686fb 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" - #include "mongo/util/assert_util.h" namespace mongo { @@ -267,20 +266,16 @@ public: return {*fieldPaths, targetCollectionVersion}; } - void setExpectedShardVersion(OperationContext* opCtx, - const NamespaceString& nss, - boost::optional<ChunkVersion> chunkVersion) override { - // Do nothing. - } - - void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override { - // Do nothing. - } + std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<DatabaseVersion>& dbVersion) override { + class ScopedExpectUnshardedCollectionNoop : public ScopedExpectUnshardedCollection { + public: + ScopedExpectUnshardedCollectionNoop() = default; + }; - bool setExpectedDbVersion(OperationContext* opCtx, - const NamespaceString& nss, - DatabaseVersion dbVersion) override { - return false; + return std::make_unique<ScopedExpectUnshardedCollectionNoop>(); } void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override { diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 3eaeb1666c8..f64257eedf9 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/curop.h" @@ -54,7 +52,6 @@ #include "mongo/db/vector_clock.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" @@ -66,11 +63,10 @@ namespace mongo { namespace sharded_agg_helpers { +namespace { MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors); -namespace { - /** * Given a document representing an aggregation command such as * {aggregate: "myCollection", pipeline: [], ...}, @@ -1371,23 +1367,10 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( // do a local read. The primary may be moved right after the primary shard check, // but the local read path will do a db version check before it establishes a cursor // to catch this case and ensure we fail to read locally. - auto setDbVersion = false; try { - expCtx->mongoProcessInterface->setExpectedShardVersion( - expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED()); - setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion( - expCtx->opCtx, expCtx->ns, cm.dbVersion()); - - // During 'attachCursorSourceToPipelineForLocalRead', the expected db version - // must be set. Whether or not that call is succesful, to avoid affecting later - // operations on this db, we should unset the expected db version on the opCtx, - // if we set it above. - ScopeGuard dbVersionUnsetter([&]() { - if (setDbVersion) { - expCtx->mongoProcessInterface->unsetExpectedDbVersion(expCtx->opCtx, - expCtx->ns); - } - }); + auto expectUnshardedCollection( + expCtx->mongoProcessInterface->expectUnshardedCollectionInScope( + expCtx->opCtx, expCtx->ns, cm.dbVersion())); expCtx->mongoProcessInterface->checkOnPrimaryShardForDb(expCtx->opCtx, expCtx->ns); diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 256924b4665..1e371ab0d60 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -86,11 +86,6 @@ void OperationShardingState::setShardRole(OperationContext* opCtx, } } -void OperationShardingState::unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads( - const StringData& dbName) { - _databaseVersions.erase(dbName); -} - bool OperationShardingState::hasShardVersion(const NamespaceString& nss) const { return _shardVersions.find(nss.ns()) != _shardVersions.end(); } diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 7a9a20012bb..471016de760 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -117,11 +117,6 @@ public: const boost::optional<DatabaseVersion>& dbVersion); /** - * Removes the databaseVersion stored for the given namespace. - */ - void unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(const StringData& dbName); - - /** * Returns whether or not there is a shard version for the namespace associated with this * operation. */ |