summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-05-09 12:25:35 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-06-04 12:09:50 -0400
commit753cedd024b9f4fe1a83632db792f29d3a7e4454 (patch)
tree0cd094ebb24806358e02d111cc47dadf113484ff /src/mongo/db
parent851c59e7bc5b54c0cf5feb683398a0eb6dffc20f (diff)
downloadmongo-753cedd024b9f4fe1a83632db792f29d3a7e4454.tar.gz
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
(cherry picked from commit 55f4dbf94a1cce9d8642af9bba9ac4cc77627293)
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp189
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp31
2 files changed, 213 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index a8412e0d8bf..1cb939b573a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -73,6 +73,7 @@ using DSChangeStream = DocumentSourceChangeStream;
static const Timestamp kDefaultTs(100, 1);
static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
static const NamespaceString nss("unittests.change_stream");
+static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}");
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
public:
@@ -88,7 +89,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
UUID) const final {
- return {_fields, true};
+ return {_fields, false};
}
std::vector<FieldPath> _fields;
@@ -107,8 +108,9 @@ public:
void checkTransformation(const OplogEntry& entry,
const boost::optional<Document> expectedDoc,
- std::vector<FieldPath> docKeyFields = {}) {
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry);
+ std::vector<FieldPath> docKeyFields,
+ const BSONObj& spec) {
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec);
auto transform = stages[2].get();
getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields);
@@ -121,12 +123,17 @@ public:
}
}
+ void checkTransformation(const OplogEntry& entry,
+ const boost::optional<Document> expectedDoc,
+ std::vector<FieldPath> docKeyFields = {}) {
+ return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec);
+ }
+
/**
* Returns a list of stages expanded from a $changStream specification, starting with a
* DocumentSourceMock which contains a single document representing 'entry'.
*/
- vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) {
- const auto spec = fromjson("{$changeStream: {}}");
+ vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry, const BSONObj& spec) {
list<intrusive_ptr<DocumentSource>> result =
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
@@ -160,7 +167,7 @@ public:
}
vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
- return makeStages(entry.toBSON());
+ return makeStages(entry.toBSON(), kDefaultSpec);
}
OplogEntry createCommand(const BSONObj& oField,
@@ -207,7 +214,7 @@ public:
BSONObj oplogEntry = builder.done();
// Create the stages and check that the documents produced matched those in the applyOps.
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry);
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec);
auto transform = stages[2].get();
invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
@@ -994,6 +1001,91 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
}
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ // Although the chunk manager and sharding catalog are not aware of the shard key in this test,
+ // the expectation is for the $changeStream stage to infer the shard key from the resume token.
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ // Note that the 'o' field in the oplog entry does not contain the shard key field.
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
+ auto resumeToken = makeResumeToken(Timestamp(3, 45));
+
+ // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior.
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 40645);
+}
+
//
// Test class for change stream of a single database.
//
@@ -1257,5 +1349,88 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersCreateIndex) {
checkTransformation(createIndex, boost::none);
}
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, o2);
+
+ // Note that the 'o' field in the oplog entry does not contain the shard key field.
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {},
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
+ auto resumeToken = makeResumeToken(Timestamp(3, 45));
+
+ // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior.
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 40645);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index bb98f3bfffe..9e7e623eeab 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -81,6 +81,37 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
_nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns));
+
+ auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
+ _changeStreamSpec);
+
+ // If the change stream spec includes a resumeToken with a shard key, populate the document key
+ // cache with the field paths.
+ if (auto resumeAfter = spec.getResumeAfter()) {
+ ResumeToken token = resumeAfter.get();
+ ResumeTokenData tokenData = token.getData();
+
+ // TODO SERVER-34710: Resuming from an "invalidate" means that the resume token may not
+ // always contain a UUID.
+ invariant(tokenData.uuid);
+ if (!tokenData.documentKey.missing()) {
+ std::vector<FieldPath> docKeyFields;
+ auto docKey = tokenData.documentKey.getDocument();
+
+ auto iter = docKey.fieldIterator();
+ while (iter.more()) {
+ auto fieldPair = iter.next();
+ docKeyFields.push_back(fieldPair.first);
+ }
+
+ // If the document key from the resume token has more than one field, that means it
+ // includes the shard key and thus should never change.
+ auto isFinal = docKey.size() > 1;
+
+ _documentKeyCache[tokenData.uuid.get()] =
+ DocumentKeyCacheEntry({docKeyFields, isFinal});
+ }
+ }
}
DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints(