summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2019-06-27 13:53:38 -0400
committerjannaerin <golden.janna@gmail.com>2019-07-17 13:26:03 -0400
commit27079c53fa994874272ea35d6148c0cd878316aa (patch)
tree2b4bd75610faa7475039d6d3217a2910b1d1ef17 /src/mongo/s
parent56f7aa61a37580a973adca93a939cf969d9130d8 (diff)
downloadmongo-27079c53fa994874272ea35d6148c0cd878316aa.tar.gz
SERVER-41950 MongoS should attach dbVersion on agg path when collection is unsharded
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp8
-rw-r--r--src/mongo/s/cluster_commands_helpers.h5
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp14
-rw-r--r--src/mongo/s/query/cluster_aggregate.h3
4 files changed, 19 insertions, 11 deletions
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index bd3c4d8b323..4c1dd33cf26 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -101,10 +101,6 @@ std::unique_ptr<WriteConcernErrorDetail> getWriteConcernErrorDetailFromBSONObj(c
namespace {
-BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo) {
- return appendDbVersionIfPresent(std::move(cmdObj), dbInfo.databaseVersion());
-}
-
const auto kAllowImplicitCollectionCreation = "allowImplicitCollectionCreation"_sd;
/**
@@ -246,6 +242,10 @@ std::vector<AsyncRequestsSender::Response> gatherResponses(
return responses;
}
+BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo) {
+ return appendDbVersionIfPresent(std::move(cmdObj), dbInfo.databaseVersion());
+}
+
BSONObj appendDbVersionIfPresent(BSONObj cmdObj, DatabaseVersion dbVersion) {
if (databaseVersion::isFixed(dbVersion)) {
return cmdObj;
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index 9be10a0f481..1e5ee9685fc 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -71,6 +71,11 @@ std::vector<AsyncRequestsSender::Response> gatherResponses(
const std::set<ErrorCodes::Error>& ignorableErrors = {});
/**
+ * Returns a copy of 'cmdObj' with dbVersion appended if it exists in 'dbInfo'
+ */
+BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo);
+
+/**
* Returns a copy of 'cmdObj' with 'databaseVersion' appended.
*/
BSONObj appendDbVersionIfPresent(BSONObj cmdObj, DatabaseVersion dbVersion);
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 76a5d1003fd..4d1d85e9503 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -790,7 +790,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
const auto primaryShardId = routingInfo->db().primary()->getId();
return aggPassthrough(
- opCtx, namespaces, primaryShardId, request, litePipe, privileges, result);
+ opCtx, namespaces, routingInfo->db(), request, litePipe, privileges, result);
}
// Populate the collection UUID and the appropriate collation to use.
@@ -871,7 +871,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
- const ShardId& shardId,
+ const CachedDatabaseInfo& dbInfo,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
const PrivilegeVector& privileges,
@@ -882,14 +882,16 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
sharded_agg_helpers::createPassthroughCommandForShard(
opCtx, aggRequest, boost::none, nullptr, BSONObj()));
+ const auto shardId = dbInfo.primary()->getId();
+ const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId)
+ ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
+ : std::move(cmdObj);
+
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
namespaces.executionNss.db().toString(),
- {{shardId,
- shardId != ShardRegistry::kConfigServerShardId
- ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
- : std::move(cmdObj)}},
+ {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}},
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent);
auto response = ars.next();
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index afa774cac37..630f4d987b1 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -38,6 +38,7 @@
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/document_source_merge_cursors.h"
@@ -107,7 +108,7 @@ public:
private:
static Status aggPassthrough(OperationContext*,
const Namespaces&,
- const ShardId&,
+ const CachedDatabaseInfo&,
const AggregationRequest&,
const LiteParsedPipeline&,
const PrivilegeVector& privileges,