summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp4
-rw-r--r--src/mongo/db/pipeline/process_interface/common_process_interface.cpp4
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp15
-rw-r--r--src/mongo/db/pipeline/sharded_union_test.cpp12
5 files changed, 28 insertions, 13 deletions
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
index 26eb3002a80..79b666c1956 100644
--- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -176,7 +176,8 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfig
Timestamp timestamp{1, 0};
return createErrorCursorResponse(
{StaleConfigInfo(kTestAggregateNss,
- ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})),
+ ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}),
+ CollectionIndexes({epoch, timestamp}, boost::none)),
boost::none,
ShardId{"0"}),
"Mock error: shard version mismatch"});
@@ -220,7 +221,8 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
onCommand([&](const executor::RemoteCommandRequest& request) {
return createErrorCursorResponse(
{StaleConfigInfo(kTestAggregateNss,
- ShardVersion(ChunkVersion({epoch, timestamp}, {2, 0})),
+ ShardVersion(ChunkVersion({epoch, timestamp}, {2, 0}),
+ CollectionIndexes({epoch, timestamp}, boost::none)),
boost::none,
ShardId{"0"}),
"Mock error: shard version mismatch"});
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 63e2aa38bde..ed9c14b3bed 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -555,7 +555,9 @@ Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity>
return mergeOnFields;
}());
spec.setTargetCollectionVersion(
- _targetCollectionVersion ? boost::make_optional(ShardVersion(*_targetCollectionVersion))
+ _targetCollectionVersion ? boost::make_optional(ShardVersion(
+ *_targetCollectionVersion,
+ CollectionIndexes(*_targetCollectionVersion, boost::none)))
: boost::none);
return Value(Document{{getSourceName(), spec.toBSON()}});
}
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 8e56f390b13..3aa6d48c136 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -198,7 +198,9 @@ boost::optional<ShardVersion> CommonProcessInterface::refreshAndGetCollectionVer
->catalogCache()
->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss));
- return cm.isSharded() ? boost::make_optional(ShardVersion(cm.getVersion())) : boost::none;
+ return cm.isSharded() ? boost::make_optional(ShardVersion(
+ cm.getVersion(), CollectionIndexes(cm.getVersion(), boost::none)))
+ : boost::none;
}
std::vector<FieldPath> CommonProcessInterface::_shardKeyToDocumentKeyFields(
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 13ddc42253d..c836400728b 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -206,14 +206,17 @@ std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
// The collection is sharded. Use the routing table to decide which shards to target
// based on the query and collation, and build versioned requests for them.
for (const auto& shardId : shardIds) {
- auto versionedCmdObj = appendShardVersion(cmdObj, cm->getVersion(shardId));
+ ChunkVersion placementVersion = cm->getVersion(shardId);
+ auto versionedCmdObj = appendShardVersion(
+ cmdObj,
+ ShardVersion(placementVersion, CollectionIndexes(placementVersion, boost::none)));
requests.emplace_back(shardId, std::move(versionedCmdObj));
}
} else {
// The collection is unsharded. Target only the primary shard for the database.
// Don't append shard version info when contacting the config servers.
const auto cmdObjWithShardVersion = cm->dbPrimary() != ShardId::kConfigServerId
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ ? appendShardVersion(cmdObj, ShardVersion::UNSHARDED())
: cmdObj;
requests.emplace_back(cm->dbPrimary(),
appendDbVersionIfPresent(cmdObjWithShardVersion, cm->dbVersion()));
@@ -809,14 +812,16 @@ std::unique_ptr<Pipeline, PipelineDeleter> runPipelineDirectlyOnSingleShard(
auto versionedCmdObj = [&] {
if (cm.isSharded()) {
- return appendShardVersion(aggregation_request_helper::serializeToCommandObj(request),
- cm.getVersion(shardId));
+ ChunkVersion placementVersion = cm.getVersion(shardId);
+ return appendShardVersion(
+ aggregation_request_helper::serializeToCommandObj(request),
+ ShardVersion(placementVersion, CollectionIndexes(placementVersion, boost::none)));
} else {
// The collection is unsharded. Don't append shard version info when contacting the
// config servers.
const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId)
? appendShardVersion(aggregation_request_helper::serializeToCommandObj(request),
- ChunkVersion::UNSHARDED())
+ ShardVersion::UNSHARDED())
: aggregation_request_helper::serializeToCommandObj(request);
return appendDbVersionIfPresent(std::move(cmdObjWithShardVersion), cm.dbVersion());
}
diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp
index 2b136a89dc2..54f4b091fa2 100644
--- a/src/mongo/db/pipeline/sharded_union_test.cpp
+++ b/src/mongo/db/pipeline/sharded_union_test.cpp
@@ -163,7 +163,8 @@ TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) {
Timestamp timestamp{1, 0};
return createErrorCursorResponse(
Status{StaleConfigInfo(kTestAggregateNss,
- ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})),
+ ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}),
+ CollectionIndexes({epoch, timestamp}, boost::none)),
boost::none,
ShardId{"0"}),
"Mock error: shard version mismatch"});
@@ -248,7 +249,8 @@ TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequir
Timestamp timestamp{1, 0};
return createErrorCursorResponse(
Status{StaleConfigInfo(kTestAggregateNss,
- ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})),
+ ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}),
+ CollectionIndexes({epoch, timestamp}, boost::none)),
boost::none,
ShardId{"0"}),
"Mock error: shard version mismatch"});
@@ -341,7 +343,8 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo
onCommand([&](const executor::RemoteCommandRequest& request) {
return createErrorCursorResponse(
Status{StaleConfigInfo(kTestAggregateNss,
- ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})),
+ ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}),
+ CollectionIndexes({epoch, timestamp}, boost::none)),
boost::none,
ShardId{"0"}),
"Mock error: shard version mismatch"});
@@ -349,7 +352,8 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo
onCommand([&](const executor::RemoteCommandRequest& request) {
return createErrorCursorResponse(
Status{StaleConfigInfo(kTestAggregateNss,
- ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})),
+ ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}),
+ CollectionIndexes({epoch, timestamp}, boost::none)),
boost::none,
ShardId{"0"}),
"Mock error: shard version mismatch"});