/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/bson/simple_bsonelement_comparator.h" #include "mongo/db/bson/bson_helper.h" #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/commands/feature_compatibility_version_command_parser.h" #include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/util/log.h" namespace mongo { using boost::intrusive_ptr; using boost::optional; using std::list; using std::string; using std::vector; // The $changeStream stage is an alias for many stages, but we need to be able to serialize // and re-parse the pipeline. To make this work, the 'transformation' stage will serialize itself // with the original specification, and all other stages that are created during the alias expansion // will not serialize themselves. REGISTER_MULTI_STAGE_ALIAS(changeStream, DocumentSourceChangeStream::LiteParsed::parse, DocumentSourceChangeStream::createFromBson); constexpr StringData DocumentSourceChangeStream::kDocumentKeyField; constexpr StringData DocumentSourceChangeStream::kFullDocumentField; constexpr StringData DocumentSourceChangeStream::kIdField; constexpr StringData DocumentSourceChangeStream::kNamespaceField; constexpr StringData DocumentSourceChangeStream::kUuidField; constexpr StringData DocumentSourceChangeStream::kOperationTypeField; constexpr StringData DocumentSourceChangeStream::kStageName; constexpr StringData DocumentSourceChangeStream::kTimestampField; constexpr StringData DocumentSourceChangeStream::kClusterTimeField; constexpr StringData DocumentSourceChangeStream::kUpdateOpType; constexpr StringData DocumentSourceChangeStream::kDeleteOpType; constexpr StringData DocumentSourceChangeStream::kReplaceOpType; constexpr StringData DocumentSourceChangeStream::kInsertOpType; constexpr StringData DocumentSourceChangeStream::kInvalidateOpType; constexpr StringData DocumentSourceChangeStream::kRetryNeededOpType; namespace { static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; } // namespace intrusive_ptr DocumentSourceOplogMatch::create( BSONObj filter, const intrusive_ptr& expCtx) { return new DocumentSourceOplogMatch(std::move(filter), expCtx); } const char* DocumentSourceOplogMatch::getSourceName() const { // This is used in error reporting, particularly if we find this stage in a position other // than first, so report the name as $changeStream. return DocumentSourceChangeStream::kStageName.rawData(); } DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints( Pipeline::SplitState pipeState) const { return {StreamType::kStreaming, PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed}; } /** * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can * properly alias. */ Value DocumentSourceOplogMatch::serialize(optional explain) const { if (explain) { return Value(Document{{kOplogMatchExplainName, Document{}}}); } return Value(); } DocumentSourceOplogMatch::DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr& expCtx) : DocumentSourceMatch(std::move(filter), expCtx) {} void checkValueType(const Value v, const StringData filedName, BSONType expectedType) { uassert(40532, str::stream() << "Entry field \"" << filedName << "\" should be " << typeName(expectedType) << ", found: " << typeName(v.getType()), (v.getType() == expectedType)); } namespace { /** * This stage is used internally for change notifications to close cursor after returning * "invalidate" entries. * It is not intended to be created by the user. */ class DocumentSourceCloseCursor final : public DocumentSource { public: GetNextResult getNext() final; const char* getSourceName() const final { // This is used in error reporting. return "$changeStream"; } StageConstraints constraints(Pipeline::SplitState pipeState) const final { // This stage should never be in the shards part of a split pipeline. invariant(pipeState != Pipeline::SplitState::kSplitForShards); return {StreamType::kStreaming, PositionRequirement::kNone, (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed}; } Value serialize(boost::optional explain = boost::none) const final { // This stage is created by the DocumentSourceChangeStream stage, so serializing it // here would result in it being created twice. return Value(); } static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx) { return new DocumentSourceCloseCursor(expCtx); } private: /** * Use the create static method to create a DocumentSourceCloseCursor. */ DocumentSourceCloseCursor(const boost::intrusive_ptr& expCtx) : DocumentSource(expCtx) {} bool _shouldCloseCursor = false; }; DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { pExpCtx->checkForInterrupt(); // Close cursor if we have returned an invalidate entry. if (_shouldCloseCursor) { throw CloseChangeStreamException(); } auto nextInput = pSource->getNext(); if (!nextInput.isAdvanced()) return nextInput; auto doc = nextInput.getDocument(); const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField; checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String); auto operationType = doc[kOperationTypeField].getString(); if (operationType == DocumentSourceChangeStream::kInvalidateOpType || operationType == DocumentSourceChangeStream::kRetryNeededOpType) { // Pass the invalidation forward, so that it can be included in the results, or // filtered/transformed by further stages in the pipeline, then throw an exception // to close the cursor on the next call to getNext(). _shouldCloseCursor = true; } return nextInput; } } // namespace BSONObj DocumentSourceChangeStream::buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume) { auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. auto dropDatabase = BSON("o.dropDatabase" << 1); auto dropCollection = BSON("o.drop" << nss.coll()); auto renameCollection = BSON("o.renameCollection" << target); // 1.1) Commands that are on target db and one of the above. auto commandsOnTargetDb = BSON("ns" << nss.getCommandNS().ns() << OR(dropDatabase, dropCollection, renameCollection)); // 1.2) Supported commands that have arbitrary db namespaces in "ns" field. auto renameDropTarget = BSON("o.to" << target); // All supported commands that are either (1.1) or (1.2). BSONObj commandMatch = BSON("op" << "c" << OR(commandsOnTargetDb, renameDropTarget)); // 2.1) Normal CRUD ops on the target collection. auto normalOpTypeMatch = BSON("op" << NE << "n"); // 2.2) A chunk gets migrated to a new shard that doesn't have any chunks. auto chunkMigratedMatch = BSON("op" << "n" << "o2.type" << "migrateChunkToNewShard"); // 2) Supported operations on the target namespace. auto opMatch = BSON("ns" << target << OR(normalOpTypeMatch, chunkMigratedMatch)); // Match oplog entries after "start" and are either supported (1) commands or (2) operations, // excepting those tagged "fromMigrate". // Include the resume token, if resuming, so we can verify it was still present in the oplog. return BSON("$and" << BSON_ARRAY(BSON("ts" << (isResume ? GTE : GT) << startFrom) << BSON(OR(opMatch, commandMatch)) << BSON("fromMigrate" << NE << true))); } list> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr& expCtx) { uassert( ErrorCodes::InvalidOptions, str::stream() << "The featureCompatibilityVersion must be 3.6 to use the $changeStream stage. See " << feature_compatibility_version::kDochubLink << ".", serverGlobalParams.featureCompatibility.version.load() != ServerGlobalParams::FeatureCompatibility::Version::k34); // TODO: Add sharding support here (SERVER-29141). uassert( 40470, "The $changeStream stage is not supported on sharded systems.", !expCtx->inMongos); uassert(40471, "Only default collation is allowed when using a $changeStream stage.", !expCtx->getCollator()); auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); uassert(40573, "The $changeStream stage is only supported on replica sets", replCoord && replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet); Timestamp startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp(); intrusive_ptr resumeStage = nullptr; auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), elem.embeddedObject()); if (auto resumeAfter = spec.getResumeAfter()) { ResumeToken token = resumeAfter.get(); ResumeTokenData tokenData = token.getData(); uassert(40645, "The resume token is invalid (no UUID), possibly from an invalidate.", tokenData.uuid); auto resumeNamespace = UUIDCatalog::get(expCtx->opCtx).lookupNSSByUUID(tokenData.uuid.get()); uassert(40615, "The resume token UUID does not exist. Has the collection been dropped?", !resumeNamespace.isEmpty()); startFrom = tokenData.clusterTime; if (expCtx->needsMerge) { resumeStage = DocumentSourceShardCheckResumability::create(expCtx, std::move(token)); } else { resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); } } const bool changeStreamIsResuming = resumeStage != nullptr; auto fullDocOption = spec.getFullDocument(); uassert(40575, str::stream() << "unrecognized value for the 'fullDocument' option to the " "$changeStream stage. Expected \"default\" or " "\"updateLookup\", got \"" << fullDocOption << "\"", fullDocOption == "updateLookup"_sd || fullDocOption == "default"_sd); const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd); auto oplogMatch = DocumentSourceOplogMatch::create( buildMatchFilter(expCtx->ns, startFrom, changeStreamIsResuming), expCtx); auto transformation = createTransformationStage(elem.embeddedObject(), expCtx); list> stages = {oplogMatch, transformation}; if (resumeStage) { stages.push_back(resumeStage); } auto closeCursorSource = DocumentSourceCloseCursor::create(expCtx); stages.push_back(closeCursorSource); if (shouldLookupPostImage) { stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); } return stages; } intrusive_ptr DocumentSourceChangeStream::createTransformationStage( BSONObj changeStreamSpec, const intrusive_ptr& expCtx) { return intrusive_ptr(new DocumentSourceSingleDocumentTransformation( expCtx, stdx::make_unique(changeStreamSpec), kStageName.toString())); } Document DocumentSourceChangeStream::Transformation::applyTransformation(const Document& input) { MutableDocument doc; // Extract the fields we need. checkValueType(input[repl::OplogEntry::kOpTypeFieldName], repl::OplogEntry::kOpTypeFieldName, BSONType::String); string op = input[repl::OplogEntry::kOpTypeFieldName].getString(); Value ts = input[repl::OplogEntry::kTimestampFieldName]; Value ns = input[repl::OplogEntry::kNamespaceFieldName]; checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String); Value uuid = input[repl::OplogEntry::kUuidFieldName]; if (!uuid.missing()) { checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData); } NamespaceString nss(ns.getString()); Value id = input.getNestedField("o._id"); // Non-replace updates have the _id in field "o2". StringData operationType; Value fullDocument; Value updateDescription; Value documentKey; // Deal with CRUD operations and commands. auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op); switch (opType) { case repl::OpTypeEnum::kInsert: { operationType = kInsertOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; documentKey = Value(Document{{kIdField, id}}); break; } case repl::OpTypeEnum::kDelete: { operationType = kDeleteOpType; documentKey = input[repl::OplogEntry::kObjectFieldName]; break; } case repl::OpTypeEnum::kUpdate: { if (id.missing()) { operationType = kUpdateOpType; checkValueType(input[repl::OplogEntry::kObjectFieldName], repl::OplogEntry::kObjectFieldName, BSONType::Object); Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); Value updatedFields = opObject["$set"]; Value removedFields = opObject["$unset"]; // Extract the field names of $unset document. vector removedFieldsVector; if (removedFields.getType() == BSONType::Object) { auto iter = removedFields.getDocument().fieldIterator(); while (iter.more()) { removedFieldsVector.push_back(Value(iter.next().first)); } } updateDescription = Value(Document{ {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, {"removedFields", removedFieldsVector}}); } else { operationType = kReplaceOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; } documentKey = input[repl::OplogEntry::kObject2FieldName]; break; } case repl::OpTypeEnum::kCommand: { operationType = kInvalidateOpType; // Make sure the result doesn't have a document key. documentKey = Value(); break; } case repl::OpTypeEnum::kNoop: { operationType = kRetryNeededOpType; // Generate a fake document Id for RetryNeeded operation so that we can resume after // this operation. documentKey = Value(Document{{kIdField, input[repl::OplogEntry::kObject2FieldName]}}); break; } default: { MONGO_UNREACHABLE; } } // UUID should always be present except for invalidate entries. It will not be under // FCV 3.4, so we should close the stream as invalid. if (operationType != kInvalidateOpType && uuid.missing()) { warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get " "downgraded after opening the stream?"; operationType = kInvalidateOpType; fullDocument = Value(); updateDescription = Value(); documentKey = Value(); } // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will // not appear in the output. ResumeTokenData resumeTokenData; resumeTokenData.clusterTime = ts.getTimestamp(); resumeTokenData.documentKey = documentKey; if (!uuid.missing()) resumeTokenData.uuid = uuid.getUuid(); doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument())); doc.addField(kOperationTypeField, Value(operationType)); // "invalidate" and "retryNeeded" entries have fewer fields. if (operationType == kInvalidateOpType || operationType == kRetryNeededOpType) { return doc.freeze(); } doc.addField(kFullDocumentField, fullDocument); doc.addField(kNamespaceField, Value(Document{{"db", nss.db()}, {"coll", nss.coll()}})); doc.addField(kDocumentKeyField, documentKey); // Note that 'updateDescription' might be the 'missing' value, in which case it will not be // serialized. doc.addField("updateDescription", updateDescription); return doc.freeze(); } Document DocumentSourceChangeStream::Transformation::serializeStageOptions( boost::optional explain) const { return Document(_changeStreamSpec); } DocumentSource::GetDepsReturn DocumentSourceChangeStream::Transformation::addDependencies( DepsTracker* deps) const { deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString()); deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString()); deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString()); deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString()); deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString()); deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString()); return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL; } DocumentSource::GetModPathsReturn DocumentSourceChangeStream::Transformation::getModifiedPaths() const { // All paths are modified. return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set{}, {}}; } } // namespace mongo