diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 59 |
1 files changed, 35 insertions, 24 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a9d80a0e2a0..1049a9b04aa 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac BSONObj DocumentSourceChangeStream::buildMatchFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, - Timestamp startFrom, - bool startFromInclusive, + Timestamp startFromInclusive, bool showMigrationEvents) { auto nss = expCtx->ns; @@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // 2.1) Normal CRUD ops. auto normalOpTypeMatch = BSON("op" << NE << "n"); + // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility + // with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove + // this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a + // subset of shards. See SERVER-44039 for details. + // 2.2) A chunk gets migrated to a new shard that doesn't have any chunks. auto chunkMigratedNewShardMatch = BSON("op" << "n" @@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is // set - exempt all other operations and commands with that tag. Include the resume token, if // resuming, so we can verify it was still present in the oplog. - return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) + return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive) << BSON(OR(opMatch, commandAndApplyOpsMatch)))); } @@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression } } + // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'. if (auto startAtOperationTime = spec.getStartAtOperationTime()) { uassert(40674, "Only one type of resume option is allowed, but multiple were found.", @@ -396,8 +401,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); } - // There might not be a starting point if we're on mongos, otherwise we should either have a - // 'resumeAfter' starting point, or should start from the latest majority committed operation. + // We can only run on a replica set, or through mongoS. Confirm that this is the case. auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); uassert( 40573, @@ -405,24 +409,29 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression expCtx->inMongos || (replCoord && replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet)); - if (!startFrom && !expCtx->inMongos) { - startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp(); + + // If we do not have an explicit starting point, we should start from the latest majority + // committed operation. If we are on mongoS and do not have a starting point, set it to the + // current clusterTime so that all shards start in sync. We always start one tick beyond the + // most recent operation, to ensure that the stream does not return it. + if (!startFrom) { + const auto currentTime = !expCtx->inMongos + ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} + : LogicalClock::get(expCtx->opCtx)->getClusterTime(); + startFrom = currentTime.addTicks(1).asTimestamp(); } - if (startFrom) { - const bool startFromInclusive = (resumeStage != nullptr); - stages.push_back(DocumentSourceOplogMatch::create( - DocumentSourceChangeStream::buildMatchFilter( - expCtx, *startFrom, startFromInclusive, showMigrationEvents), - expCtx)); - - // If we haven't already populated the initial PBRT, then we are starting from a specific - // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. - if (expCtx->initialPostBatchResumeToken.isEmpty()) { - Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)}; - expCtx->initialPostBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson(); - } + // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies + // upon the fact that it is always the first stage in the pipeline. + stages.push_back(DocumentSourceOplogMatch::create( + DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents), + expCtx)); + + // If we haven't already populated the initial PBRT, then we are starting from a specific + // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. + if (expCtx->initialPostBatchResumeToken.isEmpty()) { + expCtx->initialPostBatchResumeToken = + ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson(); } // Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage. @@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( (expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS())); // Prevent $changeStream from running on internal databases. A stream may run against the - // 'admin' database iff 'allChangesForCluster' is true. + // 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config' + // database iff 'allowToRunOnConfigDB' is true. + const bool isNotBannedInternalDB = + !expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB()); uassert(ErrorCodes::InvalidNamespace, str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db() << " database", - expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() - : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB())); + expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB); // Prevent $changeStream from running on internal collections in any database. uassert(ErrorCodes::InvalidNamespace, |