From 664e1d9b01dccddeec072b7746d7d4c62931716d Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Tue, 15 May 2018 15:20:37 -0400 Subject: SERVER-34818 Rename startAtClusterTime to startAtOperationTime Also changes the type of the argument from an object with a single 'ts' field to just a Timestamp. --- jstests/change_streams/change_stream_apply_ops.js | 5 ++--- jstests/change_streams/change_stream_shell_helper.js | 6 +++--- .../change_stream_start_at_cluster_time.js | 20 +++++++++----------- jstests/change_streams/include_cluster_time.js | 9 ++++----- .../change_streams_feature_compatibility_version.js | 9 ++++----- jstests/sharding/resume_change_stream.js | 2 +- .../db/pipeline/document_source_change_stream.cpp | 20 ++++++++++---------- .../pipeline/document_source_change_stream_test.cpp | 10 +++++----- .../document_source_change_stream_transform.cpp | 9 ++++----- src/mongo/db/pipeline/document_sources.idl | 12 ++++++------ src/mongo/shell/collection.js | 6 +++--- src/mongo/shell/db.js | 6 +++--- src/mongo/shell/mongo.js | 6 +++--- 13 files changed, 57 insertions(+), 63 deletions(-) diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js index 28948ed84b5..0f954578e52 100644 --- a/jstests/change_streams/change_stream_apply_ops.js +++ b/jstests/change_streams/change_stream_apply_ops.js @@ -116,8 +116,7 @@ // Verify that a whole-db stream returns the expected sequence of changes, including the insert // on the other collection but NOT the changes on the other DB or the manual applyOps. changeStream = cst.startWatchingChanges({ - pipeline: - [{$changeStream: {startAtClusterTime: {ts: startTime}}}, {$project: {"lsid.uid": 0}}], + pipeline: [{$changeStream: {startAtOperationTime: startTime}}, {$project: {"lsid.uid": 0}}], collection: 1 }); cst.assertNextChangesEqual( @@ -138,7 +137,7 @@ cst = new ChangeStreamTest(db.getSiblingDB("admin")); changeStream = cst.startWatchingChanges({ pipeline: [ - {$changeStream: {startAtClusterTime: {ts: startTime}, allChangesForCluster: true}}, + {$changeStream: {startAtOperationTime: startTime, allChangesForCluster: true}}, {$project: {"lsid.uid": 0}} ], collection: 1 diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js index 0150e1ba39d..6ecb95221eb 100644 --- a/jstests/change_streams/change_stream_shell_helper.js +++ b/jstests/change_streams/change_stream_shell_helper.js @@ -75,7 +75,7 @@ // Store the cluster time of the insert as the timestamp to start from. const resumeTime = assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]})) - .$clusterTime.clusterTime; + .operationTime; checkNextChange(changeStreamCursor, {docId: 1}); @@ -84,9 +84,9 @@ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); checkNextChange(changeStreamCursor, {docId: 1}); - jsTestLog("Testing watch() with pipeline and startAtClusterTime"); + jsTestLog("Testing watch() with pipeline and startAtOperationTime"); changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], - {startAtClusterTime: {ts: resumeTime}}); + {startAtOperationTime: resumeTime}); checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with updateLookup"); diff --git a/jstests/change_streams/change_stream_start_at_cluster_time.js b/jstests/change_streams/change_stream_start_at_cluster_time.js index dc6f64bb551..3fb6786437a 100644 --- a/jstests/change_streams/change_stream_start_at_cluster_time.js +++ b/jstests/change_streams/change_stream_start_at_cluster_time.js @@ -20,9 +20,9 @@ let next = streamToFindClusterTime.next(); assert.eq(next.operationType, "update"); assert.eq(next.documentKey, {_id: -1}); - const clusterTimeOfFirstUpdate = next.clusterTime; + const timeOfFirstUpdate = next.clusterTime; - let changeStream = coll.watch([], {startAtClusterTime: {ts: clusterTimeOfFirstUpdate}}); + let changeStream = coll.watch([], {startAtOperationTime: timeOfFirstUpdate}); // Test that starting at the cluster time is inclusive of the first update, so we should see // both updates in the new stream. @@ -36,14 +36,12 @@ assert.eq(next.operationType, "update", tojson(next)); assert.eq(next.documentKey._id, 1, tojson(next)); - // Test that startAtClusterTime is not allowed alongside resumeAfter or + // Test that startAtOperationTime is not allowed alongside resumeAfter or // $_resumeAfterClusterTime. assert.commandFailedWithCode(db.runCommand({ aggregate: coll.getName(), - pipeline: [{ - $changeStream: - {startAtClusterTime: {ts: clusterTimeOfFirstUpdate}, resumeAfter: next._id} - }], + pipeline: + [{$changeStream: {startAtOperationTime: timeOfFirstUpdate, resumeAfter: next._id}}], cursor: {} }), 40674); @@ -52,8 +50,8 @@ aggregate: coll.getName(), pipeline: [{ $changeStream: { - startAtClusterTime: {ts: clusterTimeOfFirstUpdate}, - $_resumeAfterClusterTime: {ts: clusterTimeOfFirstUpdate} + startAtOperationTime: timeOfFirstUpdate, + $_resumeAfterClusterTime: {ts: timeOfFirstUpdate} } }], cursor: {} @@ -65,11 +63,11 @@ resumeTimeFarFuture = new Timestamp(resumeTimeFarFuture.getTime() + 60 * 60 * 6, 1); // 6 hours in the future - let changeStreamFuture = coll.watch([], {startAtClusterTime: {ts: resumeTimeFarFuture}}); + let changeStreamFuture = coll.watch([], {startAtOperationTime: resumeTimeFarFuture}); // Resume the change stream from the start of the test and verify it picks up the changes to the // collection. Namely, it should see two inserts followed by two updates. - changeStream = coll.watch([], {startAtClusterTime: {ts: testStartTime}}); + changeStream = coll.watch([], {startAtOperationTime: testStartTime}); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "insert", tojson(next)); diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js index 6d0c33785e8..4be37f1f338 100644 --- a/jstests/change_streams/include_cluster_time.js +++ b/jstests/change_streams/include_cluster_time.js @@ -10,21 +10,20 @@ const changeStream = coll.watch(); const insertClusterTime = - assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]})) - .$clusterTime.clusterTime; + assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]})).operationTime; const updateClusterTime = assert .commandWorked( coll.runCommand("update", {updates: [{q: {_id: 0}, u: {$set: {updated: true}}}]})) - .$clusterTime.clusterTime; + .operationTime; const deleteClusterTime = assert.commandWorked(coll.runCommand("delete", {deletes: [{q: {_id: 0}, limit: 1}]})) - .$clusterTime.clusterTime; + .operationTime; const dropClusterTime = - assert.commandWorked(db.runCommand({drop: coll.getName()})).$clusterTime.clusterTime; + assert.commandWorked(db.runCommand({drop: coll.getName()})).operationTime; // Make sure each operation has a reasonable cluster time. Note that we should not assert // that the cluster times are equal, because the cluster time returned from the command is diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js index 470b534aafa..a4b77fb350b 100644 --- a/jstests/multiVersion/change_streams_feature_compatibility_version.js +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -68,8 +68,7 @@ assert.commandFailedWithCode(testDB.runCommand({ aggregate: coll.getName(), - pipeline: - [{$changeStream: {startAtClusterTime: {ts: failedResponse.$clusterTime.clusterTime}}}], + pipeline: [{$changeStream: {startAtOperationTime: failedResponse.operationTime}}], cursor: {} }), 40415); @@ -102,7 +101,7 @@ // response to use later. const startTime = assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"})) - .$clusterTime.clusterTime; + .operationTime; // Test that we can now use 4.0 features to open a stream. const wholeDbCursor = @@ -110,7 +109,7 @@ const wholeClusterCursor = adminCST.startWatchingChanges( {pipeline: [{$changeStream: {allChangesForCluster: true}}], collection: 1}); const cursorStartedWithTime = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}], + pipeline: [{$changeStream: {startAtOperationTime: startTime}}], collection: coll.getName() }); @@ -197,7 +196,7 @@ assert.commandFailedWithCode(testDB.runCommand({ aggregate: coll.getName(), - pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}], + pipeline: [{$changeStream: {startAtOperationTime: startTime}}], cursor: {} }), ErrorCodes.QueryFeatureNotAllowed); diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js index 4f224007389..53396973a7c 100644 --- a/jstests/sharding/resume_change_stream.js +++ b/jstests/sharding/resume_change_stream.js @@ -130,7 +130,7 @@ ChangeStreamTest.assertChangeStreamThrowsCode({ db: mongosDB, collName: collToWatch, - pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTimeFirstUpdate}}}], + pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}], expectedCode: 40576 }); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 602fd1edd46..f285c29bca2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -423,24 +423,24 @@ void parseResumeOptions(const intrusive_ptr& expCtx, } auto resumeAfterClusterTime = spec.getResumeAfterClusterTimeDeprecated(); - auto startAtClusterTime = spec.getStartAtClusterTime(); + auto startAtOperationTime = spec.getStartAtOperationTime(); uassert(40674, "Only one type of resume option is allowed, but multiple were found.", - !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtClusterTime)); + !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtOperationTime)); if (resumeAfterClusterTime) { if (fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) { warning() << "The '$_resumeAfterClusterTime' option is deprecated, please use " - "'startAtClusterTime' instead."; + "'startAtOperationTime' instead."; } *startFromOut = resumeAfterClusterTime->getTimestamp(); } - // New field name starting in 4.0 is 'startAtClusterTime'. - if (startAtClusterTime) { + // New field name starting in 4.0 is 'startAtOperationTime'. + if (startAtOperationTime) { uassert(ErrorCodes::QueryFeatureNotAllowed, - str::stream() << "The startAtClusterTime option is not allowed in the current " + str::stream() << "The startAtOperationTime option is not allowed in the current " "feature compatibility version. See " << feature_compatibility_version_documentation::kCompatibilityLink << " for more information.", @@ -448,12 +448,12 @@ void parseResumeOptions(const intrusive_ptr& expCtx, uassert(50573, str::stream() << "Do not specify both " - << DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName + << DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName << " and " << DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName << " in a $changeStream stage.", !resumeAfterClusterTime); - *startFromOut = startAtClusterTime->getTimestamp(); + *startFromOut = *startAtOperationTime; *resumeStageOut = DocumentSourceShardCheckResumability::create(expCtx, **startFromOut); } } @@ -532,9 +532,9 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or pipeline[0][DocumentSourceChangeStream::kStageName].getDocument()); changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken); - // If the command was initially specified with a startAtClusterTime, we need to remove it + // If the command was initially specified with a startAtOperationTime, we need to remove it // to use the new resume token. - changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] = Value(); + changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value(); pipeline[0] = Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}}); MutableDocument newCmd(originalCmd); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5616e829b9e..a8412e0d8bf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -336,7 +336,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothResumeAfterClusterTimeAndResumeAft 40674); } -TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtClusterTimeAndResumeAfterOptions) { +TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfterOptions) { auto expCtx = getExpCtx(); // Need to put the collection in the UUID catalog so the resume token is valid. @@ -348,8 +348,8 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtClusterTimeAndResumeAfterOp BSON(DSChangeStream::kStageName << BSON("resumeAfter" << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)) - << "startAtClusterTime" - << BSON("ts" << kDefaultTs))) + << "startAtOperationTime" + << kDefaultTs)) .firstElement(), expCtx), AssertionException, @@ -366,8 +366,8 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtAndResumeAfterClusterTimeOp ASSERT_THROWS_CODE(DSChangeStream::createFromBson( BSON(DSChangeStream::kStageName << BSON("$_resumeAfterClusterTime" << BSON("ts" << kDefaultTs) - << "startAtClusterTime" - << BSON("ts" << kDefaultTs))) + << "startAtOperationTime" + << kDefaultTs)) .firstElement(), expCtx), AssertionException, diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index cb666b1ec8b..bb98f3bfffe 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -340,19 +340,18 @@ Value DocumentSourceChangeStreamTransform::serialize( changeStreamOptions [DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName] .missing() && - changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] + changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] .missing()) { MutableDocument newChangeStreamOptions(changeStreamOptions); // Use the current cluster time plus 1 tick since the oplog query will include all - // operations/commands equal to or greater than the 'startAtClusterTime' timestamp. In + // operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In // particular, avoid including the last operation that went through mongos in an attempt to // match the behavior of a replica set more closely. auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime(); clusterTime.addTicks(1); - newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] - [ResumeTokenClusterTime::kTimestampFieldName] = - Value(clusterTime.asTimestamp()); + newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = + Value(clusterTime.asTimestamp()); changeStreamOptions = newChangeStreamOptions.freeze(); } return Value(Document{{getSourceName(), changeStreamOptions}}); diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl index 90a74777e70..3228ddc90a6 100644 --- a/src/mongo/db/pipeline/document_sources.idl +++ b/src/mongo/db/pipeline/document_sources.idl @@ -85,14 +85,14 @@ structs: description: The cluster time after which we should start reporting changes. Only one of resumeAfter and _resumeAfterClusterTime should be specified. For internal use only. Deprecated in 4.0, use - startAtClusterTime instead. - startAtClusterTime: - cpp_name: startAtClusterTime - type: ResumeTokenClusterTime + startAtOperationTime instead. + startAtOperationTime: + cpp_name: startAtOperationTime + type: timestamp optional: true - description: The cluster time after which we should start reporting changes. + description: The operation time after which we should start reporting changes. Only one of resumeAfter, _resumeAfterClusterTimeDeprecated, and - startAtClusterTime should be specified. + startAtOperationTime should be specified. fullDocument: cpp_name: fullDocument type: string diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js index abec36729ea..2e0fed63321 100644 --- a/src/mongo/shell/collection.js +++ b/src/mongo/shell/collection.js @@ -1496,9 +1496,9 @@ DBCollection.prototype.watch = function(pipeline, options) { delete options.resumeAfter; } - if (options.hasOwnProperty("startAtClusterTime")) { - changeStreamStage.startAtClusterTime = options.startAtClusterTime; - delete options.startAtClusterTime; + if (options.hasOwnProperty("startAtOperationTime")) { + changeStreamStage.startAtOperationTime = options.startAtOperationTime; + delete options.startAtOperationTime; } pipeline.unshift({$changeStream: changeStreamStage}); diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js index 73558eef336..9206aaa67db 100644 --- a/src/mongo/shell/db.js +++ b/src/mongo/shell/db.js @@ -1932,9 +1932,9 @@ var DB; delete options.resumeAfter; } - if (options.hasOwnProperty("startAtClusterTime")) { - changeStreamStage.startAtClusterTime = options.startAtClusterTime; - delete options.startAtClusterTime; + if (options.hasOwnProperty("startAtOperationTime")) { + changeStreamStage.startAtOperationTime = options.startAtOperationTime; + delete options.startAtOperationTime; } pipeline.unshift({$changeStream: changeStreamStage}); diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index db7d09b672f..98c5676b554 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -493,9 +493,9 @@ Mongo.prototype.watch = function(pipeline, options) { delete options.resumeAfter; } - if (options.hasOwnProperty("startAtClusterTime")) { - changeStreamStage.startAtClusterTime = options.startAtClusterTime; - delete options.startAtClusterTime; + if (options.hasOwnProperty("startAtOperationTime")) { + changeStreamStage.startAtOperationTime = options.startAtOperationTime; + delete options.startAtOperationTime; } pipeline.unshift({$changeStream: changeStreamStage}); -- cgit v1.2.1