summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2017-09-01 14:06:58 -0400
committerTess Avitabile <tess.avitabile@mongodb.com>2017-09-06 13:41:32 -0400
commited601dd01169b8c1fad9fb8d388da0523a1b48f5 (patch)
tree4d08bd1a36a12967fcb098432709da07236026b3
parent456ba544978a0d41a2261bf65da686874fb631a2 (diff)
downloadmongo-ed601dd01169b8c1fad9fb8d388da0523a1b48f5.tar.gz
SERVER-30899 Aggregations sent from a 3.4 mongos should serialize 3.4 metadata
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml2
-rw-r--r--jstests/sharding/agg_sort.js2
-rw-r--r--jstests/sharding/aggregation_currentop.js2
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp6
-rw-r--r--src/mongo/db/exec/pipeline_proxy.cpp3
-rw-r--r--src/mongo/db/exec/pipeline_proxy.h6
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp54
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h30
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp55
-rw-r--r--src/mongo/db/pipeline/document.cpp4
-rw-r--r--src/mongo/db/pipeline/document.h2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_current_op_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp2
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp8
-rw-r--r--src/mongo/db/pipeline/expression_context.h10
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp8
22 files changed, 181 insertions, 67 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 52937a2116a..8bcc55f14f2 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -4,8 +4,6 @@ selector:
roots:
- jstests/sharding/*.js
exclude_files:
- # SERVER-30899 We changed the meaning of 'fromRouter' - impacting sorts on metadata.
- - jstests/sharding/agg_sort.js
# Doesn't use ShardingTest so won't actually be run in a mixed version configuration
- jstests/sharding/config_version_rollback.js
# Behavior change to addShard
diff --git a/jstests/sharding/agg_sort.js b/jstests/sharding/agg_sort.js
index bd12565dd20..2f2503e63e1 100644
--- a/jstests/sharding/agg_sort.js
+++ b/jstests/sharding/agg_sort.js
@@ -49,7 +49,7 @@
function assertResultsEqual({actual, expected}) {
const resultsAsString = " actual: " + tojson(actual) + "\n expected: " + tojson(expected);
assert.eq(
- actual.length, expected.length, `different number of results:\n" + ${resultsAsString}`);
+ actual.length, expected.length, `different number of results:\n${resultsAsString}`);
for (let i = 0; i < actual.length; i++) {
assert.eq(
actual[i], expected[i], `different results at index ${i}:\n${resultsAsString}`);
diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js
index 652eaa0de63..689e57ea20c 100644
--- a/jstests/sharding/aggregation_currentop.js
+++ b/jstests/sharding/aggregation_currentop.js
@@ -570,7 +570,7 @@
// Test that attempting to 'spoof' a sharded request on non-shardsvr mongoD fails.
assert.commandFailedWithCode(
shardAdminDB.runCommand(
- {aggregate: 1, pipeline: [{$currentOp: {}}], fromRouter: true, cursor: {}}),
+ {aggregate: 1, pipeline: [{$currentOp: {}}], fromMongos: true, cursor: {}}),
40465);
// Test that an operation which is at the BSON user size limit does not throw an error when the
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 753cc2bfebd..3350f43e4b5 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -427,11 +427,11 @@ Status runAggregate(OperationContext* opCtx,
pipeline->optimizePipeline();
- if (kDebugBuild && !expCtx->explain && !expCtx->fromRouter) {
+ if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) {
// Make sure all operations round-trip through Pipeline::serialize() correctly by
// re-parsing every command in debug builds. This is important because sharded
- // aggregations rely on this ability. Skipping when fromRouter because this has
- // already been through the transformation (and this un-sets expCtx->fromRouter).
+ // aggregations rely on this ability. Skipping when fromMongos because this has
+ // already been through the transformation (and this un-sets expCtx->fromMongos).
pipeline = reparsePipeline(pipeline.get(), request, expCtx);
}
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 4b291b59d89..b814a2eee0f 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -52,6 +52,7 @@ PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
: PlanStage(kStageType, opCtx),
_pipeline(std::move(pipeline)),
_includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
+ _includeSortKey(_includeMetaData && !_pipeline->getContext()->from34Mongos),
_ws(ws) {
// We take over responsibility for disposing of the Pipeline, since it is required that
// doDispose() will be called before destruction of this PipelineProxyStage.
@@ -117,7 +118,7 @@ unique_ptr<PlanStageStats> PipelineProxyStage::getStats() {
boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
if (auto next = _pipeline->getNext()) {
if (_includeMetaData) {
- return next->toBsonWithMetaData();
+ return next->toBsonWithMetaData(_includeSortKey);
} else {
return next->toBson();
}
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index ab33c963fa1..bb6c6645eb1 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -94,6 +94,12 @@ private:
std::vector<BSONObj> _stash;
const bool _includeMetaData;
+ // When the aggregation request is from a 3.4 mongos, the merge may happen on a 3.4 shard (which
+ // does not understand sort key metadata), so we should not serialize the sort key, and
+ // '_includeSortKey' is set to false.
+ // TODO SERVER-30924: remove this.
+ const bool _includeSortKey;
+
// Not owned by us.
WorkingSet* _ws;
};
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 4dbb785fb10..7c0e6401cd2 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -50,8 +50,9 @@ namespace mongo {
constexpr StringData AggregationRequest::kCommandName;
constexpr StringData AggregationRequest::kCursorName;
constexpr StringData AggregationRequest::kBatchSizeName;
-constexpr StringData AggregationRequest::kFromRouterName;
+constexpr StringData AggregationRequest::kFromMongosName;
constexpr StringData AggregationRequest::kNeedsMergeName;
+constexpr StringData AggregationRequest::kNeedsMerge34Name;
constexpr StringData AggregationRequest::kPipelineName;
constexpr StringData AggregationRequest::kCollationName;
constexpr StringData AggregationRequest::kExplainName;
@@ -107,6 +108,10 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
bool hasCursorElem = false;
bool hasExplainElem = false;
+ bool hasFromMongosElem = false;
+ bool hasNeedsMergeElem = false;
+ bool hasNeedsMerge34Elem = false;
+
// Parse optional parameters.
for (auto&& elem : cmdObj) {
auto fieldName = elem.fieldNameStringData();
@@ -180,20 +185,35 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
if (elem.Bool()) {
request.setExplain(ExplainOptions::Verbosity::kQueryPlanner);
}
- } else if (kFromRouterName == fieldName) {
+ } else if (kFromMongosName == fieldName) {
if (elem.type() != BSONType::Bool) {
return {ErrorCodes::TypeMismatch,
- str::stream() << kFromRouterName << " must be a boolean, not a "
+ str::stream() << kFromMongosName << " must be a boolean, not a "
<< typeName(elem.type())};
}
- request.setFromRouter(elem.Bool());
+
+ hasFromMongosElem = true;
+ request.setFromMongos(elem.Bool());
} else if (kNeedsMergeName == fieldName) {
if (elem.type() != BSONType::Bool) {
return {ErrorCodes::TypeMismatch,
str::stream() << kNeedsMergeName << " must be a boolean, not a "
<< typeName(elem.type())};
}
+
+ hasNeedsMergeElem = true;
+ request.setNeedsMerge(elem.Bool());
+ } else if (kNeedsMerge34Name == fieldName) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kNeedsMerge34Name << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+
+ hasNeedsMerge34Elem = true;
request.setNeedsMerge(elem.Bool());
+ request.setFromMongos(elem.Bool());
+ request.setFrom34Mongos(elem.Bool());
} else if (kAllowDiskUseName == fieldName) {
if (storageGlobalParams.readOnly) {
return {ErrorCodes::IllegalOperation,
@@ -248,6 +268,30 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
<< "' option"};
}
+ if (hasNeedsMergeElem && !hasFromMongosElem) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "Cannot specify '" << kNeedsMergeName << "' without '"
+ << kFromMongosName
+ << "'"};
+ }
+
+ // If 'fromRouter' is specified, the request is from a 3.4 mongos, so we do not expect
+ // 'fromMongos' or 'needsMerge' to be specified.
+ if (hasNeedsMerge34Elem) {
+ if (hasNeedsMergeElem) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "Cannot specify both '" << kNeedsMergeName << "' and '"
+ << kNeedsMerge34Name
+ << "'"};
+ }
+ if (hasFromMongosElem) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "Cannot specify both '" << kFromMongosName << "' and '"
+ << kNeedsMerge34Name
+ << "'"};
+ }
+ }
+
return request;
}
@@ -284,7 +328,7 @@ Document AggregationRequest::serializeToCommandObj() const {
{kPipelineName, _pipeline},
// Only serialize booleans if different than their default.
{kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
- {kFromRouterName, _fromRouter ? Value(true) : Value()},
+ {kFromMongosName, _fromMongos ? Value(true) : Value()},
{kNeedsMergeName, _needsMerge ? Value(true) : Value()},
{bypassDocumentValidationCommandOption(),
_bypassDocumentValidation ? Value(true) : Value()},
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index d8672b986d7..b34fc383d20 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -50,8 +50,9 @@ public:
static constexpr StringData kCommandName = "aggregate"_sd;
static constexpr StringData kCursorName = "cursor"_sd;
static constexpr StringData kBatchSizeName = "batchSize"_sd;
- static constexpr StringData kFromRouterName = "fromRouter"_sd;
+ static constexpr StringData kFromMongosName = "fromMongos"_sd;
static constexpr StringData kNeedsMergeName = "needsMerge"_sd;
+ static constexpr StringData kNeedsMerge34Name = "fromRouter"_sd;
static constexpr StringData kPipelineName = "pipeline"_sd;
static constexpr StringData kCollationName = "collation"_sd;
static constexpr StringData kExplainName = "explain"_sd;
@@ -135,8 +136,15 @@ public:
/**
* Returns true if this request originated from a mongoS.
*/
- bool isFromRouter() const {
- return _fromRouter;
+ bool isFromMongos() const {
+ return _fromMongos;
+ }
+
+ /**
+ * Returns true if this request originated from a 3.4 mongos.
+ */
+ bool isFrom34Mongos() const {
+ return _from34Mongos;
}
/**
@@ -218,8 +226,12 @@ public:
_allowDiskUse = allowDiskUse;
}
- void setFromRouter(bool isFromRouter) {
- _fromRouter = isFromRouter;
+ void setFromMongos(bool isFromMongos) {
+ _fromMongos = isFromMongos;
+ }
+
+ void setFrom34Mongos(bool isFrom34Mongos) {
+ _from34Mongos = isFrom34Mongos;
}
void setNeedsMerge(bool needsMerge) {
@@ -276,10 +288,16 @@ private:
boost::optional<ExplainOptions::Verbosity> _explainMode;
bool _allowDiskUse = false;
- bool _fromRouter = false;
+ bool _fromMongos = false;
bool _needsMerge = false;
bool _bypassDocumentValidation = false;
+ // We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur
+ // on a 3.4 shard (which does not understand sort key metadata), and we should not serialize the
+ // sort key.
+ // TODO SERVER-30924: remove this.
+ bool _from34Mongos = false;
+
// A user-specified maxTimeMS limit, or a value of '0' if not specified.
unsigned int _maxTimeMS = 0;
};
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 8192e0fed3e..312e8158121 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -56,14 +56,14 @@ const Document kDefaultCursorOptionDocument{
TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson(
- "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromRouter: true, "
+ "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, "
"needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: "
"{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, "
"$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment'}}");
auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getExplain());
ASSERT_TRUE(request.shouldAllowDiskUse());
- ASSERT_TRUE(request.isFromRouter());
+ ASSERT_TRUE(request.isFromMongos());
ASSERT_TRUE(request.needsMerge());
ASSERT_TRUE(request.shouldBypassDocumentValidation());
ASSERT_EQ(request.getBatchSize(), 10);
@@ -81,6 +81,16 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
<< "nearest"));
}
+TEST(AggregationRequestTest, ShouldParseNeedsMerge34) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: true}");
+ auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ ASSERT_TRUE(request.needsMerge());
+ ASSERT_TRUE(request.isFromMongos());
+ ASSERT_TRUE(request.isFrom34Mongos());
+}
+
TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}");
@@ -136,7 +146,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau
AggregationRequest request(nss, {});
request.setExplain(boost::none);
request.setAllowDiskUse(false);
- request.setFromRouter(false);
+ request.setFromMongos(false);
request.setNeedsMerge(false);
request.setBypassDocumentValidation(false);
request.setCollation(BSONObj());
@@ -157,7 +167,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
NamespaceString nss("a.collection");
AggregationRequest request(nss, {});
request.setAllowDiskUse(true);
- request.setFromRouter(true);
+ request.setFromMongos(true);
request.setNeedsMerge(true);
request.setBypassDocumentValidation(true);
request.setBatchSize(10);
@@ -180,7 +190,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
Document{{AggregationRequest::kCommandName, nss.coll()},
{AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
{AggregationRequest::kAllowDiskUseName, true},
- {AggregationRequest::kFromRouterName, true},
+ {AggregationRequest::kFromMongosName, true},
{AggregationRequest::kNeedsMergeName, true},
{bypassDocumentValidationCommandOption(), true},
{AggregationRequest::kCollationName, collationObj},
@@ -317,17 +327,46 @@ TEST(AggregationRequestTest, ShouldRejectExplainIfObject) {
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
-TEST(AggregationRequestTest, ShouldRejectNonBoolFromRouter) {
+TEST(AggregationRequestTest, ShouldRejectNonBoolFromMongos) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}");
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: 1}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1}");
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1, fromMongos: true}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfFromMongosNotPresent) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge34) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfNeedsMerge34AlsoPresent) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson(
+ "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true, fromMongos: true, "
+ "fromRouter: true}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectFromMongosIfNeedsMerge34AlsoPresent) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson(
+ "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: true, fromRouter: true}");
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp
index 4f531fe72f3..f92248d7508 100644
--- a/src/mongo/db/pipeline/document.cpp
+++ b/src/mongo/db/pipeline/document.cpp
@@ -273,14 +273,14 @@ constexpr StringData Document::metaFieldTextScore;
constexpr StringData Document::metaFieldRandVal;
constexpr StringData Document::metaFieldSortKey;
-BSONObj Document::toBsonWithMetaData() const {
+BSONObj Document::toBsonWithMetaData(bool includeSortKey) const {
BSONObjBuilder bb;
toBson(&bb);
if (hasTextScore())
bb.append(metaFieldTextScore, getTextScore());
if (hasRandMetaField())
bb.append(metaFieldRandVal, getRandMetaField());
- if (hasSortKeyMetaField())
+ if (includeSortKey && hasSortKeyMetaField())
bb.append(metaFieldSortKey, getSortKeyMetaField());
return bb.obj();
}
diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h
index a8ebfd1c4d2..0df446a797a 100644
--- a/src/mongo/db/pipeline/document.h
+++ b/src/mongo/db/pipeline/document.h
@@ -208,7 +208,7 @@ public:
* Like toBson, but includes metadata at the top-level.
* Output is parseable by fromBsonWithMetaData
*/
- BSONObj toBsonWithMetaData() const;
+ BSONObj toBsonWithMetaData(bool includeSortKey = true) const;
/**
* Like Document(BSONObj) but treats top-level fields with special names as metadata.
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index e1385b19c68..e874fd5d426 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -91,7 +91,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
if (!_sorter) {
SortOptions opts;
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) {
+ if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 333a130a0ae..08926254480 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -648,15 +648,15 @@ TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) {
auto expCtx = getExpCtx();
expCtx->extSortAllowed = false;
- expCtx->inRouter = false;
+ expCtx->inMongos = false;
assertCannotSpillToDisk(expCtx);
expCtx->extSortAllowed = true;
- expCtx->inRouter = true;
+ expCtx->inMongos = true;
assertCannotSpillToDisk(expCtx);
expCtx->extSortAllowed = false;
- expCtx->inRouter = true;
+ expCtx->inMongos = true;
assertCannotSpillToDisk(expCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 6b73b4b5084..3b318e6d19a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -222,7 +222,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
// TODO: Add sharding support here (SERVER-29141).
uassert(
- 40470, "The $changeStream stage is not supported on sharded systems.", !expCtx->inRouter);
+ 40470, "The $changeStream stage is not supported on sharded systems.", !expCtx->inMongos);
uassert(40471,
"Only default collation is allowed when using a $changeStream stage.",
!expCtx->getCollator());
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index 7b22cdce75d..ce77e3da6ae 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -95,18 +95,18 @@ DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
_opsIter = _ops.begin();
- if (pExpCtx->fromRouter) {
+ if (pExpCtx->fromMongos) {
_shardName = _mongod->getShardName(pExpCtx->opCtx);
uassert(40465,
- "Aggregation request specified 'fromRouter' but unable to retrieve shard name "
+ "Aggregation request specified 'fromMongos' but unable to retrieve shard name "
"for $currentOp pipeline stage.",
!_shardName.empty());
}
}
if (_opsIter != _ops.end()) {
- if (!pExpCtx->fromRouter) {
+ if (!pExpCtx->fromMongos) {
return Document(*_opsIter++);
}
diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp
index b852c94ffbe..a52453f8aa1 100644
--- a/src/mongo/db/pipeline/document_source_current_op_test.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp
@@ -191,7 +191,7 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) {
TEST_F(DocumentSourceCurrentOpTest,
ShouldAddShardNameModifyOpIDAndClientFieldNameInShardedContext) {
- getExpCtx()->fromRouter = true;
+ getExpCtx()->fromMongos = true;
std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
const auto mongod = std::make_shared<MockMongodImplementation>(ops);
@@ -209,7 +209,7 @@ TEST_F(DocumentSourceCurrentOpTest,
TEST_F(DocumentSourceCurrentOpTest,
ShouldReturnOpIDAndClientFieldNameUnmodifiedWhenNotInShardedContext) {
- getExpCtx()->fromRouter = false;
+ getExpCtx()->fromMongos = false;
std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
const auto mongod = std::make_shared<MockMongodImplementation>(ops);
@@ -224,7 +224,7 @@ TEST_F(DocumentSourceCurrentOpTest,
}
TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) {
- getExpCtx()->fromRouter = true;
+ getExpCtx()->fromMongos = true;
const auto mongod = std::make_shared<MockMongodImplementation>(false);
@@ -235,7 +235,7 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRe
}
TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInShardedContext) {
- getExpCtx()->fromRouter = true;
+ getExpCtx()->fromMongos = true;
std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")};
const auto mongod = std::make_shared<MockMongodImplementation>(ops);
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index de75a225f70..ba2c48680f2 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -266,7 +266,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
- _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {}
+ _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inMongos) {}
void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
_accumulatedFields.push_back(accumulationStatement);
@@ -530,7 +530,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
if (kDebugBuild && !storageGlobalParams.readOnly) {
// In debug mode, spill every time we have a duplicate id to stress merge logic.
if (!inserted && // is a dup
- !pExpCtx->inRouter && // can't spill to disk in router
+ !pExpCtx->inMongos && // can't spill to disk in mongos
!_extSortAllowed && // don't change behavior when testing external sort
_sortedFiles.size() < 20) { // don't open too many FDs
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 34e5a753e7a..5cc5ba93be4 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -70,7 +70,7 @@ using DocumentSourceGroupTest = AggregationContextFixture;
TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) {
auto expCtx = getExpCtx();
- expCtx->inRouter = true; // Disallow external sort.
+ expCtx->inMongos = true; // Disallow external sort.
// This is the only way to do this in a debug build.
AccumulationStatement countStatement{"count",
ExpressionConstant::create(expCtx, Value(1)),
@@ -142,7 +142,7 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) {
TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
auto expCtx = getExpCtx();
const size_t maxMemoryUsageBytes = 1000;
- expCtx->inRouter = true; // Disallow external sort.
+ expCtx->inMongos = true; // Disallow external sort.
// This is the only way to do this in a debug build.
VariablesParseState vps = expCtx->variablesParseState;
@@ -164,7 +164,7 @@ TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSet
TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
auto expCtx = getExpCtx();
const size_t maxMemoryUsageBytes = 1000;
- expCtx->inRouter = true; // Disallow external sort.
+ expCtx->inMongos = true; // Disallow external sort.
// This is the only way to do this in a debug build.
VariablesParseState vps = expCtx->variablesParseState;
@@ -206,15 +206,15 @@ public:
_tempDir("DocumentSourceGroupTest") {}
protected:
- void createGroup(const BSONObj& spec, bool inShard = false, bool inRouter = false) {
+ void createGroup(const BSONObj& spec, bool inShard = false, bool inMongos = false) {
BSONObj namedSpec = BSON("$group" << spec);
BSONElement specElement = namedSpec.firstElement();
intrusive_ptr<ExpressionContextForTest> expressionContext =
new ExpressionContextForTest(_opCtx.get(), AggregationRequest(NamespaceString(ns), {}));
- // For $group, 'inShard' implies 'fromRouter' and 'needsMerge'.
- expressionContext->fromRouter = expressionContext->needsMerge = inShard;
- expressionContext->inRouter = inRouter;
+ // For $group, 'inShard' implies 'fromMongos' and 'needsMerge'.
+ expressionContext->fromMongos = expressionContext->needsMerge = inShard;
+ expressionContext->inMongos = inMongos;
// Won't spill to disk properly if it needs to.
expressionContext->tempDir = _tempDir.path();
@@ -1012,7 +1012,7 @@ public:
// We pretend to be in the router so that we don't spill to disk, because this produces
// inconsistent output on debug vs. non-debug builds.
- const bool inRouter = true;
+ const bool inMongos = true;
const bool inShard = false;
createGroup(BSON("_id" << BSON("x"
@@ -1020,7 +1020,7 @@ public:
<< "y"
<< "$b")),
inShard,
- inRouter);
+ inMongos);
group()->setSource(source.get());
group()->getNext();
@@ -1039,7 +1039,7 @@ public:
// We pretend to be in the router so that we don't spill to disk, because this produces
// inconsistent output on debug vs. non-debug builds.
- const bool inRouter = true;
+ const bool inMongos = true;
const bool inShard = false;
createGroup(BSON("_id" << BSON("a"
@@ -1047,7 +1047,7 @@ public:
<< "b"
<< "$a")),
inShard,
- inRouter);
+ inMongos);
group()->setSource(source.get());
group()->getNext();
@@ -1066,10 +1066,10 @@ public:
// We pretend to be in the router so that we don't spill to disk, because this produces
// inconsistent output on debug vs. non-debug builds.
- const bool inRouter = true;
+ const bool inMongos = true;
const bool inShard = false;
- createGroup(fromjson("{_id: {$sum: ['$a', '$b']}}"), inShard, inRouter);
+ createGroup(fromjson("{_id: {$sum: ['$a', '$b']}}"), inShard, inMongos);
group()->setSource(source.get());
group()->getNext();
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 54cf67a97a6..97923ff8118 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -304,7 +304,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const {
opts.limit = limitSrc->getLimit();
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) {
+ if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index f2dd5ab59cf..0a6005c73b0 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -44,10 +44,11 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
std::unique_ptr<CollatorInterface> collator,
StringMap<ResolvedNamespace> resolvedNamespaces)
: explain(request.getExplain()),
- fromRouter(request.isFromRouter()),
+ fromMongos(request.isFromMongos()),
needsMerge(request.needsMerge()),
extSortAllowed(request.shouldAllowDiskUse()),
bypassDocumentValidation(request.shouldBypassDocumentValidation()),
+ from34Mongos(request.isFrom34Mongos()),
ns(request.getNamespaceString()),
opCtx(opCtx),
collation(request.getCollation()),
@@ -78,8 +79,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns)
expCtx->explain = explain;
expCtx->needsMerge = needsMerge;
- expCtx->fromRouter = fromRouter;
- expCtx->inRouter = inRouter;
+ expCtx->fromMongos = fromMongos;
+ expCtx->from34Mongos = from34Mongos;
+ expCtx->inMongos = inMongos;
expCtx->extSortAllowed = extSortAllowed;
expCtx->bypassDocumentValidation = bypassDocumentValidation;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index c67ba4ab9b8..ad7052968c1 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -113,12 +113,18 @@ public:
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
- bool fromRouter = false;
+ bool fromMongos = false;
bool needsMerge = false;
- bool inRouter = false;
+ bool inMongos = false;
bool extSortAllowed = false;
bool bypassDocumentValidation = false;
+ // We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur
+ // on a 3.4 shard (which does not understand sort key metadata), and we should not serialize the
+ // sort key.
+ // TODO SERVER-30924: remove this.
+ bool from34Mongos = false;
+
NamespaceString ns;
std::string tempDir; // Defaults to empty to prevent external sorting in mongos.
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 1bdbd85e850..fc8bab48bf4 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -185,7 +185,7 @@ BSONObj createCommandForTargetedShards(
const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards) {
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
- targetedCmd[AggregationRequest::kFromRouterName] = Value(true);
+ targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
// If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
if (pipelineForTargetedShards) {
@@ -221,7 +221,7 @@ BSONObj createCommandForMergingShard(
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
- mergeCmd[AggregationRequest::kFromRouterName] = Value(true);
+ mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
// If the user didn't specify a collation already, make sure there's a collation attached to
@@ -451,7 +451,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> mergeCtx =
new ExpressionContext(opCtx, request, std::move(collation), std::move(resolvedNamespaces));
- mergeCtx->inRouter = true;
+ mergeCtx->inMongos = true;
// explicitly *not* setting mergeCtx->tempDir
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx));
@@ -678,7 +678,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
}
auto shard = std::move(swShard.getValue());
- // Format the command for the shard. This adds the 'fromRouter' field, wraps the command as an
+ // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = Command::filterCommandRequestForPassthrough(
createCommandForTargetedShards(aggRequest, cmdObj, nullptr));