diff options
author | Hana Pearlman <hana.pearlman@mongodb.com> | 2021-09-30 15:50:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-30 16:35:28 +0000 |
commit | 6c4a4bb88aa29b8edb42690bf123a43d72a4c621 (patch) | |
tree | c7ac8b237ca7a39719415fe2dec101d627305ea8 /src/mongo/db/pipeline | |
parent | 85a5fe32e3932fd834993cbf32d2115fffae4e01 (diff) | |
download | mongo-6c4a4bb88aa29b8edb42690bf123a43d72a4c621.tar.gz |
SERVER-58376: Perform local reads when on the primary reading from an unsharded coll
Co-authored-by: Alya Carina Berciu alya.berciu@mongodb.com
Diffstat (limited to 'src/mongo/db/pipeline')
8 files changed, 200 insertions, 13 deletions
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 1be99ee1826..64d23c976a4 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -430,13 +430,37 @@ public: * Sets the expected shard version for the given namespace. Invariants if the caller attempts to * change an existing shard version, or if the shard version for this namespace has already been * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the - * constraint that the foreign collection must be unsharded. If the parent operation is - * unversioned, this method does nothing. + * constraint that the foreign collection must be unsharded if featureFlagShardedLookup is + * turned off. Also used to enforce that the catalog cache is up-to-date when doing a local + * read.If the parent operation is unversioned, this method does nothing. */ virtual void setExpectedShardVersion(OperationContext* opCtx, const NamespaceString& nss, boost::optional<ChunkVersion> chunkVersion) = 0; + /** + * Sets the expected db version for the given namespace. Return true if the db version is set by + * this method. Throws an IllegalOperation if the caller attempts to change an existing db + * version. If the parent operation is unversioned, does nothing and returns false. Used to + * enforce that the catalog cache is up-to-date when doing a local read. + */ + virtual bool setExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss, + DatabaseVersion dbVersion) = 0; + + /** + * Unsets the expected db version for the given namespace. Used as a pair with + * 'setExpectedDbVersion()' to reset state after a local read is attempted. + */ + virtual void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) = 0; + + /** + * Checks if this process is on the primary shard for db specified by the given namespace. + * Throws an IllegalOperation exception otherwise. Assumes the operation context has a db + * version attached to it for db name specified by the namespace. + */ + virtual void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) = 0; + virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0; /** diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 83998c935a1..f1dd0eab9b8 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -216,6 +216,20 @@ public: MONGO_UNREACHABLE; } + bool setExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss, + DatabaseVersion dbVersion) override { + MONGO_UNREACHABLE; + } + + void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override { + MONGO_UNREACHABLE; + } + + void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override { + MONGO_UNREACHABLE; + } + std::unique_ptr<ResourceYielder> getResourceYielder() const override { return nullptr; } diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index e402d549494..663fa4531bf 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -125,6 +125,21 @@ public: // Do nothing on a non-shardsvr mongoD. } + bool setExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss, + DatabaseVersion dbVersion) override { + // Do nothing on a non-shardsvr mongoD. + return false; + } + + void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override { + // Do nothing on a non-shardsvr mongoD. + } + + void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override { + // Do nothing on a non-shardsvr mongoD. + } + protected: // This constructor is marked as protected in order to prevent instantiation since this // interface is designed to have a concrete process interface for each possible diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 628446554d5..67bd2f2d4f9 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -44,6 +44,7 @@ #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -406,6 +407,29 @@ ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipelin ownedPipeline, shardTargetingPolicy, std::move(readConcern)); } +void ShardServerProcessInterface::unsetExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss) { + auto& oss = OperationShardingState::get(opCtx); + oss.unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(nss.db()); +} + +bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss, + DatabaseVersion dbVersion) { + auto& oss = OperationShardingState::get(opCtx); + + if (auto knownDBVersion = oss.getDbVersion(nss.db())) { + uassert(ErrorCodes::IllegalOperation, + "Expected db version must match known db version", + knownDBVersion == dbVersion); + } else if (_opIsVersioned) { + oss.initializeClientRoutingVersions(nss, boost::none, dbVersion); + return true; + } + + return false; +} + void ShardServerProcessInterface::setExpectedShardVersion( OperationContext* opCtx, const NamespaceString& nss, @@ -418,6 +442,11 @@ void ShardServerProcessInterface::setExpectedShardVersion( } } +void ShardServerProcessInterface::checkOnPrimaryShardForDb(OperationContext* opCtx, + const NamespaceString& nss) { + DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db()); +} + BSONObj ShardServerProcessInterface::_versionCommandIfAppropriate( BSONObj cmdObj, const CachedDatabaseInfo& cachedDbInfo, diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index b223907c792..59309fddcd4 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -139,6 +139,14 @@ public: const NamespaceString& nss, boost::optional<ChunkVersion> chunkVersion) final; + bool setExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss, + DatabaseVersion dbVersion) final; + + void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) final; + + void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final; + private: // If the current operation is versioned, then we attach the DB version to the command object; // otherwise, it is returned unmodified. Used when running internal commands, as the parent diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 4a3973ab030..51adcf96f6e 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -267,6 +267,20 @@ public: // Do nothing. } + void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override { + // Do nothing. + } + + bool setExpectedDbVersion(OperationContext* opCtx, + const NamespaceString& nss, + DatabaseVersion dbVersion) override { + return false; + } + + void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override { + // Do nothing. + } + std::unique_ptr<TemporaryRecordStore> createTemporaryRecordStore( const boost::intrusive_ptr<ExpressionContext>& expCtx) const { MONGO_UNREACHABLE; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 21bb1d87f37..288c59814bf 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1296,6 +1296,63 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( pipelineToTarget.release()); } + + auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); + if (cm.isOK() && !cm.getValue().isSharded()) { + // If the collection is unsharded and we are on the primary, we should be able to + // do a local read. The primary may be moved right after the primary shard check, + // but the local read path will do a db version check before it establishes a cursor + // to catch this case and ensure we fail to read locally. + auto setDbVersion = false; + try { + expCtx->mongoProcessInterface->setExpectedShardVersion( + expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED()); + setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion( + expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion()); + + // During 'attachCursorSourceToPipelineForLocalRead', the expected db version + // must be set. Whether or not that call is succesful, to avoid affecting later + // operations on this db, we should unset the expected db version on the opCtx, + // if we set it above. + ScopeGuard dbVersionUnsetter([&]() { + if (setDbVersion) { + expCtx->mongoProcessInterface->unsetExpectedDbVersion(expCtx->opCtx, + expCtx->ns); + } + }); + + expCtx->mongoProcessInterface->checkOnPrimaryShardForDb(expCtx->opCtx, + expCtx->ns); + + LOGV2_DEBUG(5837600, + 3, + "Performing local read", + "ns"_attr = expCtx->ns, + "pipeline"_attr = pipelineToTarget->serializeToBson(), + "comment"_attr = expCtx->opCtx->getComment()); + + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + pipelineToTarget.release()); + } catch (ExceptionFor<ErrorCodes::IllegalOperation>&) { + // The current node isn't the primary for or has stale information about this + // collection, proceed with shard targeting. + } catch (ExceptionFor<ErrorCodes::StaleDbVersion>&) { + // The current node has stale information about this collection, proceed with + // shard targeting, which has logic to handle refreshing that may be needed. + } catch (ExceptionForCat<ErrorCategory::StaleShardVersionError>&) { + // The current node has stale information about this collection, proceed with + // shard targeting, which has logic to handle refreshing that may be needed. + } catch (ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { + // The current node may be trying to run a pipeline on a namespace which is an + // unresolved view, proceed with shard targeting, + } + + // The local read failed. Recreate 'pipelineToTarget' if it was released above. + if (!pipelineToTarget) { + pipelineToTarget = pipeline->clone(); + } + } + return targetShardsAndAddMergeCursors(expCtx, std::move(pipelineToTarget), boost::none, diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp index e27be5c5477..1f480e61f99 100644 --- a/src/mongo/db/pipeline/sharded_union_test.cpp +++ b/src/mongo/db/pipeline/sharded_union_test.cpp @@ -365,7 +365,7 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceived) { // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". auto shards = setupNShards(2); - loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + auto cm = loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); NamespaceString nsToUnionWith(expCtx()->ns.db(), "view"); // Mock out the view namespace as emtpy for now - this is what it would be when parsing in a @@ -392,18 +392,44 @@ TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceiv ASSERT(unionWith->getNext().isEOF()); }); - // Mock out one error response, then expect a refresh of the sharding catalog for that - // namespace, then mock out a successful response. + // Mock the expected config server queries. + const OID epoch = OID::gen(); + const UUID uuid = UUID::gen(); + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + + const Timestamp timestamp; + ChunkVersion version(1, 0, epoch, timestamp); + + ChunkType chunk1(*cm.getUUID(), + {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, + version, + {"0"}); + chunk1.setName(OID::gen()); + version.incMinor(); + + ChunkType chunk2(*cm.getUUID(), + {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, + version, + {"1"}); + chunk2.setName(OID::gen()); + version.incMinor(); + + expectCollectionAndChunksAggregation( + kTestAggregateNss, epoch, timestamp, uuid, shardKeyPattern, {chunk1, chunk2}); + + // Mock out the sharded view error responses from both shards. + std::vector<BSONObj> viewPipeline = {fromjson("{$group: {_id: '$groupKey'}}"), + // Prevent the $match from being pushed into the shards + // where it would not execute in this mocked environment. + fromjson("{$_internalInhibitOptimization: {}}"), + fromjson("{$match: {_id: 'unionResult'}}")}; + onCommand([&](const executor::RemoteCommandRequest& request) { + return createErrorCursorResponse( + Status{ResolvedView{expectedBackingNs, viewPipeline, BSONObj()}, "It was a view!"_sd}); + }); onCommand([&](const executor::RemoteCommandRequest& request) { return createErrorCursorResponse( - Status{ResolvedView{expectedBackingNs, - {fromjson("{$group: {_id: '$groupKey'}}"), - // Prevent the $match from being pushed into the shards where it - // would not execute in this mocked environment. - fromjson("{$_internalInhibitOptimization: {}}"), - fromjson("{$match: {_id: 'unionResult'}}")}, - BSONObj()}, - "It was a view!"_sd}); + Status{ResolvedView{expectedBackingNs, viewPipeline, BSONObj()}, "It was a view!"_sd}); }); // That error should be incorporated, then we should target both shards. The results should be |