diff options
author | Tess Avitabile <tess.avitabile@mongodb.com> | 2017-09-01 14:06:58 -0400 |
---|---|---|
committer | Tess Avitabile <tess.avitabile@mongodb.com> | 2017-09-06 13:41:32 -0400 |
commit | ed601dd01169b8c1fad9fb8d388da0523a1b48f5 (patch) | |
tree | 4d08bd1a36a12967fcb098432709da07236026b3 | |
parent | 456ba544978a0d41a2261bf65da686874fb631a2 (diff) | |
download | mongo-ed601dd01169b8c1fad9fb8d388da0523a1b48f5.tar.gz |
SERVER-30899 Aggregations sent from a 3.4 mongos should serialize 3.4 metadata
22 files changed, 181 insertions, 67 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 52937a2116a..8bcc55f14f2 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -4,8 +4,6 @@ selector: roots: - jstests/sharding/*.js exclude_files: - # SERVER-30899 We changed the meaning of 'fromRouter' - impacting sorts on metadata. - - jstests/sharding/agg_sort.js # Doesn't use ShardingTest so won't actually be run in a mixed version configuration - jstests/sharding/config_version_rollback.js # Behavior change to addShard diff --git a/jstests/sharding/agg_sort.js b/jstests/sharding/agg_sort.js index bd12565dd20..2f2503e63e1 100644 --- a/jstests/sharding/agg_sort.js +++ b/jstests/sharding/agg_sort.js @@ -49,7 +49,7 @@ function assertResultsEqual({actual, expected}) { const resultsAsString = " actual: " + tojson(actual) + "\n expected: " + tojson(expected); assert.eq( - actual.length, expected.length, `different number of results:\n" + ${resultsAsString}`); + actual.length, expected.length, `different number of results:\n${resultsAsString}`); for (let i = 0; i < actual.length; i++) { assert.eq( actual[i], expected[i], `different results at index ${i}:\n${resultsAsString}`); diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js index 652eaa0de63..689e57ea20c 100644 --- a/jstests/sharding/aggregation_currentop.js +++ b/jstests/sharding/aggregation_currentop.js @@ -570,7 +570,7 @@ // Test that attempting to 'spoof' a sharded request on non-shardsvr mongoD fails. assert.commandFailedWithCode( shardAdminDB.runCommand( - {aggregate: 1, pipeline: [{$currentOp: {}}], fromRouter: true, cursor: {}}), + {aggregate: 1, pipeline: [{$currentOp: {}}], fromMongos: true, cursor: {}}), 40465); // Test that an operation which is at the BSON user size limit does not throw an error when the diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 753cc2bfebd..3350f43e4b5 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -427,11 +427,11 @@ Status runAggregate(OperationContext* opCtx, pipeline->optimizePipeline(); - if (kDebugBuild && !expCtx->explain && !expCtx->fromRouter) { + if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) { // Make sure all operations round-trip through Pipeline::serialize() correctly by // re-parsing every command in debug builds. This is important because sharded - // aggregations rely on this ability. Skipping when fromRouter because this has - // already been through the transformation (and this un-sets expCtx->fromRouter). + // aggregations rely on this ability. Skipping when fromMongos because this has + // already been through the transformation (and this un-sets expCtx->fromMongos). pipeline = reparsePipeline(pipeline.get(), request, expCtx); } diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index 4b291b59d89..b814a2eee0f 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -52,6 +52,7 @@ PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, : PlanStage(kStageType, opCtx), _pipeline(std::move(pipeline)), _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger + _includeSortKey(_includeMetaData && !_pipeline->getContext()->from34Mongos), _ws(ws) { // We take over responsibility for disposing of the Pipeline, since it is required that // doDispose() will be called before destruction of this PipelineProxyStage. @@ -117,7 +118,7 @@ unique_ptr<PlanStageStats> PipelineProxyStage::getStats() { boost::optional<BSONObj> PipelineProxyStage::getNextBson() { if (auto next = _pipeline->getNext()) { if (_includeMetaData) { - return next->toBsonWithMetaData(); + return next->toBsonWithMetaData(_includeSortKey); } else { return next->toBson(); } diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index ab33c963fa1..bb6c6645eb1 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -94,6 +94,12 @@ private: std::vector<BSONObj> _stash; const bool _includeMetaData; + // When the aggregation request is from a 3.4 mongos, the merge may happen on a 3.4 shard (which + // does not understand sort key metadata), so we should not serialize the sort key, and + // '_includeSortKey' is set to false. + // TODO SERVER-30924: remove this. + const bool _includeSortKey; + // Not owned by us. WorkingSet* _ws; }; diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 4dbb785fb10..7c0e6401cd2 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -50,8 +50,9 @@ namespace mongo { constexpr StringData AggregationRequest::kCommandName; constexpr StringData AggregationRequest::kCursorName; constexpr StringData AggregationRequest::kBatchSizeName; -constexpr StringData AggregationRequest::kFromRouterName; +constexpr StringData AggregationRequest::kFromMongosName; constexpr StringData AggregationRequest::kNeedsMergeName; +constexpr StringData AggregationRequest::kNeedsMerge34Name; constexpr StringData AggregationRequest::kPipelineName; constexpr StringData AggregationRequest::kCollationName; constexpr StringData AggregationRequest::kExplainName; @@ -107,6 +108,10 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( bool hasCursorElem = false; bool hasExplainElem = false; + bool hasFromMongosElem = false; + bool hasNeedsMergeElem = false; + bool hasNeedsMerge34Elem = false; + // Parse optional parameters. for (auto&& elem : cmdObj) { auto fieldName = elem.fieldNameStringData(); @@ -180,20 +185,35 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( if (elem.Bool()) { request.setExplain(ExplainOptions::Verbosity::kQueryPlanner); } - } else if (kFromRouterName == fieldName) { + } else if (kFromMongosName == fieldName) { if (elem.type() != BSONType::Bool) { return {ErrorCodes::TypeMismatch, - str::stream() << kFromRouterName << " must be a boolean, not a " + str::stream() << kFromMongosName << " must be a boolean, not a " << typeName(elem.type())}; } - request.setFromRouter(elem.Bool()); + + hasFromMongosElem = true; + request.setFromMongos(elem.Bool()); } else if (kNeedsMergeName == fieldName) { if (elem.type() != BSONType::Bool) { return {ErrorCodes::TypeMismatch, str::stream() << kNeedsMergeName << " must be a boolean, not a " << typeName(elem.type())}; } + + hasNeedsMergeElem = true; + request.setNeedsMerge(elem.Bool()); + } else if (kNeedsMerge34Name == fieldName) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kNeedsMerge34Name << " must be a boolean, not a " + << typeName(elem.type())}; + } + + hasNeedsMerge34Elem = true; request.setNeedsMerge(elem.Bool()); + request.setFromMongos(elem.Bool()); + request.setFrom34Mongos(elem.Bool()); } else if (kAllowDiskUseName == fieldName) { if (storageGlobalParams.readOnly) { return {ErrorCodes::IllegalOperation, @@ -248,6 +268,30 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( << "' option"}; } + if (hasNeedsMergeElem && !hasFromMongosElem) { + return {ErrorCodes::FailedToParse, + str::stream() << "Cannot specify '" << kNeedsMergeName << "' without '" + << kFromMongosName + << "'"}; + } + + // If 'fromRouter' is specified, the request is from a 3.4 mongos, so we do not expect + // 'fromMongos' or 'needsMerge' to be specified. + if (hasNeedsMerge34Elem) { + if (hasNeedsMergeElem) { + return {ErrorCodes::FailedToParse, + str::stream() << "Cannot specify both '" << kNeedsMergeName << "' and '" + << kNeedsMerge34Name + << "'"}; + } + if (hasFromMongosElem) { + return {ErrorCodes::FailedToParse, + str::stream() << "Cannot specify both '" << kFromMongosName << "' and '" + << kNeedsMerge34Name + << "'"}; + } + } + return request; } @@ -284,7 +328,7 @@ Document AggregationRequest::serializeToCommandObj() const { {kPipelineName, _pipeline}, // Only serialize booleans if different than their default. {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()}, - {kFromRouterName, _fromRouter ? Value(true) : Value()}, + {kFromMongosName, _fromMongos ? Value(true) : Value()}, {kNeedsMergeName, _needsMerge ? Value(true) : Value()}, {bypassDocumentValidationCommandOption(), _bypassDocumentValidation ? Value(true) : Value()}, diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index d8672b986d7..b34fc383d20 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -50,8 +50,9 @@ public: static constexpr StringData kCommandName = "aggregate"_sd; static constexpr StringData kCursorName = "cursor"_sd; static constexpr StringData kBatchSizeName = "batchSize"_sd; - static constexpr StringData kFromRouterName = "fromRouter"_sd; + static constexpr StringData kFromMongosName = "fromMongos"_sd; static constexpr StringData kNeedsMergeName = "needsMerge"_sd; + static constexpr StringData kNeedsMerge34Name = "fromRouter"_sd; static constexpr StringData kPipelineName = "pipeline"_sd; static constexpr StringData kCollationName = "collation"_sd; static constexpr StringData kExplainName = "explain"_sd; @@ -135,8 +136,15 @@ public: /** * Returns true if this request originated from a mongoS. */ - bool isFromRouter() const { - return _fromRouter; + bool isFromMongos() const { + return _fromMongos; + } + + /** + * Returns true if this request originated from a 3.4 mongos. + */ + bool isFrom34Mongos() const { + return _from34Mongos; } /** @@ -218,8 +226,12 @@ public: _allowDiskUse = allowDiskUse; } - void setFromRouter(bool isFromRouter) { - _fromRouter = isFromRouter; + void setFromMongos(bool isFromMongos) { + _fromMongos = isFromMongos; + } + + void setFrom34Mongos(bool isFrom34Mongos) { + _from34Mongos = isFrom34Mongos; } void setNeedsMerge(bool needsMerge) { @@ -276,10 +288,16 @@ private: boost::optional<ExplainOptions::Verbosity> _explainMode; bool _allowDiskUse = false; - bool _fromRouter = false; + bool _fromMongos = false; bool _needsMerge = false; bool _bypassDocumentValidation = false; + // We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur + // on a 3.4 shard (which does not understand sort key metadata), and we should not serialize the + // sort key. + // TODO SERVER-30924: remove this. + bool _from34Mongos = false; + // A user-specified maxTimeMS limit, or a value of '0' if not specified. unsigned int _maxTimeMS = 0; }; diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 8192e0fed3e..312e8158121 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -56,14 +56,14 @@ const Document kDefaultCursorOptionDocument{ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson( - "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromRouter: true, " + "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, " "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: " "{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, " "$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment'}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); ASSERT_TRUE(request.shouldAllowDiskUse()); - ASSERT_TRUE(request.isFromRouter()); + ASSERT_TRUE(request.isFromMongos()); ASSERT_TRUE(request.needsMerge()); ASSERT_TRUE(request.shouldBypassDocumentValidation()); ASSERT_EQ(request.getBatchSize(), 10); @@ -81,6 +81,16 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { << "nearest")); } +TEST(AggregationRequestTest, ShouldParseNeedsMerge34) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: true}"); + auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); + ASSERT_TRUE(request.needsMerge()); + ASSERT_TRUE(request.isFromMongos()); + ASSERT_TRUE(request.isFrom34Mongos()); +} + TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}"); @@ -136,7 +146,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau AggregationRequest request(nss, {}); request.setExplain(boost::none); request.setAllowDiskUse(false); - request.setFromRouter(false); + request.setFromMongos(false); request.setNeedsMerge(false); request.setBypassDocumentValidation(false); request.setCollation(BSONObj()); @@ -157,7 +167,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { NamespaceString nss("a.collection"); AggregationRequest request(nss, {}); request.setAllowDiskUse(true); - request.setFromRouter(true); + request.setFromMongos(true); request.setNeedsMerge(true); request.setBypassDocumentValidation(true); request.setBatchSize(10); @@ -180,7 +190,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { Document{{AggregationRequest::kCommandName, nss.coll()}, {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}, {AggregationRequest::kAllowDiskUseName, true}, - {AggregationRequest::kFromRouterName, true}, + {AggregationRequest::kFromMongosName, true}, {AggregationRequest::kNeedsMergeName, true}, {bypassDocumentValidationCommandOption(), true}, {AggregationRequest::kCollationName, collationObj}, @@ -317,17 +327,46 @@ TEST(AggregationRequestTest, ShouldRejectExplainIfObject) { ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } -TEST(AggregationRequestTest, ShouldRejectNonBoolFromRouter) { +TEST(AggregationRequestTest, ShouldRejectNonBoolFromMongos) { NamespaceString nss("a.collection"); const BSONObj inputBson = - fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}"); + fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: 1}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge) { NamespaceString nss("a.collection"); const BSONObj inputBson = - fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1}"); + fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1, fromMongos: true}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfFromMongosNotPresent) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge34) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfNeedsMerge34AlsoPresent) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson( + "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true, fromMongos: true, " + "fromRouter: true}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectFromMongosIfNeedsMerge34AlsoPresent) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson( + "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: true, fromRouter: true}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp index 4f531fe72f3..f92248d7508 100644 --- a/src/mongo/db/pipeline/document.cpp +++ b/src/mongo/db/pipeline/document.cpp @@ -273,14 +273,14 @@ constexpr StringData Document::metaFieldTextScore; constexpr StringData Document::metaFieldRandVal; constexpr StringData Document::metaFieldSortKey; -BSONObj Document::toBsonWithMetaData() const { +BSONObj Document::toBsonWithMetaData(bool includeSortKey) const { BSONObjBuilder bb; toBson(&bb); if (hasTextScore()) bb.append(metaFieldTextScore, getTextScore()); if (hasRandMetaField()) bb.append(metaFieldRandVal, getRandMetaField()); - if (hasSortKeyMetaField()) + if (includeSortKey && hasSortKeyMetaField()) bb.append(metaFieldSortKey, getSortKeyMetaField()); return bb.obj(); } diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h index a8ebfd1c4d2..0df446a797a 100644 --- a/src/mongo/db/pipeline/document.h +++ b/src/mongo/db/pipeline/document.h @@ -208,7 +208,7 @@ public: * Like toBson, but includes metadata at the top-level. * Output is parseable by fromBsonWithMetaData */ - BSONObj toBsonWithMetaData() const; + BSONObj toBsonWithMetaData(bool includeSortKey = true) const; /** * Like Document(BSONObj) but treats top-level fields with special names as metadata. diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index e1385b19c68..e874fd5d426 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -91,7 +91,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() { if (!_sorter) { SortOptions opts; opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; - if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) { + if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) { opts.extSortAllowed = true; opts.tempDir = pExpCtx->tempDir; } diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index 333a130a0ae..08926254480 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -648,15 +648,15 @@ TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) { auto expCtx = getExpCtx(); expCtx->extSortAllowed = false; - expCtx->inRouter = false; + expCtx->inMongos = false; assertCannotSpillToDisk(expCtx); expCtx->extSortAllowed = true; - expCtx->inRouter = true; + expCtx->inMongos = true; assertCannotSpillToDisk(expCtx); expCtx->extSortAllowed = false; - expCtx->inRouter = true; + expCtx->inMongos = true; assertCannotSpillToDisk(expCtx); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 6b73b4b5084..3b318e6d19a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -222,7 +222,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { // TODO: Add sharding support here (SERVER-29141). uassert( - 40470, "The $changeStream stage is not supported on sharded systems.", !expCtx->inRouter); + 40470, "The $changeStream stage is not supported on sharded systems.", !expCtx->inMongos); uassert(40471, "Only default collation is allowed when using a $changeStream stage.", !expCtx->getCollator()); diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 7b22cdce75d..ce77e3da6ae 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -95,18 +95,18 @@ DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { _opsIter = _ops.begin(); - if (pExpCtx->fromRouter) { + if (pExpCtx->fromMongos) { _shardName = _mongod->getShardName(pExpCtx->opCtx); uassert(40465, - "Aggregation request specified 'fromRouter' but unable to retrieve shard name " + "Aggregation request specified 'fromMongos' but unable to retrieve shard name " "for $currentOp pipeline stage.", !_shardName.empty()); } } if (_opsIter != _ops.end()) { - if (!pExpCtx->fromRouter) { + if (!pExpCtx->fromMongos) { return Document(*_opsIter++); } diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp index b852c94ffbe..a52453f8aa1 100644 --- a/src/mongo/db/pipeline/document_source_current_op_test.cpp +++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp @@ -191,7 +191,7 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) { TEST_F(DocumentSourceCurrentOpTest, ShouldAddShardNameModifyOpIDAndClientFieldNameInShardedContext) { - getExpCtx()->fromRouter = true; + getExpCtx()->fromMongos = true; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; const auto mongod = std::make_shared<MockMongodImplementation>(ops); @@ -209,7 +209,7 @@ TEST_F(DocumentSourceCurrentOpTest, TEST_F(DocumentSourceCurrentOpTest, ShouldReturnOpIDAndClientFieldNameUnmodifiedWhenNotInShardedContext) { - getExpCtx()->fromRouter = false; + getExpCtx()->fromMongos = false; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; const auto mongod = std::make_shared<MockMongodImplementation>(ops); @@ -224,7 +224,7 @@ TEST_F(DocumentSourceCurrentOpTest, } TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) { - getExpCtx()->fromRouter = true; + getExpCtx()->fromMongos = true; const auto mongod = std::make_shared<MockMongodImplementation>(false); @@ -235,7 +235,7 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRe } TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInShardedContext) { - getExpCtx()->fromRouter = true; + getExpCtx()->fromMongos = true; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")}; const auto mongod = std::make_shared<MockMongodImplementation>(ops); diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index de75a225f70..ba2c48680f2 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -266,7 +266,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), _spilled(false), - _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {} + _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inMongos) {} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { _accumulatedFields.push_back(accumulationStatement); @@ -530,7 +530,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { if (kDebugBuild && !storageGlobalParams.readOnly) { // In debug mode, spill every time we have a duplicate id to stress merge logic. if (!inserted && // is a dup - !pExpCtx->inRouter && // can't spill to disk in router + !pExpCtx->inMongos && // can't spill to disk in mongos !_extSortAllowed && // don't change behavior when testing external sort _sortedFiles.size() < 20) { // don't open too many FDs diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 34e5a753e7a..5cc5ba93be4 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -70,7 +70,7 @@ using DocumentSourceGroupTest = AggregationContextFixture; TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) { auto expCtx = getExpCtx(); - expCtx->inRouter = true; // Disallow external sort. + expCtx->inMongos = true; // Disallow external sort. // This is the only way to do this in a debug build. AccumulationStatement countStatement{"count", ExpressionConstant::create(expCtx, Value(1)), @@ -142,7 +142,7 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) { TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) { auto expCtx = getExpCtx(); const size_t maxMemoryUsageBytes = 1000; - expCtx->inRouter = true; // Disallow external sort. + expCtx->inMongos = true; // Disallow external sort. // This is the only way to do this in a debug build. VariablesParseState vps = expCtx->variablesParseState; @@ -164,7 +164,7 @@ TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSet TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { auto expCtx = getExpCtx(); const size_t maxMemoryUsageBytes = 1000; - expCtx->inRouter = true; // Disallow external sort. + expCtx->inMongos = true; // Disallow external sort. // This is the only way to do this in a debug build. VariablesParseState vps = expCtx->variablesParseState; @@ -206,15 +206,15 @@ public: _tempDir("DocumentSourceGroupTest") {} protected: - void createGroup(const BSONObj& spec, bool inShard = false, bool inRouter = false) { + void createGroup(const BSONObj& spec, bool inShard = false, bool inMongos = false) { BSONObj namedSpec = BSON("$group" << spec); BSONElement specElement = namedSpec.firstElement(); intrusive_ptr<ExpressionContextForTest> expressionContext = new ExpressionContextForTest(_opCtx.get(), AggregationRequest(NamespaceString(ns), {})); - // For $group, 'inShard' implies 'fromRouter' and 'needsMerge'. - expressionContext->fromRouter = expressionContext->needsMerge = inShard; - expressionContext->inRouter = inRouter; + // For $group, 'inShard' implies 'fromMongos' and 'needsMerge'. + expressionContext->fromMongos = expressionContext->needsMerge = inShard; + expressionContext->inMongos = inMongos; // Won't spill to disk properly if it needs to. expressionContext->tempDir = _tempDir.path(); @@ -1012,7 +1012,7 @@ public: // We pretend to be in the router so that we don't spill to disk, because this produces // inconsistent output on debug vs. non-debug builds. - const bool inRouter = true; + const bool inMongos = true; const bool inShard = false; createGroup(BSON("_id" << BSON("x" @@ -1020,7 +1020,7 @@ public: << "y" << "$b")), inShard, - inRouter); + inMongos); group()->setSource(source.get()); group()->getNext(); @@ -1039,7 +1039,7 @@ public: // We pretend to be in the router so that we don't spill to disk, because this produces // inconsistent output on debug vs. non-debug builds. - const bool inRouter = true; + const bool inMongos = true; const bool inShard = false; createGroup(BSON("_id" << BSON("a" @@ -1047,7 +1047,7 @@ public: << "b" << "$a")), inShard, - inRouter); + inMongos); group()->setSource(source.get()); group()->getNext(); @@ -1066,10 +1066,10 @@ public: // We pretend to be in the router so that we don't spill to disk, because this produces // inconsistent output on debug vs. non-debug builds. - const bool inRouter = true; + const bool inMongos = true; const bool inShard = false; - createGroup(fromjson("{_id: {$sum: ['$a', '$b']}}"), inShard, inRouter); + createGroup(fromjson("{_id: {$sum: ['$a', '$b']}}"), inShard, inMongos); group()->setSource(source.get()); group()->getNext(); diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 54cf67a97a6..97923ff8118 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -304,7 +304,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const { opts.limit = limitSrc->getLimit(); opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; - if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) { + if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) { opts.extSortAllowed = true; opts.tempDir = pExpCtx->tempDir; } diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index f2dd5ab59cf..0a6005c73b0 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -44,10 +44,11 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, std::unique_ptr<CollatorInterface> collator, StringMap<ResolvedNamespace> resolvedNamespaces) : explain(request.getExplain()), - fromRouter(request.isFromRouter()), + fromMongos(request.isFromMongos()), needsMerge(request.needsMerge()), extSortAllowed(request.shouldAllowDiskUse()), bypassDocumentValidation(request.shouldBypassDocumentValidation()), + from34Mongos(request.isFrom34Mongos()), ns(request.getNamespaceString()), opCtx(opCtx), collation(request.getCollation()), @@ -78,8 +79,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) expCtx->explain = explain; expCtx->needsMerge = needsMerge; - expCtx->fromRouter = fromRouter; - expCtx->inRouter = inRouter; + expCtx->fromMongos = fromMongos; + expCtx->from34Mongos = from34Mongos; + expCtx->inMongos = inMongos; expCtx->extSortAllowed = extSortAllowed; expCtx->bypassDocumentValidation = bypassDocumentValidation; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index c67ba4ab9b8..ad7052968c1 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -113,12 +113,18 @@ public: // The explain verbosity requested by the user, or boost::none if no explain was requested. boost::optional<ExplainOptions::Verbosity> explain; - bool fromRouter = false; + bool fromMongos = false; bool needsMerge = false; - bool inRouter = false; + bool inMongos = false; bool extSortAllowed = false; bool bypassDocumentValidation = false; + // We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur + // on a 3.4 shard (which does not understand sort key metadata), and we should not serialize the + // sort key. + // TODO SERVER-30924: remove this. + bool from34Mongos = false; + NamespaceString ns; std::string tempDir; // Defaults to empty to prevent external sorting in mongos. diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 1bdbd85e850..fc8bab48bf4 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -185,7 +185,7 @@ BSONObj createCommandForTargetedShards( const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards) { // Create the command for the shards. MutableDocument targetedCmd(request.serializeToCommandObj()); - targetedCmd[AggregationRequest::kFromRouterName] = Value(true); + targetedCmd[AggregationRequest::kFromMongosName] = Value(true); // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough. if (pipelineForTargetedShards) { @@ -221,7 +221,7 @@ BSONObj createCommandForMergingShard( MutableDocument mergeCmd(request.serializeToCommandObj()); mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); - mergeCmd[AggregationRequest::kFromRouterName] = Value(true); + mergeCmd[AggregationRequest::kFromMongosName] = Value(true); mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); // If the user didn't specify a collation already, make sure there's a collation attached to @@ -451,7 +451,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, boost::intrusive_ptr<ExpressionContext> mergeCtx = new ExpressionContext(opCtx, request, std::move(collation), std::move(resolvedNamespaces)); - mergeCtx->inRouter = true; + mergeCtx->inMongos = true; // explicitly *not* setting mergeCtx->tempDir auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx)); @@ -678,7 +678,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, } auto shard = std::move(swShard.getValue()); - // Format the command for the shard. This adds the 'fromRouter' field, wraps the command as an + // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. cmdObj = Command::filterCommandRequestForPassthrough( createCommandForTargetedShards(aggRequest, cmdObj, nullptr)); |