diff options
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request_test.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 5 |
4 files changed, 52 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index a948696b526..8e8761547aa 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -180,6 +180,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( } } else if (bypassDocumentValidationCommandOption() == fieldName) { request.setBypassDocumentValidation(elem.trueValue()); + } else if (kRequestResumeToken == fieldName) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() + << fieldName << "must be a boolean, not a " << typeName(elem.type())}; + } + + request.setRequestResumeToken(elem.Bool()); + + if (request.getRequestResumeToken() && !request.getNamespaceString().isOplog()) { + return {ErrorCodes::FailedToParse, + str::stream() + << fieldName << " must only be set for the oplog namespace, not " + << request.getNamespaceString()}; + } } else if (WriteConcernOptions::kWriteConcernField == fieldName) { if (elem.type() != BSONType::Object) { return {ErrorCodes::TypeMismatch, @@ -315,6 +330,7 @@ Document AggregationRequest::serializeToCommandObj() const { {kNeedsMergeName, _needsMerge ? Value(true) : Value()}, {bypassDocumentValidationCommandOption(), _bypassDocumentValidation ? Value(true) : Value()}, + {kRequestResumeToken, _requestResumeToken ? Value(true) : Value()}, // Only serialize a collation if one was specified. {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)}, // Only serialize batchSize if not an explain, otherwise serialize an empty cursor object. diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 66e4d95df57..1aa8d98a057 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -67,6 +67,7 @@ public: static constexpr StringData kIsMapReduceCommandName = "isMapReduceCommand"_sd; static constexpr StringData kLetName = "let"_sd; static constexpr StringData kCollectionUUIDName = "collectionUUID"_sd; + static constexpr StringData kRequestResumeToken = "$_requestResumeToken"_sd; static constexpr long long kDefaultBatchSize = 101; @@ -181,6 +182,10 @@ public: return _bypassDocumentValidation; } + bool getRequestResumeToken() const { + return _requestResumeToken; + } + /** * Returns an empty object if no collation was specified. */ @@ -272,6 +277,10 @@ public: _bypassDocumentValidation = shouldBypassDocumentValidation; } + void setRequestResumeToken(bool requestResumeToken) { + _requestResumeToken = requestResumeToken; + } + void setMaxTimeMS(unsigned int maxTimeMS) { _maxTimeMS = maxTimeMS; } @@ -342,6 +351,7 @@ private: bool _fromMongos = false; bool _needsMerge = false; bool _bypassDocumentValidation = false; + bool _requestResumeToken = false; // A user-specified maxTimeMS limit, or a value of '0' if not specified. unsigned int _maxTimeMS = 0; diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 04a93a21ea1..7a0d2a7653b 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -55,10 +55,12 @@ const Document kDefaultCursorOptionDocument{ // TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { - NamespaceString nss("a.collection"); + // Using oplog namespace so that validation of $_requestResumeToken succeeds. + NamespaceString nss("local.oplog.rs"); BSONObj inputBson = fromjson( "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, " - "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: " + "needsMerge: true, bypassDocumentValidation: true, $_requestResumeToken: true, collation: " + "{locale: 'en_US'}, cursor: " "{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, " "$queryOptions: {$readPreference: 'nearest'}, exchange: {policy: " "'roundrobin', consumers:NumberInt(2)}, isMapReduceCommand: true}"); @@ -73,6 +75,7 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_TRUE(request.isFromMongos()); ASSERT_TRUE(request.needsMerge()); ASSERT_TRUE(request.shouldBypassDocumentValidation()); + ASSERT_TRUE(request.getRequestResumeToken()); ASSERT_EQ(request.getBatchSize(), 10); ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1)); ASSERT_BSONOBJ_EQ(request.getCollation(), @@ -90,6 +93,13 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_EQ(*request.getCollectionUUID(), uuid); } +TEST(AggregationRequestTest, ShouldParseExplicitRequestResumeTokenFalseForNonOplog) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: false, cursor: {}}"); + auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); + ASSERT_FALSE(request.getRequestResumeToken()); +} + TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}"); @@ -161,6 +171,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau request.setFromMongos(false); request.setNeedsMerge(false); request.setBypassDocumentValidation(false); + request.setRequestResumeToken(false); request.setCollation(BSONObj()); request.setHint(BSONObj()); request.setMaxTimeMS(0u); @@ -182,6 +193,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { request.setFromMongos(true); request.setNeedsMerge(true); request.setBypassDocumentValidation(true); + request.setRequestResumeToken(true); request.setBatchSize(10); request.setMaxTimeMS(10u); const auto hintObj = BSON("a" << 1); @@ -209,6 +221,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { {AggregationRequest::kFromMongosName, true}, {AggregationRequest::kNeedsMergeName, true}, {bypassDocumentValidationCommandOption(), true}, + {AggregationRequest::kRequestResumeToken, true}, {AggregationRequest::kCollationName, collationObj}, {AggregationRequest::kCursorName, Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}, @@ -439,6 +452,12 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce .getStatus()); } +TEST(AggregationRequestTest, ShouldRejectRequestResumeTokenIfNonOplogNss) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: true}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + TEST(AggregationRequestTest, CannotParseNeedsMerge34) { NamespaceString nss("a.collection"); const BSONObj inputBson = diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 647380368db..f6cbf689508 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -659,6 +659,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); } + // The aggregate command's $_requestResumeToken parameter can only be used for the oplog. + if (aggRequest && aggRequest->getRequestResumeToken()) { + plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + } + // If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The // BSONObj format is currently necessary to request that the sort is computed by the query layer // inside the inner PlanExecutor. We also remove the $sort stage from the Pipeline, since it |