diff options
author | Alex Taskov <alex.taskov@mongodb.com> | 2020-08-28 09:12:15 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-28 14:10:24 +0000 |
commit | 7d915a65b286e535397a5d79bf2109512003bda9 (patch) | |
tree | 79a98d26851b43e9f3075b669cd0f9ba1b4cad5a | |
parent | 0c52e9773275d0c4a11c77e56d5ea0f914a903be (diff) | |
download | mongo-7d915a65b286e535397a5d79bf2109512003bda9.tar.gz |
SERVER-49895 Expose getLatestOplogTimestamp() in aggregation cursor command responses on oplog
-rw-r--r-- | jstests/sharding/resharding_oplog_sync_agg_resume_token.js | 90 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan.h | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request_test.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/resharding/resume_token.idl | 43 |
8 files changed, 197 insertions, 3 deletions
diff --git a/jstests/sharding/resharding_oplog_sync_agg_resume_token.js b/jstests/sharding/resharding_oplog_sync_agg_resume_token.js new file mode 100644 index 00000000000..7d85b1ffabd --- /dev/null +++ b/jstests/sharding/resharding_oplog_sync_agg_resume_token.js @@ -0,0 +1,90 @@ +/** + * Test that the postBatchResumeToken field is only included for the oplog namespace when + * $_requestResumeToken is specified for an aggregate command. + * + * @tags: [requires_fcv_47] + */ +(function() { +"use strict"; + +var rst = new ReplSetTest({nodes: 1}); +rst.startSet(); +rst.initiate(); + +const dbName = "test"; +const collName = "foo"; +const ns = dbName + "." + collName; + +jsTest.log("Inserting documents to generate oplog entries"); +let testDB = rst.getPrimary().getDB(dbName); +let testColl = testDB.foo; + +for (let i = 0; i < 10; i++) { + assert.commandWorked(testColl.insert({x: i})); +} + +const localDb = rst.getPrimary().getDB("local"); + +jsTest.log("Run aggregation pipeline on oplog with $_requestResumeToken set"); +const resEnabled = localDb.runCommand({ + aggregate: "oplog.rs", + pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}], + $_requestResumeToken: true, + cursor: {batchSize: 1} +}); + +assert.commandWorked(resEnabled); +assert(resEnabled.cursor.hasOwnProperty("postBatchResumeToken"), resEnabled); +assert(resEnabled.cursor.postBatchResumeToken.hasOwnProperty("ts"), resEnabled); + +jsTest.log("Ensure that postBatchResumeToken attribute is returned for getMore command"); +const cursorId = resEnabled.cursor.id; +const resGetMore = + assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"})); + +assert.commandWorked(resGetMore); +assert(resGetMore.cursor.hasOwnProperty("postBatchResumeToken"), resGetMore); +assert(resGetMore.cursor.postBatchResumeToken.hasOwnProperty("ts"), resGetMore); + +jsTest.log("Run aggregation pipeline on oplog with $_requestResumeToken disabled"); +const resDisabled = localDb.runCommand({ + aggregate: "oplog.rs", + pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}], + $_requestResumeToken: false, + cursor: {} +}); + +assert.commandWorked(resDisabled); +assert(!resDisabled.cursor.hasOwnProperty("postBatchResumeToken"), resDisabled); + +jsTest.log( + "Run aggregation pipeline on oplog with $_requestResumeToken unspecified and defaulting to disabled"); +const resWithout = localDb.runCommand( + {aggregate: "oplog.rs", pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}], cursor: {}}); + +assert.commandWorked(resWithout); +assert(!resWithout.cursor.hasOwnProperty("postBatchResumeToken"), resWithout); + +jsTest.log("Run aggregation pipeline on non-oplog with $_requestResumeToken set"); +const resNotOplog = localDb.runCommand( + {aggregate: ns, pipeline: [{limit: 100}], $_requestResumeToken: true, cursor: {}}); + +assert.commandFailedWithCode(resNotOplog, ErrorCodes.FailedToParse); + +jsTest.log("Run aggregation pipeline on oplog with empty batch"); +const resEmpty = localDb.runCommand({ + aggregate: "oplog.rs", + pipeline: [{$match: {ts: {$gte: Timestamp(0, 0)}}}], + $_requestResumeToken: true, + cursor: {batchSize: 0} +}); + +assert.commandWorked(resEmpty); +assert(resEmpty.cursor.hasOwnProperty("postBatchResumeToken"), resEmpty); +assert(resEnabled.cursor.postBatchResumeToken.hasOwnProperty("ts"), resEmpty); +assert.eq(resEmpty.cursor.postBatchResumeToken.ts, new Timestamp(0, 0)); + +jsTest.log("End of test"); + +rst.stopSet(); +})();
\ No newline at end of file diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 3d79915b0fe..b67d8cd6603 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -35,6 +35,7 @@ #include "mongo/db/exec/requires_collection_stage.h" #include "mongo/db/matcher/expression_leaf.h" #include "mongo/db/record_id.h" +#include "mongo/s/resharding/resume_token_gen.h" namespace mongo { @@ -74,7 +75,16 @@ public: } BSONObj getPostBatchResumeToken() const { - return _params.requestResumeToken ? BSON("$recordId" << _lastSeenId.repr()) : BSONObj(); + // Return a resume token compatible with resumable initial sync. + if (_params.requestResumeToken) { + return BSON("$recordId" << _lastSeenId.repr()); + } + // Return a resume token compatible with resharding oplog sync. + if (_params.shouldTrackLatestOplogTimestamp) { + return ResumeTokenOplogTimestamp{_latestOplogEntryTimestamp}.toBSON(); + } + + return {}; } std::unique_ptr<PlanStageStats> getStats() final; diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index a948696b526..8e8761547aa 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -180,6 +180,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( } } else if (bypassDocumentValidationCommandOption() == fieldName) { request.setBypassDocumentValidation(elem.trueValue()); + } else if (kRequestResumeToken == fieldName) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() + << fieldName << "must be a boolean, not a " << typeName(elem.type())}; + } + + request.setRequestResumeToken(elem.Bool()); + + if (request.getRequestResumeToken() && !request.getNamespaceString().isOplog()) { + return {ErrorCodes::FailedToParse, + str::stream() + << fieldName << " must only be set for the oplog namespace, not " + << request.getNamespaceString()}; + } } else if (WriteConcernOptions::kWriteConcernField == fieldName) { if (elem.type() != BSONType::Object) { return {ErrorCodes::TypeMismatch, @@ -315,6 +330,7 @@ Document AggregationRequest::serializeToCommandObj() const { {kNeedsMergeName, _needsMerge ? Value(true) : Value()}, {bypassDocumentValidationCommandOption(), _bypassDocumentValidation ? Value(true) : Value()}, + {kRequestResumeToken, _requestResumeToken ? Value(true) : Value()}, // Only serialize a collation if one was specified. {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)}, // Only serialize batchSize if not an explain, otherwise serialize an empty cursor object. diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 66e4d95df57..1aa8d98a057 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -67,6 +67,7 @@ public: static constexpr StringData kIsMapReduceCommandName = "isMapReduceCommand"_sd; static constexpr StringData kLetName = "let"_sd; static constexpr StringData kCollectionUUIDName = "collectionUUID"_sd; + static constexpr StringData kRequestResumeToken = "$_requestResumeToken"_sd; static constexpr long long kDefaultBatchSize = 101; @@ -181,6 +182,10 @@ public: return _bypassDocumentValidation; } + bool getRequestResumeToken() const { + return _requestResumeToken; + } + /** * Returns an empty object if no collation was specified. */ @@ -272,6 +277,10 @@ public: _bypassDocumentValidation = shouldBypassDocumentValidation; } + void setRequestResumeToken(bool requestResumeToken) { + _requestResumeToken = requestResumeToken; + } + void setMaxTimeMS(unsigned int maxTimeMS) { _maxTimeMS = maxTimeMS; } @@ -342,6 +351,7 @@ private: bool _fromMongos = false; bool _needsMerge = false; bool _bypassDocumentValidation = false; + bool _requestResumeToken = false; // A user-specified maxTimeMS limit, or a value of '0' if not specified. unsigned int _maxTimeMS = 0; diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 04a93a21ea1..7a0d2a7653b 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -55,10 +55,12 @@ const Document kDefaultCursorOptionDocument{ // TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { - NamespaceString nss("a.collection"); + // Using oplog namespace so that validation of $_requestResumeToken succeeds. + NamespaceString nss("local.oplog.rs"); BSONObj inputBson = fromjson( "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, " - "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: " + "needsMerge: true, bypassDocumentValidation: true, $_requestResumeToken: true, collation: " + "{locale: 'en_US'}, cursor: " "{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, " "$queryOptions: {$readPreference: 'nearest'}, exchange: {policy: " "'roundrobin', consumers:NumberInt(2)}, isMapReduceCommand: true}"); @@ -73,6 +75,7 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_TRUE(request.isFromMongos()); ASSERT_TRUE(request.needsMerge()); ASSERT_TRUE(request.shouldBypassDocumentValidation()); + ASSERT_TRUE(request.getRequestResumeToken()); ASSERT_EQ(request.getBatchSize(), 10); ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1)); ASSERT_BSONOBJ_EQ(request.getCollation(), @@ -90,6 +93,13 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { ASSERT_EQ(*request.getCollectionUUID(), uuid); } +TEST(AggregationRequestTest, ShouldParseExplicitRequestResumeTokenFalseForNonOplog) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: false, cursor: {}}"); + auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); + ASSERT_FALSE(request.getRequestResumeToken()); +} + TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}"); @@ -161,6 +171,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau request.setFromMongos(false); request.setNeedsMerge(false); request.setBypassDocumentValidation(false); + request.setRequestResumeToken(false); request.setCollation(BSONObj()); request.setHint(BSONObj()); request.setMaxTimeMS(0u); @@ -182,6 +193,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { request.setFromMongos(true); request.setNeedsMerge(true); request.setBypassDocumentValidation(true); + request.setRequestResumeToken(true); request.setBatchSize(10); request.setMaxTimeMS(10u); const auto hintObj = BSON("a" << 1); @@ -209,6 +221,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { {AggregationRequest::kFromMongosName, true}, {AggregationRequest::kNeedsMergeName, true}, {bypassDocumentValidationCommandOption(), true}, + {AggregationRequest::kRequestResumeToken, true}, {AggregationRequest::kCollationName, collationObj}, {AggregationRequest::kCursorName, Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}, @@ -439,6 +452,12 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce .getStatus()); } +TEST(AggregationRequestTest, ShouldRejectRequestResumeTokenIfNonOplogNss) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [], $_requestResumeToken: true}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + TEST(AggregationRequestTest, CannotParseNeedsMerge34) { NamespaceString nss("a.collection"); const BSONObj inputBson = diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 647380368db..f6cbf689508 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -659,6 +659,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); } + // The aggregate command's $_requestResumeToken parameter can only be used for the oplog. + if (aggRequest && aggRequest->getRequestResumeToken()) { + plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + } + // If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The // BSONObj format is currently necessary to request that the sort is computed by the query layer // inside the inner PlanExecutor. We also remove the $sort stage from the Pipeline, since it diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 4d1aa91d63c..c1ab8a0b0b2 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -183,6 +183,7 @@ env.Library( env.Idlc('request_types/wait_for_fail_point.idl')[0], env.Idlc('resharding/common_types.idl')[0], env.Idlc('resharding/type_collection_fields.idl')[0], + env.Idlc('resharding/resume_token.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/client/connection_string', diff --git a/src/mongo/s/resharding/resume_token.idl b/src/mongo/s/resharding/resume_token.idl new file mode 100644 index 00000000000..f89101c389b --- /dev/null +++ b/src/mongo/s/resharding/resume_token.idl @@ -0,0 +1,43 @@ +# Copyright (C) 2020-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + ResumeTokenOplogTimestamp: + description: "Token format which facilitates resuming from the most recently-observed timestamp in the oplog" + strict: true + fields: + ts: + type: timestamp + description: "The latest oplog timestamp." + optional: false |