diff options
author | jannaerin <golden.janna@gmail.com> | 2019-06-27 13:53:38 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2019-07-17 13:26:03 -0400 |
commit | 27079c53fa994874272ea35d6148c0cd878316aa (patch) | |
tree | 2b4bd75610faa7475039d6d3217a2910b1d1ef17 /src/mongo/s | |
parent | 56f7aa61a37580a973adca93a939cf969d9130d8 (diff) | |
download | mongo-27079c53fa994874272ea35d6148c0cd878316aa.tar.gz |
SERVER-41950 MongoS should attach dbVersion on agg path when collection is unsharded
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.h | 5 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 14 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.h | 3 |
4 files changed, 19 insertions, 11 deletions
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index bd3c4d8b323..4c1dd33cf26 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -101,10 +101,6 @@ std::unique_ptr<WriteConcernErrorDetail> getWriteConcernErrorDetailFromBSONObj(c namespace { -BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo) { - return appendDbVersionIfPresent(std::move(cmdObj), dbInfo.databaseVersion()); -} - const auto kAllowImplicitCollectionCreation = "allowImplicitCollectionCreation"_sd; /** @@ -246,6 +242,10 @@ std::vector<AsyncRequestsSender::Response> gatherResponses( return responses; } +BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo) { + return appendDbVersionIfPresent(std::move(cmdObj), dbInfo.databaseVersion()); +} + BSONObj appendDbVersionIfPresent(BSONObj cmdObj, DatabaseVersion dbVersion) { if (databaseVersion::isFixed(dbVersion)) { return cmdObj; diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 9be10a0f481..1e5ee9685fc 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -71,6 +71,11 @@ std::vector<AsyncRequestsSender::Response> gatherResponses( const std::set<ErrorCodes::Error>& ignorableErrors = {}); /** + * Returns a copy of 'cmdObj' with dbVersion appended if it exists in 'dbInfo' + */ +BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo); + +/** * Returns a copy of 'cmdObj' with 'databaseVersion' appended. */ BSONObj appendDbVersionIfPresent(BSONObj cmdObj, DatabaseVersion dbVersion); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 76a5d1003fd..4d1d85e9503 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -790,7 +790,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) { const auto primaryShardId = routingInfo->db().primary()->getId(); return aggPassthrough( - opCtx, namespaces, primaryShardId, request, litePipe, privileges, result); + opCtx, namespaces, routingInfo->db(), request, litePipe, privileges, result); } // Populate the collection UUID and the appropriate collation to use. @@ -871,7 +871,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, - const ShardId& shardId, + const CachedDatabaseInfo& dbInfo, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, const PrivilegeVector& privileges, @@ -882,14 +882,16 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, sharded_agg_helpers::createPassthroughCommandForShard( opCtx, aggRequest, boost::none, nullptr, BSONObj())); + const auto shardId = dbInfo.primary()->getId(); + const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId) + ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) + : std::move(cmdObj); + MultiStatementTransactionRequestsSender ars( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), namespaces.executionNss.db().toString(), - {{shardId, - shardId != ShardRegistry::kConfigServerShardId - ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) - : std::move(cmdObj)}}, + {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}}, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); auto response = ars.next(); diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index afa774cac37..630f4d987b1 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -38,6 +38,7 @@ #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/s/async_requests_sender.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/document_source_merge_cursors.h" @@ -107,7 +108,7 @@ public: private: static Status aggPassthrough(OperationContext*, const Namespaces&, - const ShardId&, + const CachedDatabaseInfo&, const AggregationRequest&, const LiteParsedPipeline&, const PrivilegeVector& privileges, |