summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2019-01-15 23:54:23 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-02-02 20:44:19 +0000
commitfbb9daeb791d16c49b861d82c097cf9bd9daf07e (patch)
tree660bb996be48530e2e1eb5c1c9fc303912c1812c /src/mongo/db
parent603a1d610e9ebfa7b43d4e5df0df2a5477622303 (diff)
downloadmongo-fbb9daeb791d16c49b861d82c097cf9bd9daf07e.tar.gz
SERVER-38975 Include UUID in high water marks from shards where the collection does not exist
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp2
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h36
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_watch_for_uuid.cpp59
-rw-r--r--src/mongo/db/pipeline/document_source_watch_for_uuid.h82
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h8
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h15
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp2
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp2
11 files changed, 230 insertions, 19 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 78463d65c63..09ac63f0349 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -431,7 +431,7 @@ Status runAggregate(OperationContext* opCtx,
// Upgrade and wait for read concern if necessary.
_adjustChangeStreamReadConcern(opCtx);
- if (!origNss.isCollectionlessAggregateNS()) {
+ if (liteParsedPipeline.shouldResolveUUIDAndCollation()) {
// AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view.
AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss);
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 2480340ea2d..cda26fc3e84 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -403,6 +403,7 @@ pipelineeEnv.Library(
'document_source_sort_by_count.cpp',
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
+ 'document_source_watch_for_uuid.cpp',
'pipeline.cpp',
'sequential_document_cache.cpp',
'stage_constraints.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 28f3c1df0c9..74ac3842377 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_watch_for_uuid.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/resume_token.h"
@@ -306,6 +307,14 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx,
return;
}
+ if (!tokenData.uuid && ResumeToken::isHighWaterMarkToken(tokenData)) {
+ // The only time we see a single-collection high water mark with no UUID is when the stream
+ // was opened on a non-existent collection. We allow this to proceed, as the resumed stream
+ // will immediately invalidate itself if it observes a createCollection event in the oplog
+ // with a non-simple collation.
+ return;
+ }
+
const auto cannotResumeErrMsg =
"Attempted to resume a stream on a collection which has been dropped. The change stream's "
"pipeline may need to make comparisons which should respect the collection's default "
@@ -394,19 +403,26 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
// If we haven't already populated the initial PBRT, then we are starting from a specific
// timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
if (expCtx->initialPostBatchResumeToken.isEmpty()) {
+ Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)};
expCtx->initialPostBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(*startFrom, expCtx->uuid).toDocument().toBson();
+ ResumeToken::makeHighWaterMarkToken(startTime, expCtx->uuid).toDocument().toBson();
}
}
const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
stages.push_back(
DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject()));
- stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
- // The resume stage must come after the check invalidate stage to allow the check invalidate
- // stage to determine whether the oplog entry matching the resume token should be followed by an
- // "invalidate" entry.
+ // If this is a single-collection stream but we don't have a UUID set on the expression context,
+ // then the stream was opened before the collection exists. Add a stage which will populate the
+ // UUID using the first change stream result observed by the pipeline during execution.
+ if (!expCtx->uuid && expCtx->isSingleNamespaceAggregation()) {
+ stages.push_back(DocumentSourceWatchForUUID::create(expCtx));
+ }
+
+ // The resume stage must come after the check invalidate stage so that the former can determine
+ // whether the event that matches the resume token should be followed by an "invalidate" event.
+ stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
if (resumeStage) {
stages.push_back(resumeStage);
}
@@ -448,6 +464,13 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// to be merged, do not add a close cursor stage, since the mongos will already have one.
stages.push_back(DocumentSourceCloseCursor::create(expCtx));
+ // If this is a single-collection stream but we do not have a UUID set on the expression
+ // context, then the stream was opened before the collection exists. Add a stage on mongoS
+ // which will watch for and populate the UUID using the first result seen by the pipeline.
+ if (expCtx->inMongos && !expCtx->uuid && expCtx->isSingleNamespaceAggregation()) {
+ stages.push_back(DocumentSourceWatchForUUID::create(expCtx));
+ }
+
// There should be only one post-image lookup stage. If we're on the shards and producing
// input to be merged, the lookup is done on the mongos.
if (shouldLookupPostImage) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 7d922a3ec7f..5f529f77a9c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -49,15 +49,46 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(request.getNamespaceString());
+ return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec);
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) {
+ // We don't do any validation here, just a minimal check for the resume token. We also
+ // do not need to extract the token unless the stream is running on a single namespace.
+ if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) {
+ return;
+ }
+ // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field.
+ auto specObj = spec.embeddedObject();
+ _resumeToken =
+ specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName);
+ if (_resumeToken.isEmpty()) {
+ _resumeToken =
+ specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName);
+ }
+ }
bool isChangeStream() const final {
return true;
}
+ bool shouldResolveUUIDAndCollation() const final {
+ // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation.
+ if (_nss.isCollectionlessAggregateNS()) {
+ return false;
+ }
+ // If we are not resuming, always resolve the UUID and collation.
+ if (_resumeToken.isEmpty()) {
+ return true;
+ }
+ // If we are resuming a single-collection stream from a high water mark that does not
+ // have a UUID, then the token was generated before the collection was created. Do not
+ // attempt to resolve the collection's current UUID or collation, so that the stream
+ // resumes in exactly the same condition as it was in when the token was generated.
+ auto tokenData = ResumeToken::parse(_resumeToken).getData();
+ return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid);
+ }
+
bool allowedToForwardFromMongos() const final {
return false;
}
@@ -98,6 +129,7 @@ public:
private:
const NamespaceString _nss;
+ BSONObj _resumeToken;
};
// The name of the field where the document key (_id and shard key, if present) will be found
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 493b284ba75..e9435d5ced2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -349,15 +349,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
invariant(!uuid.missing(), "Saw a CRUD op without a UUID");
}
- // If the collection did not exist when the change stream was opened, then the UUID will not
- // have been obtained from the catalog. In this case, we set the UUID on the ExpressionContext
- // after obtaining it from the first relevant oplog entry, so that the UUID can be included in
- // high water mark tokens for change streams watching a single collection. The UUID is needed
- // for resumability against a single collection due to collation semantics.
- if (!pExpCtx->uuid && !uuid.missing() && pExpCtx->isSingleNamespaceAggregation()) {
- pExpCtx->uuid = uuid.getUuid();
- }
-
// Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
// in the output.
auto resumeTokenData = getResumeToken(ts, uuid, documentKey);
diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp
new file mode 100644
index 00000000000..1c26e4681ba
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_watch_for_uuid.h"
+
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+DocumentSource::GetNextResult DocumentSourceWatchForUUID::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced())
+ return nextInput;
+
+ // This single-collection stream was opened before the collection was created, and the pipeline
+ // does not know its UUID. When we see the first event, we update our expression context with
+ // the UUID drawn from that event's resume token.
+ if (!pExpCtx->uuid) {
+ auto resumeToken = ResumeToken::parse(
+ nextInput.getDocument()[DocumentSourceChangeStream::kIdField].getDocument());
+ pExpCtx->uuid = resumeToken.getData().uuid;
+ }
+
+ // Forward the result without modification.
+ return nextInput;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h
new file mode 100644
index 00000000000..1ff653378c1
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.h
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * In the event that a single-collection $changeStream is opened on a namespace whose UUID is not
+ * known, this stage will be added to the pipeline on both mongoD and mongoS. When the first event
+ * is observed by the pipeline, DSWatchForUUID will extract the collection's UUID from the event's
+ * resume token, and will use it to populate the pipeline's ExpressionContext::uuid.
+ */
+class DocumentSourceWatchForUUID final : public DocumentSource {
+public:
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final {
+ return "$watchForUUID";
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ // This stage is created by the DocumentSourceChangeStream stage, so serializing it
+ // here would result in it being created twice.
+ return Value();
+ }
+
+ static boost::intrusive_ptr<DocumentSourceWatchForUUID> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ // Only created for a single-collection stream where the UUID does not exist.
+ invariant(expCtx->isSingleNamespaceAggregation());
+ invariant(!expCtx->uuid);
+ return new DocumentSourceWatchForUUID(expCtx);
+ }
+
+private:
+ /**
+ * The static 'create' method must be used to build a DocumentSourceWatchForUUID.
+ */
+ DocumentSourceWatchForUUID(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx) {}
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index fc3b88163f3..119740410b0 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -109,6 +109,14 @@ public:
}
/**
+ * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this
+ * means adopting the collection's default collation, unless a custom collation was specified.
+ */
+ virtual bool shouldResolveUUIDAndCollation() const {
+ return true;
+ }
+
+ /**
* Returns true if this stage does not require an input source.
*/
virtual bool isInitialSource() const {
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index dabffc24c3e..47b17706cf1 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -53,7 +53,7 @@ public:
* May throw a AssertionException if there is an invalid stage specification, although full
* validation happens later, during Pipeline construction.
*/
- LiteParsedPipeline(const AggregationRequest& request) {
+ LiteParsedPipeline(const AggregationRequest& request) : _nss(request.getNamespaceString()) {
_stageSpecs.reserve(request.getPipeline().size());
for (auto&& rawStage : request.getPipeline()) {
@@ -104,6 +104,18 @@ public:
}
/**
+ * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this
+ * means adopting the collection's default collation, unless a custom collation was specified.
+ */
+ bool shouldResolveUUIDAndCollation() const {
+ // Collectionless aggregations do not have a UUID or default collation.
+ return !_nss.isCollectionlessAggregateNS() &&
+ std::all_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) {
+ return spec->shouldResolveUUIDAndCollation();
+ });
+ }
+
+ /**
* Returns false if the pipeline has any stage which must be run locally on mongos.
*/
bool allowedToForwardFromMongos() const {
@@ -153,6 +165,7 @@ public:
private:
std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs;
+ NamespaceString _nss;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 5f42b1a725b..b3b50d2fb46 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1574,6 +1574,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
expCtx->opCtx = opCtx.get();
+ expCtx->uuid = UUID::gen();
setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
auto spec = BSON("$changeStream" << BSON("fullDocument"
@@ -1599,6 +1600,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
expCtx->opCtx = opCtx.get();
+ expCtx->uuid = UUID::gen();
setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
auto spec = BSON("$changeStream" << BSON("fullDocument"
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index 0ccb720b82b..5ddaf5a073a 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -71,7 +71,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
if (tokenData.version > 0) {
out << ", tokenType: " << tokenData.tokenType;
}
- out << ", applyOpsIndex" << tokenData.applyOpsIndex;
+ out << ", applyOpsIndex: " << tokenData.applyOpsIndex;
if (tokenData.version > 0) {
out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate);
}