summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp59
1 files changed, 35 insertions, 24 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a9d80a0e2a0..1049a9b04aa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
BSONObj DocumentSourceChangeStream::buildMatchFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Timestamp startFrom,
- bool startFromInclusive,
+ Timestamp startFromInclusive,
bool showMigrationEvents) {
auto nss = expCtx->ns;
@@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// 2.1) Normal CRUD ops.
auto normalOpTypeMatch = BSON("op" << NE << "n");
+ // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
+ // with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove
+ // this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a
+ // subset of shards. See SERVER-44039 for details.
+
// 2.2) A chunk gets migrated to a new shard that doesn't have any chunks.
auto chunkMigratedNewShardMatch = BSON("op"
<< "n"
@@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is
// set - exempt all other operations and commands with that tag. Include the resume token, if
// resuming, so we can verify it was still present in the oplog.
- return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
+ return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive)
<< BSON(OR(opMatch, commandAndApplyOpsMatch))));
}
@@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
}
}
+ // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'.
if (auto startAtOperationTime = spec.getStartAtOperationTime()) {
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
@@ -396,8 +401,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
}
- // There might not be a starting point if we're on mongos, otherwise we should either have a
- // 'resumeAfter' starting point, or should start from the latest majority committed operation.
+ // We can only run on a replica set, or through mongoS. Confirm that this is the case.
auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
uassert(
40573,
@@ -405,24 +409,29 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
expCtx->inMongos ||
(replCoord &&
replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet));
- if (!startFrom && !expCtx->inMongos) {
- startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp();
+
+ // If we do not have an explicit starting point, we should start from the latest majority
+ // committed operation. If we are on mongoS and do not have a starting point, set it to the
+ // current clusterTime so that all shards start in sync. We always start one tick beyond the
+ // most recent operation, to ensure that the stream does not return it.
+ if (!startFrom) {
+ const auto currentTime = !expCtx->inMongos
+ ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()}
+ : LogicalClock::get(expCtx->opCtx)->getClusterTime();
+ startFrom = currentTime.addTicks(1).asTimestamp();
}
- if (startFrom) {
- const bool startFromInclusive = (resumeStage != nullptr);
- stages.push_back(DocumentSourceOplogMatch::create(
- DocumentSourceChangeStream::buildMatchFilter(
- expCtx, *startFrom, startFromInclusive, showMigrationEvents),
- expCtx));
-
- // If we haven't already populated the initial PBRT, then we are starting from a specific
- // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
- if (expCtx->initialPostBatchResumeToken.isEmpty()) {
- Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)};
- expCtx->initialPostBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson();
- }
+ // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies
+ // upon the fact that it is always the first stage in the pipeline.
+ stages.push_back(DocumentSourceOplogMatch::create(
+ DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents),
+ expCtx));
+
+ // If we haven't already populated the initial PBRT, then we are starting from a specific
+ // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
+ if (expCtx->initialPostBatchResumeToken.isEmpty()) {
+ expCtx->initialPostBatchResumeToken =
+ ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson();
}
// Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage.
@@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
(expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS()));
// Prevent $changeStream from running on internal databases. A stream may run against the
- // 'admin' database iff 'allChangesForCluster' is true.
+ // 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config'
+ // database iff 'allowToRunOnConfigDB' is true.
+ const bool isNotBannedInternalDB =
+ !expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db()
<< " database",
- expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster()
- : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB()));
+ expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB);
// Prevent $changeStream from running on internal collections in any database.
uassert(ErrorCodes::InvalidNamespace,