author     Nick Zolnierz <nicholas.zolnierz@mongodb.com>   2018-05-09 12:25:35 -0400
committer  Nick Zolnierz <nicholas.zolnierz@mongodb.com>   2018-06-04 12:09:50 -0400
commit     753cedd024b9f4fe1a83632db792f29d3a7e4454 (patch)
tree       0cd094ebb24806358e02d111cc47dadf113484ff
parent     851c59e7bc5b54c0cf5feb683398a0eb6dffc20f (diff)
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
(cherry picked from commit 55f4dbf94a1cce9d8642af9bba9ac4cc77627293)
-rw-r--r--  jstests/sharding/change_stream_invalidation.js                     |  34
-rw-r--r--  jstests/sharding/change_streams_primary_shard_unaware.js           |   2
-rw-r--r--  jstests/sharding/change_streams_whole_db.js                        |  34
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp       | 189
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp  |  31

5 files changed, 265 insertions(+), 25 deletions(-)
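For context, the scenario this patch addresses, as a minimal shell sketch (the mongosDB/mongosColl naming follows the tests below; the exact calls are illustrative, not taken from the patch): once a sharded collection is dropped, its shard key can no longer be looked up from the catalog, so a resumed stream must recover the document key fields from the resume token itself.

    // Open a whole-DB stream; 'coll' is sharded on {shardKey: 1}.
    let changeStream = mongosDB.watch();
    assert.writeOK(mongosDB.coll.insert({shardKey: 1, _id: 1}));
    assert.soon(() => changeStream.hasNext());
    const resumeToken = changeStream.next()._id;  // documentKey: {shardKey: 1, _id: 1}

    mongosDB.coll.drop();

    // With this patch, the field names in the token's documentKey ("shardKey",
    // "_id") seed the $changeStream stage's document key cache on resume, so
    // events emitted after the drop still carry the full documentKey.
    changeStream = mongosDB.watch([], {resumeAfter: resumeToken});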
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js
index 396db586c4f..eb51724f8ea 100644
--- a/jstests/sharding/change_stream_invalidation.js
+++ b/jstests/sharding/change_stream_invalidation.js
@@ -30,50 +30,53 @@
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
- // Shard the test collection on _id.
+ // Shard the test collection on a field called 'shardKey'.
assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
// Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}}));
// Move the [0, MaxKey] chunk to st.shard1.shardName.
assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+ {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()}));
// Write a document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}}));
- let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ let changeStream = mongosColl.watch();
// We awaited the replication of the first writes, so the change stream shouldn't return them.
- assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
- assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2}));
// Drop the collection and test that we return "invalidate" entry and close the cursor.
mongosColl.drop();
- st.rs0.awaitReplication();
- st.rs1.awaitReplication();
// Test that we see the two writes that happened before the invalidation.
assert.soon(() => changeStream.hasNext());
let next = changeStream.next();
assert.eq(next.operationType, "update");
- assert.eq(next.documentKey._id, -1);
+ assert.eq(next.documentKey.shardKey, -1);
const resumeTokenFromFirstUpdate = next._id;
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "update");
- assert.eq(next.documentKey._id, 1);
+ assert.eq(next.documentKey.shardKey, 1);
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: 2});
- assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed");
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert(changeStream.isExhausted());
// Test that it is not possible to resume a change stream after a collection has been dropped.
// Once it's been dropped, we won't be able to figure out the shard key.
@@ -82,7 +85,6 @@
assert.commandFailedWithCode(mongosDB.runCommand({
aggregate: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
- readConcern: {level: "majority"},
cursor: {}
}),
40615);
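The tail of this hunk pins down the failure contract for single-collection streams: resuming after the watched collection has been dropped is rejected with code 40615, because mongos can no longer determine the shard key. In shell terms this is roughly the following (the watch() form is an assumption; the test issues the raw aggregate command shown above):

    // Expected to throw: the collection is gone, so the shard key is unknowable
    // and the server rejects the resume attempt (error code 40615 above).
    assert.throws(() => mongosColl.watch([], {resumeAfter: resumeTokenFromFirstUpdate}));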
diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js
index ec4027bb800..89f8d0ebf6d 100644
--- a/jstests/sharding/change_streams_primary_shard_unaware.js
+++ b/jstests/sharding/change_streams_primary_shard_unaware.js
@@ -168,7 +168,7 @@
cstMongos2.assertNextChangesEqual({
cursor: cursorMongos2,
expectedChanges: [{
- documentKey: {_id: 2},
+ documentKey: {_id: 2, a: 2},
fullDocument: {_id: 2, a: 2},
ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()},
operationType: "insert",
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js
index d43294e7772..bc7d559610a 100644
--- a/jstests/sharding/change_streams_whole_db.js
+++ b/jstests/sharding/change_streams_whole_db.js
@@ -5,6 +5,7 @@
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+ load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection.
// For supportsMajorityReadConcern().
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
@@ -151,7 +152,38 @@
operationType: "insert",
},
];
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ // Store the resume token of the first insert to use after dropping the collection.
+ const resumeTokenBeforeDrop = results[0]._id;
+
+ // Write one more document to the collection that will be dropped, to be returned after
+ // resuming.
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4}));
+
+ // Drop the collection, invalidating the open change stream.
+ assertDropCollection(mongosDB, mongosCollShardedOnX.getName());
+
+ // Resume the change stream from before the collection drop, and verify that the documentKey
+ // field contains the extracted shard key from the resume token.
+ cursor = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeTokenBeforeDrop}},
+ {$match: {"ns.coll": mongosCollShardedOnX.getName()}}
+ ],
+ collection: 1
+ });
+ cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [
+ {
+ documentKey: {_id: 4, x: 4},
+ fullDocument: {_id: 4, x: 4},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ },
+ ]
+ });
cst.cleanUp();
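For reference, the information 'resumeTokenBeforeDrop' must carry for this resume to work, sketched as a shell document (an illustration of the token's logical contents per the makeResumeToken(ts, uuid, docKey) helper used in the C++ tests below, not its encoded wire format; the UUID value is made up):

    // Logical contents of a resume token for the {_id: 4, x: 4} insert:
    const tokenData = {
        clusterTime: Timestamp(3, 45),                       // when the event happened
        uuid: UUID("80e24c34-cb58-4c47-a2b8-9fc8b4e7f026"),  // dropped collection's UUID
        documentKey: {x: 4, _id: 4}                          // shard key + _id
    };
    // The field names of 'documentKey' are what the resumed $changeStream stage
    // uses to rebuild the document key for that collection UUID.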
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index a8412e0d8bf..1cb939b573a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -73,6 +73,7 @@ using DSChangeStream = DocumentSourceChangeStream;
static const Timestamp kDefaultTs(100, 1);
static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
static const NamespaceString nss("unittests.change_stream");
+static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}");
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
public:
@@ -88,7 +89,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
UUID) const final {
- return {_fields, true};
+ return {_fields, false};
}
std::vector<FieldPath> _fields;
@@ -107,8 +108,9 @@ public:
void checkTransformation(const OplogEntry& entry,
const boost::optional<Document> expectedDoc,
- std::vector<FieldPath> docKeyFields = {}) {
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry);
+ std::vector<FieldPath> docKeyFields,
+ const BSONObj& spec) {
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec);
auto transform = stages[2].get();
getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields);
@@ -121,12 +123,17 @@ public:
}
}
+ void checkTransformation(const OplogEntry& entry,
+ const boost::optional<Document> expectedDoc,
+ std::vector<FieldPath> docKeyFields = {}) {
+ return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec);
+ }
+
/**
* Returns a list of stages expanded from a $changeStream specification, starting with a
* DocumentSourceMock which contains a single document representing 'entry'.
*/
- vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) {
- const auto spec = fromjson("{$changeStream: {}}");
+ vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry, const BSONObj& spec) {
list<intrusive_ptr<DocumentSource>> result =
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
@@ -160,7 +167,7 @@ public:
}
vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
- return makeStages(entry.toBSON());
+ return makeStages(entry.toBSON(), kDefaultSpec);
}
OplogEntry createCommand(const BSONObj& oField,
@@ -207,7 +214,7 @@ public:
BSONObj oplogEntry = builder.done();
// Create the stages and check that the documents produced matched those in the applyOps.
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry);
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec);
auto transform = stages[2].get();
invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
@@ -994,6 +1001,91 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
}
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ // Although the chunk manager and sharding catalog are not aware of the shard key in this test,
+ // the expectation is for the $changeStream stage to infer the shard key from the resume token.
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ // Note that the 'o' field in the oplog entry does not contain the shard key field.
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
+ auto resumeToken = makeResumeToken(Timestamp(3, 45));
+
+ // TODO SERVER-34710: Allowing resume from an "invalidate" entry will change this behavior.
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 40645);
+}
+
//
// Test class for change stream of a single database.
//
@@ -1257,5 +1349,88 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersCreateIndex) {
checkTransformation(createIndex, boost::none);
}
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ // Note that the 'o' field in the oplog entry does not contain the shard key field.
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
+ auto resumeToken = makeResumeToken(Timestamp(3, 45));
+
+ // TODO SERVER-34710: Allowing resume from an "invalidate" entry will change this behavior.
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 40645);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index bb98f3bfffe..9e7e623eeab 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -81,6 +81,37 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
_nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns));
+
+ auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
+ _changeStreamSpec);
+
+ // If the change stream spec includes a resumeToken with a shard key, populate the document key
+ // cache with the field paths.
+ if (auto resumeAfter = spec.getResumeAfter()) {
+ ResumeToken token = resumeAfter.get();
+ ResumeTokenData tokenData = token.getData();
+
+ // TODO SERVER-34710: Resuming from an "invalidate" means that the resume token may not
+ // always contain a UUID.
+ invariant(tokenData.uuid);
+ if (!tokenData.documentKey.missing()) {
+ std::vector<FieldPath> docKeyFields;
+ auto docKey = tokenData.documentKey.getDocument();
+
+ auto iter = docKey.fieldIterator();
+ while (iter.more()) {
+ auto fieldPair = iter.next();
+ docKeyFields.push_back(fieldPair.first);
+ }
+
+ // If the document key from the resume token has more than one field, that means it
+ // includes the shard key and thus should never change.
+ auto isFinal = docKey.size() > 1;
+
+ _documentKeyCache[tokenData.uuid.get()] =
+ DocumentKeyCacheEntry({docKeyFields, isFinal});
+ }
+ }
}
DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints(
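To restate the cache-priming rule this constructor adds, here is a small JavaScript mirror of the same decision (the function and its name are illustrative only, not part of the patch):

    // Derive document key field paths from a resume token's documentKey, and
    // mark them final only when a shard key field is present alongside _id.
    function docKeyFieldsFromResumeToken(tokenData) {
        if (!tokenData.documentKey)
            return null;  // nothing to seed the cache with
        const fields = Object.keys(tokenData.documentKey);
        // More than one field means the key includes the shard key; the shard
        // key cannot change for a given collection UUID, so the entry is final.
        const isFinal = fields.length > 1;
        return {fields: fields, isFinal: isFinal};
    }

    // e.g. {_id: 1, shardKey: 2}  ->  {fields: ["_id", "shardKey"], isFinal: true}
    //      {_id: 1}               ->  {fields: ["_id"], isFinal: false}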