summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorAlex Taskov <alex.taskov@mongodb.com>2020-08-28 09:12:15 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-28 14:10:24 +0000
commit7d915a65b286e535397a5d79bf2109512003bda9 (patch)
tree79a98d26851b43e9f3075b669cd0f9ba1b4cad5a /src/mongo/db/pipeline
parent0c52e9773275d0c4a11c77e56d5ea0f914a903be (diff)
downloadmongo-7d915a65b286e535397a5d79bf2109512003bda9.tar.gz
SERVER-49895 Expose getLatestOplogTimestamp() in aggregation cursor command responses on oplog
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp16
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h10
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp23
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp5
4 files changed, 52 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index a948696b526..8e8761547aa 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -180,6 +180,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
}
} else if (bypassDocumentValidationCommandOption() == fieldName) {
request.setBypassDocumentValidation(elem.trueValue());
+ } else if (kRequestResumeToken == fieldName) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream()
+ << fieldName << "must be a boolean, not a " << typeName(elem.type())};
+ }
+
+ request.setRequestResumeToken(elem.Bool());
+
+ if (request.getRequestResumeToken() && !request.getNamespaceString().isOplog()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream()
+ << fieldName << " must only be set for the oplog namespace, not "
+ << request.getNamespaceString()};
+ }
} else if (WriteConcernOptions::kWriteConcernField == fieldName) {
if (elem.type() != BSONType::Object) {
return {ErrorCodes::TypeMismatch,
@@ -315,6 +330,7 @@ Document AggregationRequest::serializeToCommandObj() const {
{kNeedsMergeName, _needsMerge ? Value(true) : Value()},
{bypassDocumentValidationCommandOption(),
_bypassDocumentValidation ? Value(true) : Value()},
+ {kRequestResumeToken, _requestResumeToken ? Value(true) : Value()},
// Only serialize a collation if one was specified.
{kCollationName, _collation.isEmpty() ? Value() : Value(_collation)},
// Only serialize batchSize if not an explain, otherwise serialize an empty cursor object.
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 66e4d95df57..1aa8d98a057 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -67,6 +67,7 @@ public:
static constexpr StringData kIsMapReduceCommandName = "isMapReduceCommand"_sd;
static constexpr StringData kLetName = "let"_sd;
static constexpr StringData kCollectionUUIDName = "collectionUUID"_sd;
+ static constexpr StringData kRequestResumeToken = "$_requestResumeToken"_sd;
static constexpr long long kDefaultBatchSize = 101;
@@ -181,6 +182,10 @@ public:
return _bypassDocumentValidation;
}
+ bool getRequestResumeToken() const {
+ return _requestResumeToken;
+ }
+
/**
* Returns an empty object if no collation was specified.
*/
@@ -272,6 +277,10 @@ public:
_bypassDocumentValidation = shouldBypassDocumentValidation;
}
+ void setRequestResumeToken(bool requestResumeToken) {
+ _requestResumeToken = requestResumeToken;
+ }
+
void setMaxTimeMS(unsigned int maxTimeMS) {
_maxTimeMS = maxTimeMS;
}
@@ -342,6 +351,7 @@ private:
bool _fromMongos = false;
bool _needsMerge = false;
bool _bypassDocumentValidation = false;
+ bool _requestResumeToken = false;
// A user-specified maxTimeMS limit, or a value of '0' if not specified.
unsigned int _maxTimeMS = 0;
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 04a93a21ea1..7a0d2a7653b 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -55,10 +55,12 @@ const Document kDefaultCursorOptionDocument{
//
TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
- NamespaceString nss("a.collection");
+ // Using oplog namespace so that validation of $_requestResumeToken succeeds.
+ NamespaceString nss("local.oplog.rs");
BSONObj inputBson = fromjson(
"{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, "
- "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: "
+ "needsMerge: true, bypassDocumentValidation: true, $_requestResumeToken: true, collation: "
+ "{locale: 'en_US'}, cursor: "
"{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, "
"$queryOptions: {$readPreference: 'nearest'}, exchange: {policy: "
"'roundrobin', consumers:NumberInt(2)}, isMapReduceCommand: true}");
@@ -73,6 +75,7 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_TRUE(request.isFromMongos());
ASSERT_TRUE(request.needsMerge());
ASSERT_TRUE(request.shouldBypassDocumentValidation());
+ ASSERT_TRUE(request.getRequestResumeToken());
ASSERT_EQ(request.getBatchSize(), 10);
ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1));
ASSERT_BSONOBJ_EQ(request.getCollation(),
@@ -90,6 +93,13 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_EQ(*request.getCollectionUUID(), uuid);
}
+TEST(AggregationRequestTest, ShouldParseExplicitRequestResumeTokenFalseForNonOplog) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: false, cursor: {}}");
+ auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ ASSERT_FALSE(request.getRequestResumeToken());
+}
+
TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}");
@@ -161,6 +171,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau
request.setFromMongos(false);
request.setNeedsMerge(false);
request.setBypassDocumentValidation(false);
+ request.setRequestResumeToken(false);
request.setCollation(BSONObj());
request.setHint(BSONObj());
request.setMaxTimeMS(0u);
@@ -182,6 +193,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setFromMongos(true);
request.setNeedsMerge(true);
request.setBypassDocumentValidation(true);
+ request.setRequestResumeToken(true);
request.setBatchSize(10);
request.setMaxTimeMS(10u);
const auto hintObj = BSON("a" << 1);
@@ -209,6 +221,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
{AggregationRequest::kFromMongosName, true},
{AggregationRequest::kNeedsMergeName, true},
{bypassDocumentValidationCommandOption(), true},
+ {AggregationRequest::kRequestResumeToken, true},
{AggregationRequest::kCollationName, collationObj},
{AggregationRequest::kCursorName,
Value(Document({{AggregationRequest::kBatchSizeName, 10}}))},
@@ -439,6 +452,12 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce
.getStatus());
}
+TEST(AggregationRequestTest, ShouldRejectRequestResumeTokenIfNonOplogNss) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: true}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
TEST(AggregationRequestTest, CannotParseNeedsMerge34) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 647380368db..f6cbf689508 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -659,6 +659,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
+ // The aggregate command's $_requestResumeToken parameter can only be used for the oplog.
+ if (aggRequest && aggRequest->getRequestResumeToken()) {
+ plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ }
+
// If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The
// BSONObj format is currently necessary to request that the sort is computed by the query layer
// inside the inner PlanExecutor. We also remove the $sort stage from the Pipeline, since it