diff options
Diffstat (limited to 'src/mongo/db/pipeline')
5 files changed, 28 insertions, 13 deletions
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp index 26eb3002a80..79b666c1956 100644 --- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -176,7 +176,8 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfig Timestamp timestamp{1, 0}; return createErrorCursorResponse( {StaleConfigInfo(kTestAggregateNss, - ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})), + ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}), + CollectionIndexes({epoch, timestamp}, boost::none)), boost::none, ShardId{"0"}), "Mock error: shard version mismatch"}); @@ -220,7 +221,8 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { onCommand([&](const executor::RemoteCommandRequest& request) { return createErrorCursorResponse( {StaleConfigInfo(kTestAggregateNss, - ShardVersion(ChunkVersion({epoch, timestamp}, {2, 0})), + ShardVersion(ChunkVersion({epoch, timestamp}, {2, 0}), + CollectionIndexes({epoch, timestamp}, boost::none)), boost::none, ShardId{"0"}), "Mock error: shard version mismatch"}); diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 63e2aa38bde..ed9c14b3bed 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -555,7 +555,9 @@ Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity> return mergeOnFields; }()); spec.setTargetCollectionVersion( - _targetCollectionVersion ? boost::make_optional(ShardVersion(*_targetCollectionVersion)) + _targetCollectionVersion ? boost::make_optional(ShardVersion( + *_targetCollectionVersion, + CollectionIndexes(*_targetCollectionVersion, boost::none))) : boost::none); return Value(Document{{getSourceName(), spec.toBSON()}}); } diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index 8e56f390b13..3aa6d48c136 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -198,7 +198,9 @@ boost::optional<ShardVersion> CommonProcessInterface::refreshAndGetCollectionVer ->catalogCache() ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss)); - return cm.isSharded() ? boost::make_optional(ShardVersion(cm.getVersion())) : boost::none; + return cm.isSharded() ? boost::make_optional(ShardVersion( + cm.getVersion(), CollectionIndexes(cm.getVersion(), boost::none))) + : boost::none; } std::vector<FieldPath> CommonProcessInterface::_shardKeyToDocumentKeyFields( diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 13ddc42253d..c836400728b 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -206,14 +206,17 @@ std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, // The collection is sharded. Use the routing table to decide which shards to target // based on the query and collation, and build versioned requests for them. for (const auto& shardId : shardIds) { - auto versionedCmdObj = appendShardVersion(cmdObj, cm->getVersion(shardId)); + ChunkVersion placementVersion = cm->getVersion(shardId); + auto versionedCmdObj = appendShardVersion( + cmdObj, + ShardVersion(placementVersion, CollectionIndexes(placementVersion, boost::none))); requests.emplace_back(shardId, std::move(versionedCmdObj)); } } else { // The collection is unsharded. Target only the primary shard for the database. // Don't append shard version info when contacting the config servers. const auto cmdObjWithShardVersion = cm->dbPrimary() != ShardId::kConfigServerId - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + ? appendShardVersion(cmdObj, ShardVersion::UNSHARDED()) : cmdObj; requests.emplace_back(cm->dbPrimary(), appendDbVersionIfPresent(cmdObjWithShardVersion, cm->dbVersion())); @@ -809,14 +812,16 @@ std::unique_ptr<Pipeline, PipelineDeleter> runPipelineDirectlyOnSingleShard( auto versionedCmdObj = [&] { if (cm.isSharded()) { - return appendShardVersion(aggregation_request_helper::serializeToCommandObj(request), - cm.getVersion(shardId)); + ChunkVersion placementVersion = cm.getVersion(shardId); + return appendShardVersion( + aggregation_request_helper::serializeToCommandObj(request), + ShardVersion(placementVersion, CollectionIndexes(placementVersion, boost::none))); } else { // The collection is unsharded. Don't append shard version info when contacting the // config servers. const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId) ? appendShardVersion(aggregation_request_helper::serializeToCommandObj(request), - ChunkVersion::UNSHARDED()) + ShardVersion::UNSHARDED()) : aggregation_request_helper::serializeToCommandObj(request); return appendDbVersionIfPresent(std::move(cmdObjWithShardVersion), cm.dbVersion()); } diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp index 2b136a89dc2..54f4b091fa2 100644 --- a/src/mongo/db/pipeline/sharded_union_test.cpp +++ b/src/mongo/db/pipeline/sharded_union_test.cpp @@ -163,7 +163,8 @@ TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) { Timestamp timestamp{1, 0}; return createErrorCursorResponse( Status{StaleConfigInfo(kTestAggregateNss, - ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})), + ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}), + CollectionIndexes({epoch, timestamp}, boost::none)), boost::none, ShardId{"0"}), "Mock error: shard version mismatch"}); @@ -248,7 +249,8 @@ TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequir Timestamp timestamp{1, 0}; return createErrorCursorResponse( Status{StaleConfigInfo(kTestAggregateNss, - ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})), + ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}), + CollectionIndexes({epoch, timestamp}, boost::none)), boost::none, ShardId{"0"}), "Mock error: shard version mismatch"}); @@ -341,7 +343,8 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo onCommand([&](const executor::RemoteCommandRequest& request) { return createErrorCursorResponse( Status{StaleConfigInfo(kTestAggregateNss, - ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})), + ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}), + CollectionIndexes({epoch, timestamp}, boost::none)), boost::none, ShardId{"0"}), "Mock error: shard version mismatch"}); @@ -349,7 +352,8 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo onCommand([&](const executor::RemoteCommandRequest& request) { return createErrorCursorResponse( Status{StaleConfigInfo(kTestAggregateNss, - ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0})), + ShardVersion(ChunkVersion({epoch, timestamp}, {1, 0}), + CollectionIndexes({epoch, timestamp}, boost::none)), boost::none, ShardId{"0"}), "Mock error: shard version mismatch"}); |