summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-05-09 12:25:35 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-05-29 17:06:54 -0400
commit55f4dbf94a1cce9d8642af9bba9ac4cc77627293 (patch)
tree5c3ca284176dd15536251b76797a2c89354f83d8
parent1e9a55f9bba4909732ba0b06bd3547df152864bf (diff)
downloadmongo-55f4dbf94a1cce9d8642af9bba9ac4cc77627293.tar.gz
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
-rw-r--r--jstests/sharding/change_stream_invalidation.js4
-rw-r--r--jstests/sharding/change_streams_primary_shard_unaware.js2
-rw-r--r--jstests/sharding/change_streams_whole_db.js34
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp189
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp31
5 files changed, 248 insertions, 12 deletions
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js
index ebfc59cd8c6..a50d8eb88ea 100644
--- a/jstests/sharding/change_stream_invalidation.js
+++ b/jstests/sharding/change_stream_invalidation.js
@@ -71,8 +71,6 @@
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "insert");
- // TODO SERVER-34705: Extract the shard key from the resume token and include in the documentKey
- // for inserts.
assert.eq(next.documentKey, {_id: 2});
assert.soon(() => changeStream.hasNext());
@@ -93,7 +91,7 @@
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey, {_id: 2});
+ assert.eq(next.documentKey, {shardKey: 2, _id: 2});
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js
index f59775cacb1..6975678d18d 100644
--- a/jstests/sharding/change_streams_primary_shard_unaware.js
+++ b/jstests/sharding/change_streams_primary_shard_unaware.js
@@ -165,7 +165,7 @@
cstMongos2.assertNextChangesEqual({
cursor: cursorMongos2,
expectedChanges: [{
- documentKey: {_id: 2},
+ documentKey: {_id: 2, a: 2},
fullDocument: {_id: 2, a: 2},
ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()},
operationType: "insert",
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js
index d43294e7772..bc7d559610a 100644
--- a/jstests/sharding/change_streams_whole_db.js
+++ b/jstests/sharding/change_streams_whole_db.js
@@ -5,6 +5,7 @@
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+ load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection.
// For supportsMajorityReadConcern().
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
@@ -151,7 +152,38 @@
operationType: "insert",
},
];
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ // Store the resume token of the first insert to use after dropping the collection.
+ const resumeTokenBeforeDrop = results[0]._id;
+
+ // Write one more document to the collection that will be dropped, to be returned after
+ // resuming.
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4}));
+
+ // Drop the collection, invalidating the open change stream.
+ assertDropCollection(mongosDB, mongosCollShardedOnX.getName());
+
+ // Resume the change stream from before the collection drop, and verify that the documentKey
+ // field contains the extracted shard key from the resume token.
+ cursor = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeTokenBeforeDrop}},
+ {$match: {"ns.coll": mongosCollShardedOnX.getName()}}
+ ],
+ collection: 1
+ });
+ cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [
+ {
+ documentKey: {_id: 4, x: 4},
+ fullDocument: {_id: 4, x: 4},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ },
+ ]
+ });
cst.cleanUp();
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 70a61705fbb..af12c25c7d8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -73,6 +73,7 @@ using DSChangeStream = DocumentSourceChangeStream;
static const Timestamp kDefaultTs(100, 1);
static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
static const NamespaceString nss("unittests.change_stream");
+static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}");
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
public:
@@ -88,7 +89,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
UUID) const final {
- return {_fields, true};
+ return {_fields, false};
}
std::vector<FieldPath> _fields;
@@ -110,8 +111,9 @@ public:
void checkTransformation(const OplogEntry& entry,
const boost::optional<Document> expectedDoc,
- std::vector<FieldPath> docKeyFields = {}) {
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry);
+ std::vector<FieldPath> docKeyFields,
+ const BSONObj& spec) {
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec);
auto transform = stages[2].get();
getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields);
@@ -124,12 +126,17 @@ public:
}
}
+ void checkTransformation(const OplogEntry& entry,
+ const boost::optional<Document> expectedDoc,
+ std::vector<FieldPath> docKeyFields = {}) {
+ return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec);
+ }
+
/**
     * Returns a list of stages expanded from a $changeStream specification, starting with a
* DocumentSourceMock which contains a single document representing 'entry'.
*/
- vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) {
- const auto spec = fromjson("{$changeStream: {}}");
+ vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry, const BSONObj& spec) {
list<intrusive_ptr<DocumentSource>> result =
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
@@ -163,7 +170,7 @@ public:
}
vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
- return makeStages(entry.toBSON());
+ return makeStages(entry.toBSON(), kDefaultSpec);
}
OplogEntry createCommand(const BSONObj& oField,
@@ -210,7 +217,7 @@ public:
BSONObj oplogEntry = builder.done();
// Create the stages and check that the documents produced matched those in the applyOps.
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry);
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec);
auto transform = stages[2].get();
invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
@@ -997,6 +1004,91 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
}
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ // Although the chunk manager and sharding catalog are not aware of the shard key in this test,
+ // the expectation is for the $changeStream stage to infer the shard key from the resume token.
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ // Note that the 'o' field in the oplog entry does not contain the shard key field.
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
+ auto resumeToken = makeResumeToken(Timestamp(3, 45));
+
+    // TODO SERVER-34710: Allowing resumption after an "invalidate" entry will change this behavior.
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 40645);
+}
+
//
// Test class for change stream of a single database.
//
@@ -1260,5 +1352,88 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersCreateIndex) {
checkTransformation(createIndex, boost::none);
}
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ // Note that the 'o' field in the oplog entry does not contain the shard key field.
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
+ auto resumeToken = makeResumeToken(Timestamp(3, 45));
+
+    // TODO SERVER-34710: Allowing resumption after an "invalidate" entry will change this behavior.
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 40645);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index bb98f3bfffe..9e7e623eeab 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -81,6 +81,37 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
_nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns));
+
+ auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
+ _changeStreamSpec);
+
+ // If the change stream spec includes a resumeToken with a shard key, populate the document key
+ // cache with the field paths.
+ if (auto resumeAfter = spec.getResumeAfter()) {
+ ResumeToken token = resumeAfter.get();
+ ResumeTokenData tokenData = token.getData();
+
+ // TODO SERVER-34710: Resuming from an "invalidate" means that the resume token may not
+ // always contain a UUID.
+ invariant(tokenData.uuid);
+ if (!tokenData.documentKey.missing()) {
+ std::vector<FieldPath> docKeyFields;
+ auto docKey = tokenData.documentKey.getDocument();
+
+ auto iter = docKey.fieldIterator();
+ while (iter.more()) {
+ auto fieldPair = iter.next();
+ docKeyFields.push_back(fieldPair.first);
+ }
+
+ // If the document key from the resume token has more than one field, that means it
+ // includes the shard key and thus should never change.
+ auto isFinal = docKey.size() > 1;
+
+ _documentKeyCache[tokenData.uuid.get()] =
+ DocumentKeyCacheEntry({docKeyFields, isFinal});
+ }
+ }
}
DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints(