summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Taskov <alex.taskov@mongodb.com>2020-08-28 09:12:15 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-28 14:10:24 +0000
commit7d915a65b286e535397a5d79bf2109512003bda9 (patch)
tree79a98d26851b43e9f3075b669cd0f9ba1b4cad5a
parent0c52e9773275d0c4a11c77e56d5ea0f914a903be (diff)
downloadmongo-7d915a65b286e535397a5d79bf2109512003bda9.tar.gz
SERVER-49895 Expose getLatestOplogTimestamp() in aggregation cursor command responses on oplog
-rw-r--r--jstests/sharding/resharding_oplog_sync_agg_resume_token.js90
-rw-r--r--src/mongo/db/exec/collection_scan.h12
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp16
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h10
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp23
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp5
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/resharding/resume_token.idl43
8 files changed, 197 insertions, 3 deletions
diff --git a/jstests/sharding/resharding_oplog_sync_agg_resume_token.js b/jstests/sharding/resharding_oplog_sync_agg_resume_token.js
new file mode 100644
index 00000000000..7d85b1ffabd
--- /dev/null
+++ b/jstests/sharding/resharding_oplog_sync_agg_resume_token.js
@@ -0,0 +1,90 @@
+/**
+ * Test that the postBatchResumeToken field is only included for the oplog namespace when
+ * $_requestResumeToken is specified for an aggregate command.
+ *
+ * @tags: [requires_fcv_47]
+ */
+(function() {
+"use strict";
+
+var rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+jsTest.log("Inserting documents to generate oplog entries");
+let testDB = rst.getPrimary().getDB(dbName);
+let testColl = testDB.foo;
+
+for (let i = 0; i < 10; i++) {
+ assert.commandWorked(testColl.insert({x: i}));
+}
+
+const localDb = rst.getPrimary().getDB("local");
+
+jsTest.log("Run aggregation pipeline on oplog with $_requestResumeToken set");
+const resEnabled = localDb.runCommand({
+ aggregate: "oplog.rs",
+ pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}],
+ $_requestResumeToken: true,
+ cursor: {batchSize: 1}
+});
+
+assert.commandWorked(resEnabled);
+assert(resEnabled.cursor.hasOwnProperty("postBatchResumeToken"), resEnabled);
+assert(resEnabled.cursor.postBatchResumeToken.hasOwnProperty("ts"), resEnabled);
+
+jsTest.log("Ensure that postBatchResumeToken attribute is returned for getMore command");
+const cursorId = resEnabled.cursor.id;
+const resGetMore =
+ assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"}));
+
+assert.commandWorked(resGetMore);
+assert(resGetMore.cursor.hasOwnProperty("postBatchResumeToken"), resGetMore);
+assert(resGetMore.cursor.postBatchResumeToken.hasOwnProperty("ts"), resGetMore);
+
+jsTest.log("Run aggregation pipeline on oplog with $_requestResumeToken disabled");
+const resDisabled = localDb.runCommand({
+ aggregate: "oplog.rs",
+ pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}],
+ $_requestResumeToken: false,
+ cursor: {}
+});
+
+assert.commandWorked(resDisabled);
+assert(!resDisabled.cursor.hasOwnProperty("postBatchResumeToken"), resDisabled);
+
+jsTest.log(
+ "Run aggregation pipeline on oplog with $_requestResumeToken unspecified and defaulting to disabled");
+const resWithout = localDb.runCommand(
+ {aggregate: "oplog.rs", pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}], cursor: {}});
+
+assert.commandWorked(resWithout);
+assert(!resWithout.cursor.hasOwnProperty("postBatchResumeToken"), resWithout);
+
+jsTest.log("Run aggregation pipeline on non-oplog with $_requestResumeToken set");
+const resNotOplog = localDb.runCommand(
+ {aggregate: ns, pipeline: [{limit: 100}], $_requestResumeToken: true, cursor: {}});
+
+assert.commandFailedWithCode(resNotOplog, ErrorCodes.FailedToParse);
+
+jsTest.log("Run aggregation pipeline on oplog with empty batch");
+const resEmpty = localDb.runCommand({
+ aggregate: "oplog.rs",
+ pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}],
+ $_requestResumeToken: true,
+ cursor: {batchSize: 0}
+});
+
+assert.commandWorked(resEmpty);
+assert(resEmpty.cursor.hasOwnProperty("postBatchResumeToken"), resEmpty);
+assert(resEnabled.cursor.postBatchResumeToken.hasOwnProperty("ts"), resEmpty);
+assert.eq(resEmpty.cursor.postBatchResumeToken.ts, new Timestamp(0, 0));
+
+jsTest.log("End of test");
+
+rst.stopSet();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index 3d79915b0fe..b67d8cd6603 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -35,6 +35,7 @@
#include "mongo/db/exec/requires_collection_stage.h"
#include "mongo/db/matcher/expression_leaf.h"
#include "mongo/db/record_id.h"
+#include "mongo/s/resharding/resume_token_gen.h"
namespace mongo {
@@ -74,7 +75,16 @@ public:
}
BSONObj getPostBatchResumeToken() const {
- return _params.requestResumeToken ? BSON("$recordId" << _lastSeenId.repr()) : BSONObj();
+ // Return a resume token compatible with resumable initial sync.
+ if (_params.requestResumeToken) {
+ return BSON("$recordId" << _lastSeenId.repr());
+ }
+ // Return a resume token compatible with resharding oplog sync.
+ if (_params.shouldTrackLatestOplogTimestamp) {
+ return ResumeTokenOplogTimestamp{_latestOplogEntryTimestamp}.toBSON();
+ }
+
+ return {};  // Neither token kind was requested; callers treat empty as "no token".
}
std::unique_ptr<PlanStageStats> getStats() final;
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index a948696b526..8e8761547aa 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -180,6 +180,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
}
} else if (bypassDocumentValidationCommandOption() == fieldName) {
request.setBypassDocumentValidation(elem.trueValue());
+ } else if (kRequestResumeToken == fieldName) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream()
+ << fieldName << " must be a boolean, not a " << typeName(elem.type())};
+ }
+
+ request.setRequestResumeToken(elem.Bool());
+
+ if (request.getRequestResumeToken() && !request.getNamespaceString().isOplog()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream()
+ << fieldName << " must only be set for the oplog namespace, not "
+ << request.getNamespaceString()};
+ }
} else if (WriteConcernOptions::kWriteConcernField == fieldName) {
if (elem.type() != BSONType::Object) {
return {ErrorCodes::TypeMismatch,
@@ -315,6 +330,7 @@ Document AggregationRequest::serializeToCommandObj() const {
{kNeedsMergeName, _needsMerge ? Value(true) : Value()},
{bypassDocumentValidationCommandOption(),
_bypassDocumentValidation ? Value(true) : Value()},
+ {kRequestResumeToken, _requestResumeToken ? Value(true) : Value()},
// Only serialize a collation if one was specified.
{kCollationName, _collation.isEmpty() ? Value() : Value(_collation)},
// Only serialize batchSize if not an explain, otherwise serialize an empty cursor object.
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 66e4d95df57..1aa8d98a057 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -67,6 +67,7 @@ public:
static constexpr StringData kIsMapReduceCommandName = "isMapReduceCommand"_sd;
static constexpr StringData kLetName = "let"_sd;
static constexpr StringData kCollectionUUIDName = "collectionUUID"_sd;
+ static constexpr StringData kRequestResumeToken = "$_requestResumeToken"_sd;
static constexpr long long kDefaultBatchSize = 101;
@@ -181,6 +182,10 @@ public:
return _bypassDocumentValidation;
}
+ bool getRequestResumeToken() const {
+ return _requestResumeToken;
+ }
+
/**
* Returns an empty object if no collation was specified.
*/
@@ -272,6 +277,10 @@ public:
_bypassDocumentValidation = shouldBypassDocumentValidation;
}
+ void setRequestResumeToken(bool requestResumeToken) {
+ _requestResumeToken = requestResumeToken;
+ }
+
void setMaxTimeMS(unsigned int maxTimeMS) {
_maxTimeMS = maxTimeMS;
}
@@ -342,6 +351,7 @@ private:
bool _fromMongos = false;
bool _needsMerge = false;
bool _bypassDocumentValidation = false;
+ bool _requestResumeToken = false;
// A user-specified maxTimeMS limit, or a value of '0' if not specified.
unsigned int _maxTimeMS = 0;
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 04a93a21ea1..7a0d2a7653b 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -55,10 +55,12 @@ const Document kDefaultCursorOptionDocument{
//
TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
- NamespaceString nss("a.collection");
+ // Using oplog namespace so that validation of $_requestResumeToken succeeds.
+ NamespaceString nss("local.oplog.rs");
BSONObj inputBson = fromjson(
"{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, "
- "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: "
+ "needsMerge: true, bypassDocumentValidation: true, $_requestResumeToken: true, collation: "
+ "{locale: 'en_US'}, cursor: "
"{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, "
"$queryOptions: {$readPreference: 'nearest'}, exchange: {policy: "
"'roundrobin', consumers:NumberInt(2)}, isMapReduceCommand: true}");
@@ -73,6 +75,7 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_TRUE(request.isFromMongos());
ASSERT_TRUE(request.needsMerge());
ASSERT_TRUE(request.shouldBypassDocumentValidation());
+ ASSERT_TRUE(request.getRequestResumeToken());
ASSERT_EQ(request.getBatchSize(), 10);
ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1));
ASSERT_BSONOBJ_EQ(request.getCollation(),
@@ -90,6 +93,13 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
ASSERT_EQ(*request.getCollectionUUID(), uuid);
}
+TEST(AggregationRequestTest, ShouldParseExplicitRequestResumeTokenFalseForNonOplog) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: false, cursor: {}}");
+ auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ ASSERT_FALSE(request.getRequestResumeToken());
+}
+
TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}");
@@ -161,6 +171,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau
request.setFromMongos(false);
request.setNeedsMerge(false);
request.setBypassDocumentValidation(false);
+ request.setRequestResumeToken(false);
request.setCollation(BSONObj());
request.setHint(BSONObj());
request.setMaxTimeMS(0u);
@@ -182,6 +193,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setFromMongos(true);
request.setNeedsMerge(true);
request.setBypassDocumentValidation(true);
+ request.setRequestResumeToken(true);
request.setBatchSize(10);
request.setMaxTimeMS(10u);
const auto hintObj = BSON("a" << 1);
@@ -209,6 +221,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
{AggregationRequest::kFromMongosName, true},
{AggregationRequest::kNeedsMergeName, true},
{bypassDocumentValidationCommandOption(), true},
+ {AggregationRequest::kRequestResumeToken, true},
{AggregationRequest::kCollationName, collationObj},
{AggregationRequest::kCursorName,
Value(Document({{AggregationRequest::kBatchSizeName, 10}}))},
@@ -439,6 +452,12 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce
.getStatus());
}
+TEST(AggregationRequestTest, ShouldRejectRequestResumeTokenIfNonOplogNss) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: true}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
TEST(AggregationRequestTest, CannotParseNeedsMerge34) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 647380368db..f6cbf689508 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -659,6 +659,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
+ // The aggregate command's $_requestResumeToken parameter can only be used for the oplog.
+ if (aggRequest && aggRequest->getRequestResumeToken()) {
+ plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ }
+
// If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The
// BSONObj format is currently necessary to request that the sort is computed by the query layer
// inside the inner PlanExecutor. We also remove the $sort stage from the Pipeline, since it
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 4d1aa91d63c..c1ab8a0b0b2 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -183,6 +183,7 @@ env.Library(
env.Idlc('request_types/wait_for_fail_point.idl')[0],
env.Idlc('resharding/common_types.idl')[0],
env.Idlc('resharding/type_collection_fields.idl')[0],
+ env.Idlc('resharding/resume_token.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/connection_string',
diff --git a/src/mongo/s/resharding/resume_token.idl b/src/mongo/s/resharding/resume_token.idl
new file mode 100644
index 00000000000..f89101c389b
--- /dev/null
+++ b/src/mongo/s/resharding/resume_token.idl
@@ -0,0 +1,43 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo" # Generated type is mongo::ResumeTokenOplogTimestamp.
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ ResumeTokenOplogTimestamp:
+ description: "Token format which facilitates resuming from the most recently-observed timestamp in the oplog"
+ strict: true # Reject documents containing any field other than 'ts'.
+ fields:
+ ts:
+ type: timestamp
+ description: "The latest oplog timestamp."
+ optional: false