summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-05-15 15:20:37 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-05-17 10:28:26 -0400
commit664e1d9b01dccddeec072b7746d7d4c62931716d (patch)
tree1a79d178e84cb9a378726e3256f18f1799ba78c8
parentf3439006a717ea1c8b1598b0e68935816e150564 (diff)
downloadmongo-664e1d9b01dccddeec072b7746d7d4c62931716d.tar.gz
SERVER-34818 Rename startAtClusterTime to startAtOperationTime
Also changes the type of the argument from an object with a single 'ts' field to just a Timestamp.
-rw-r--r--jstests/change_streams/change_stream_apply_ops.js5
-rw-r--r--jstests/change_streams/change_stream_shell_helper.js6
-rw-r--r--jstests/change_streams/change_stream_start_at_cluster_time.js20
-rw-r--r--jstests/change_streams/include_cluster_time.js9
-rw-r--r--jstests/multiVersion/change_streams_feature_compatibility_version.js9
-rw-r--r--jstests/sharding/resume_change_stream.js2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp9
-rw-r--r--src/mongo/db/pipeline/document_sources.idl12
-rw-r--r--src/mongo/shell/collection.js6
-rw-r--r--src/mongo/shell/db.js6
-rw-r--r--src/mongo/shell/mongo.js6
13 files changed, 57 insertions, 63 deletions
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js
index 28948ed84b5..0f954578e52 100644
--- a/jstests/change_streams/change_stream_apply_ops.js
+++ b/jstests/change_streams/change_stream_apply_ops.js
@@ -116,8 +116,7 @@
// Verify that a whole-db stream returns the expected sequence of changes, including the insert
// on the other collection but NOT the changes on the other DB or the manual applyOps.
changeStream = cst.startWatchingChanges({
- pipeline:
- [{$changeStream: {startAtClusterTime: {ts: startTime}}}, {$project: {"lsid.uid": 0}}],
+ pipeline: [{$changeStream: {startAtOperationTime: startTime}}, {$project: {"lsid.uid": 0}}],
collection: 1
});
cst.assertNextChangesEqual(
@@ -138,7 +137,7 @@
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
changeStream = cst.startWatchingChanges({
pipeline: [
- {$changeStream: {startAtClusterTime: {ts: startTime}, allChangesForCluster: true}},
+ {$changeStream: {startAtOperationTime: startTime, allChangesForCluster: true}},
{$project: {"lsid.uid": 0}}
],
collection: 1
diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js
index 0150e1ba39d..6ecb95221eb 100644
--- a/jstests/change_streams/change_stream_shell_helper.js
+++ b/jstests/change_streams/change_stream_shell_helper.js
@@ -75,7 +75,7 @@
// Store the cluster time of the insert as the timestamp to start from.
const resumeTime =
assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]}))
- .$clusterTime.clusterTime;
+ .operationTime;
checkNextChange(changeStreamCursor, {docId: 1});
@@ -84,9 +84,9 @@
coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
checkNextChange(changeStreamCursor, {docId: 1});
- jsTestLog("Testing watch() with pipeline and startAtClusterTime");
+ jsTestLog("Testing watch() with pipeline and startAtOperationTime");
changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
- {startAtClusterTime: {ts: resumeTime}});
+ {startAtOperationTime: resumeTime});
checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with updateLookup");
diff --git a/jstests/change_streams/change_stream_start_at_cluster_time.js b/jstests/change_streams/change_stream_start_at_cluster_time.js
index dc6f64bb551..3fb6786437a 100644
--- a/jstests/change_streams/change_stream_start_at_cluster_time.js
+++ b/jstests/change_streams/change_stream_start_at_cluster_time.js
@@ -20,9 +20,9 @@
let next = streamToFindClusterTime.next();
assert.eq(next.operationType, "update");
assert.eq(next.documentKey, {_id: -1});
- const clusterTimeOfFirstUpdate = next.clusterTime;
+ const timeOfFirstUpdate = next.clusterTime;
- let changeStream = coll.watch([], {startAtClusterTime: {ts: clusterTimeOfFirstUpdate}});
+ let changeStream = coll.watch([], {startAtOperationTime: timeOfFirstUpdate});
// Test that starting at the cluster time is inclusive of the first update, so we should see
// both updates in the new stream.
@@ -36,14 +36,12 @@
assert.eq(next.operationType, "update", tojson(next));
assert.eq(next.documentKey._id, 1, tojson(next));
- // Test that startAtClusterTime is not allowed alongside resumeAfter or
+ // Test that startAtOperationTime is not allowed alongside resumeAfter or
// $_resumeAfterClusterTime.
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
- pipeline: [{
- $changeStream:
- {startAtClusterTime: {ts: clusterTimeOfFirstUpdate}, resumeAfter: next._id}
- }],
+ pipeline:
+ [{$changeStream: {startAtOperationTime: timeOfFirstUpdate, resumeAfter: next._id}}],
cursor: {}
}),
40674);
@@ -52,8 +50,8 @@
aggregate: coll.getName(),
pipeline: [{
$changeStream: {
- startAtClusterTime: {ts: clusterTimeOfFirstUpdate},
- $_resumeAfterClusterTime: {ts: clusterTimeOfFirstUpdate}
+ startAtOperationTime: timeOfFirstUpdate,
+ $_resumeAfterClusterTime: {ts: timeOfFirstUpdate}
}
}],
cursor: {}
@@ -65,11 +63,11 @@
resumeTimeFarFuture =
new Timestamp(resumeTimeFarFuture.getTime() + 60 * 60 * 6, 1); // 6 hours in the future
- let changeStreamFuture = coll.watch([], {startAtClusterTime: {ts: resumeTimeFarFuture}});
+ let changeStreamFuture = coll.watch([], {startAtOperationTime: resumeTimeFarFuture});
// Resume the change stream from the start of the test and verify it picks up the changes to the
// collection. Namely, it should see two inserts followed by two updates.
- changeStream = coll.watch([], {startAtClusterTime: {ts: testStartTime}});
+ changeStream = coll.watch([], {startAtOperationTime: testStartTime});
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "insert", tojson(next));
diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js
index 6d0c33785e8..4be37f1f338 100644
--- a/jstests/change_streams/include_cluster_time.js
+++ b/jstests/change_streams/include_cluster_time.js
@@ -10,21 +10,20 @@
const changeStream = coll.watch();
const insertClusterTime =
- assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]}))
- .$clusterTime.clusterTime;
+ assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]})).operationTime;
const updateClusterTime =
assert
.commandWorked(
coll.runCommand("update", {updates: [{q: {_id: 0}, u: {$set: {updated: true}}}]}))
- .$clusterTime.clusterTime;
+ .operationTime;
const deleteClusterTime =
assert.commandWorked(coll.runCommand("delete", {deletes: [{q: {_id: 0}, limit: 1}]}))
- .$clusterTime.clusterTime;
+ .operationTime;
const dropClusterTime =
- assert.commandWorked(db.runCommand({drop: coll.getName()})).$clusterTime.clusterTime;
+ assert.commandWorked(db.runCommand({drop: coll.getName()})).operationTime;
// Make sure each operation has a reasonable cluster time. Note that we should not assert
// that the cluster times are equal, because the cluster time returned from the command is
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
index 470b534aafa..a4b77fb350b 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -68,8 +68,7 @@
assert.commandFailedWithCode(testDB.runCommand({
aggregate: coll.getName(),
- pipeline:
- [{$changeStream: {startAtClusterTime: {ts: failedResponse.$clusterTime.clusterTime}}}],
+ pipeline: [{$changeStream: {startAtOperationTime: failedResponse.operationTime}}],
cursor: {}
}),
40415);
@@ -102,7 +101,7 @@
// response to use later.
const startTime =
assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"}))
- .$clusterTime.clusterTime;
+ .operationTime;
// Test that we can now use 4.0 features to open a stream.
const wholeDbCursor =
@@ -110,7 +109,7 @@
const wholeClusterCursor = adminCST.startWatchingChanges(
{pipeline: [{$changeStream: {allChangesForCluster: true}}], collection: 1});
const cursorStartedWithTime = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}],
+ pipeline: [{$changeStream: {startAtOperationTime: startTime}}],
collection: coll.getName()
});
@@ -197,7 +196,7 @@
assert.commandFailedWithCode(testDB.runCommand({
aggregate: coll.getName(),
- pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}],
+ pipeline: [{$changeStream: {startAtOperationTime: startTime}}],
cursor: {}
}),
ErrorCodes.QueryFeatureNotAllowed);
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
index 4f224007389..53396973a7c 100644
--- a/jstests/sharding/resume_change_stream.js
+++ b/jstests/sharding/resume_change_stream.js
@@ -130,7 +130,7 @@
ChangeStreamTest.assertChangeStreamThrowsCode({
db: mongosDB,
collName: collToWatch,
- pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTimeFirstUpdate}}}],
+ pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}],
expectedCode: 40576
});
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 602fd1edd46..f285c29bca2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -423,24 +423,24 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
}
auto resumeAfterClusterTime = spec.getResumeAfterClusterTimeDeprecated();
- auto startAtClusterTime = spec.getStartAtClusterTime();
+ auto startAtOperationTime = spec.getStartAtOperationTime();
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
- !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtClusterTime));
+ !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtOperationTime));
if (resumeAfterClusterTime) {
if (fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) {
warning() << "The '$_resumeAfterClusterTime' option is deprecated, please use "
- "'startAtClusterTime' instead.";
+ "'startAtOperationTime' instead.";
}
*startFromOut = resumeAfterClusterTime->getTimestamp();
}
- // New field name starting in 4.0 is 'startAtClusterTime'.
- if (startAtClusterTime) {
+ // New field name starting in 4.0 is 'startAtOperationTime'.
+ if (startAtOperationTime) {
uassert(ErrorCodes::QueryFeatureNotAllowed,
- str::stream() << "The startAtClusterTime option is not allowed in the current "
+ str::stream() << "The startAtOperationTime option is not allowed in the current "
"feature compatibility version. See "
<< feature_compatibility_version_documentation::kCompatibilityLink
<< " for more information.",
@@ -448,12 +448,12 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
uassert(50573,
str::stream()
<< "Do not specify both "
- << DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName
+ << DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
<< " and "
<< DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName
<< " in a $changeStream stage.",
!resumeAfterClusterTime);
- *startFromOut = startAtClusterTime->getTimestamp();
+ *startFromOut = *startAtOperationTime;
*resumeStageOut = DocumentSourceShardCheckResumability::create(expCtx, **startFromOut);
}
}
@@ -532,9 +532,9 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or
pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
- // If the command was initially specified with a startAtClusterTime, we need to remove it
+ // If the command was initially specified with a startAtOperationTime, we need to remove it
// to use the new resume token.
- changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] = Value();
+ changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value();
pipeline[0] =
Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
MutableDocument newCmd(originalCmd);
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5616e829b9e..a8412e0d8bf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -336,7 +336,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothResumeAfterClusterTimeAndResumeAft
40674);
}
-TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtClusterTimeAndResumeAfterOptions) {
+TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfterOptions) {
auto expCtx = getExpCtx();
// Need to put the collection in the UUID catalog so the resume token is valid.
@@ -348,8 +348,8 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtClusterTimeAndResumeAfterOp
BSON(DSChangeStream::kStageName
<< BSON("resumeAfter"
<< makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))
- << "startAtClusterTime"
- << BSON("ts" << kDefaultTs)))
+ << "startAtOperationTime"
+ << kDefaultTs))
.firstElement(),
expCtx),
AssertionException,
@@ -366,8 +366,8 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtAndResumeAfterClusterTimeOp
ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
BSON(DSChangeStream::kStageName
<< BSON("$_resumeAfterClusterTime" << BSON("ts" << kDefaultTs)
- << "startAtClusterTime"
- << BSON("ts" << kDefaultTs)))
+ << "startAtOperationTime"
+ << kDefaultTs))
.firstElement(),
expCtx),
AssertionException,
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index cb666b1ec8b..bb98f3bfffe 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -340,19 +340,18 @@ Value DocumentSourceChangeStreamTransform::serialize(
changeStreamOptions
[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName]
.missing() &&
- changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName]
+ changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName]
.missing()) {
MutableDocument newChangeStreamOptions(changeStreamOptions);
// Use the current cluster time plus 1 tick since the oplog query will include all
- // operations/commands equal to or greater than the 'startAtClusterTime' timestamp. In
+ // operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In
// particular, avoid including the last operation that went through mongos in an attempt to
// match the behavior of a replica set more closely.
auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime();
clusterTime.addTicks(1);
- newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName]
- [ResumeTokenClusterTime::kTimestampFieldName] =
- Value(clusterTime.asTimestamp());
+ newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] =
+ Value(clusterTime.asTimestamp());
changeStreamOptions = newChangeStreamOptions.freeze();
}
return Value(Document{{getSourceName(), changeStreamOptions}});
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
index 90a74777e70..3228ddc90a6 100644
--- a/src/mongo/db/pipeline/document_sources.idl
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -85,14 +85,14 @@ structs:
description: The cluster time after which we should start reporting changes.
Only one of resumeAfter and _resumeAfterClusterTime should be
specified. For internal use only. Deprecated in 4.0, use
- startAtClusterTime instead.
- startAtClusterTime:
- cpp_name: startAtClusterTime
- type: ResumeTokenClusterTime
+ startAtOperationTime instead.
+ startAtOperationTime:
+ cpp_name: startAtOperationTime
+ type: timestamp
optional: true
- description: The cluster time after which we should start reporting changes.
+ description: The operation time after which we should start reporting changes.
Only one of resumeAfter, _resumeAfterClusterTimeDeprecated, and
- startAtClusterTime should be specified.
+ startAtOperationTime should be specified.
fullDocument:
cpp_name: fullDocument
type: string
diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js
index abec36729ea..2e0fed63321 100644
--- a/src/mongo/shell/collection.js
+++ b/src/mongo/shell/collection.js
@@ -1496,9 +1496,9 @@ DBCollection.prototype.watch = function(pipeline, options) {
delete options.resumeAfter;
}
- if (options.hasOwnProperty("startAtClusterTime")) {
- changeStreamStage.startAtClusterTime = options.startAtClusterTime;
- delete options.startAtClusterTime;
+ if (options.hasOwnProperty("startAtOperationTime")) {
+ changeStreamStage.startAtOperationTime = options.startAtOperationTime;
+ delete options.startAtOperationTime;
}
pipeline.unshift({$changeStream: changeStreamStage});
diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js
index 73558eef336..9206aaa67db 100644
--- a/src/mongo/shell/db.js
+++ b/src/mongo/shell/db.js
@@ -1932,9 +1932,9 @@ var DB;
delete options.resumeAfter;
}
- if (options.hasOwnProperty("startAtClusterTime")) {
- changeStreamStage.startAtClusterTime = options.startAtClusterTime;
- delete options.startAtClusterTime;
+ if (options.hasOwnProperty("startAtOperationTime")) {
+ changeStreamStage.startAtOperationTime = options.startAtOperationTime;
+ delete options.startAtOperationTime;
}
pipeline.unshift({$changeStream: changeStreamStage});
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index db7d09b672f..98c5676b554 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -493,9 +493,9 @@ Mongo.prototype.watch = function(pipeline, options) {
delete options.resumeAfter;
}
- if (options.hasOwnProperty("startAtClusterTime")) {
- changeStreamStage.startAtClusterTime = options.startAtClusterTime;
- delete options.startAtClusterTime;
+ if (options.hasOwnProperty("startAtOperationTime")) {
+ changeStreamStage.startAtOperationTime = options.startAtOperationTime;
+ delete options.startAtOperationTime;
}
pipeline.unshift({$changeStream: changeStreamStage});