summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2019-03-05 14:47:48 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-03-11 20:02:07 -0400
commit43de192f45e15fefed5b5c033feee35848b1f066 (patch)
tree862460c0681d2fe4bce8ae6e5dc83dcc2fbd8d24 /src/mongo
parent4054c2b07cb658a44fc51d145a1688483e18d666 (diff)
downloadmongo-43de192f45e15fefed5b5c033feee35848b1f066.tar.gz
SERVER-39302 Remove all high-water-mark code necessary to support default collation inheritance for single-collection streams
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/exec/change_stream_proxy.cpp3
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_watch_for_uuid.cpp59
-rw-r--r--src/mongo/db/pipeline/document_source_watch_for_uuid.h86
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp4
-rw-r--r--src/mongo/db/pipeline/resume_token.h4
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp2
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp2
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp24
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h4
13 files changed, 13 insertions, 199 deletions
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
index 2b79f6dd0fe..7750beeaf86 100644
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -70,8 +70,7 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
// at the current clusterTime.
auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
if (highWaterMark > _latestOplogTimestamp) {
- auto token =
- ResumeToken::makeHighWaterMarkToken(highWaterMark, _pipeline->getContext()->uuid);
+ auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
_postBatchResumeToken = token.toDocument().toBson();
_latestOplogTimestamp = highWaterMark;
_setSpeculativeReadTimestamp();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index cda26fc3e84..2480340ea2d 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -403,7 +403,6 @@ pipelineeEnv.Library(
'document_source_sort_by_count.cpp',
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
- 'document_source_watch_for_uuid.cpp',
'pipeline.cpp',
'sequential_document_cache.cpp',
'stage_constraints.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 4984207ae4c..c20dccfe277 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -44,7 +44,6 @@
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_sort.h"
-#include "mongo/db/pipeline/document_source_watch_for_uuid.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/resume_token.h"
@@ -360,21 +359,15 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
if (expCtx->initialPostBatchResumeToken.isEmpty()) {
Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)};
expCtx->initialPostBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(startTime, expCtx->uuid).toDocument().toBson();
+ ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson();
}
}
+ // Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage.
const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
stages.push_back(
DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject()));
- // If this is a single-collection stream but we don't have a UUID set on the expression context,
- // then the stream was opened before the collection exists. Add a stage which will populate the
- // UUID using the first change stream result observed by the pipeline during execution.
- if (!expCtx->uuid && expCtx->isSingleNamespaceAggregation()) {
- stages.push_back(DocumentSourceWatchForUUID::create(expCtx));
- }
-
// The resume stage must come after the check invalidate stage so that the former can determine
// whether the event that matches the resume token should be followed by an "invalidate" event.
stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
@@ -419,13 +412,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// to be merged, do not add a close cursor stage, since the mongos will already have one.
stages.push_back(DocumentSourceCloseCursor::create(expCtx));
- // If this is a single-collection stream but we do not have a UUID set on the expression
- // context, then the stream was opened before the collection exists. Add a stage on mongoS
- // which will watch for and populate the UUID using the first result seen by the pipeline.
- if (expCtx->inMongos && !expCtx->uuid && expCtx->isSingleNamespaceAggregation()) {
- stages.push_back(DocumentSourceWatchForUUID::create(expCtx));
- }
-
// There should be only one post-image lookup stage. If we're on the shards and producing
// input to be merged, the lookup is done on the mongos.
if (shouldLookupPostImage) {
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 3c6eb438ad8..97ee1b74505 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -222,7 +222,7 @@ Value DocumentSourceShardCheckResumability::serialize(
intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
// We are resuming from a point in time, not an event. Seed the stage with a high water mark.
- return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData());
+ return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData());
}
intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index cbc345fef7f..b038d1dd418 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -148,8 +148,7 @@ protected:
return shardCheckResumability;
}
intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) {
- return createShardCheckResumability(
- ResumeToken::makeHighWaterMarkToken(ts, testUuid()).getData());
+ return createShardCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
}
};
diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp
deleted file mode 100644
index 1c26e4681ba..00000000000
--- a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright (C) 2019-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/document_source_watch_for_uuid.h"
-
-#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/resume_token.h"
-
-namespace mongo {
-
-DocumentSource::GetNextResult DocumentSourceWatchForUUID::getNext() {
- pExpCtx->checkForInterrupt();
-
- auto nextInput = pSource->getNext();
- if (!nextInput.isAdvanced())
- return nextInput;
-
- // This single-collection stream was opened before the collection was created, and the pipeline
- // does not know its UUID. When we see the first event, we update our expression context with
- // the UUID drawn from that event's resume token.
- if (!pExpCtx->uuid) {
- auto resumeToken = ResumeToken::parse(
- nextInput.getDocument()[DocumentSourceChangeStream::kIdField].getDocument());
- pExpCtx->uuid = resumeToken.getData().uuid;
- }
-
- // Forward the result without modification.
- return nextInput;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h
deleted file mode 100644
index 6570454fb05..00000000000
--- a/src/mongo/db/pipeline/document_source_watch_for_uuid.h
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Copyright (C) 2019-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/document_source.h"
-
-namespace mongo {
-
-/**
- * In the event that a single-collection $changeStream is opened on a namespace whose UUID is not
- * known, this stage will be added to the pipeline on both mongoD and mongoS. When the first event
- * is observed by the pipeline, DSWatchForUUID will extract the collection's UUID from the event's
- * resume token, and will use it to populate the pipeline's ExpressionContext::uuid.
- */
-class DocumentSourceWatchForUUID final : public DocumentSource {
-public:
- GetNextResult getNext() final;
-
- const char* getSourceName() const final {
- return "$watchForUUID";
- }
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- return {StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kNotAllowed,
- ChangeStreamRequirement::kChangeStreamStage};
- }
-
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
- // This stage is created by the DocumentSourceChangeStream stage, so serializing it
- // here would result in it being created twice.
- return Value();
- }
-
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
- static boost::intrusive_ptr<DocumentSourceWatchForUUID> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- // Only created for a single-collection stream where the UUID does not exist.
- invariant(expCtx->isSingleNamespaceAggregation());
- invariant(!expCtx->uuid);
- return new DocumentSourceWatchForUUID(expCtx);
- }
-
-private:
- /**
- * The static 'create' method must be used to build a DocumentSourceWatchForUUID.
- */
- DocumentSourceWatchForUUID(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {}
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index f64b1764adb..e9b06a74e9e 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -230,8 +230,8 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
-ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, boost::optional<UUID> uuid) {
- return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, uuid));
+ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, boost::none));
}
bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) {
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 19399123150..52ae1b1ab1e 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -117,9 +117,9 @@ public:
static ResumeToken parse(const Document& document);
/**
- * Generate a high-water-mark token for 'clusterTime', with an optional UUID and no documentKey.
+ * Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey.
*/
- static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime, boost::optional<UUID> uuid);
+ static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime);
/**
* Returns true if the given token data represents a valid high-water-mark resume token; that
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 682cbf462ab..6a3a2229813 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -337,7 +337,7 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
TEST(CursorResponseTest, serializePostBatchResumeToken) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
auto postBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2), boost::none).toDocument().toBson();
+ ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson();
CursorResponse response(NamespaceString("db.coll"),
CursorId(123),
batch,
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index db9c70b2c87..30d3a182c42 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -54,7 +54,7 @@ LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
}
BSONObj makePostBatchResumeToken(Timestamp clusterTime) {
- auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime, boost::none).toDocument().toBson();
+ auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime).toDocument().toBson();
invariant(pbrt.firstElement().type() == BSONType::String);
return pbrt;
}
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 5b75eb43880..ce1c56c103b 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -87,26 +87,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const {
}
BSONObj RouterStagePipeline::getPostBatchResumeToken() const {
- auto pbrt = _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj();
- return pbrt.isEmpty() ? pbrt : _setPostBatchResumeTokenUUID(pbrt);
-}
-
-BSONObj RouterStagePipeline::_setPostBatchResumeTokenUUID(BSONObj pbrt) const {
- // If the PBRT does not match the sort key of the latest document, it is a high water mark.
- const bool isHighWaterMark = !pbrt.binaryEqual(_latestSortKey);
-
- // If this stream is on a single collection and the token is a high water mark, then it may have
- // come from a shard that does not have the collection. If so, we must fill in the correct UUID.
- if (isHighWaterMark && _mergePipeline->getContext()->uuid) {
- auto tokenData = ResumeToken::parse(pbrt).getData();
- // Check whether the UUID is missing before regenerating the token.
- if (!tokenData.uuid) {
- invariant(tokenData.tokenType == ResumeTokenData::kHighWaterMarkToken);
- tokenData.uuid = _mergePipeline->getContext()->uuid;
- pbrt = ResumeToken(tokenData).toDocument().toBson();
- }
- }
- return pbrt;
+ return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj();
}
BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) {
@@ -130,8 +111,7 @@ BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) {
<< (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
idField.binaryEqual(resumeToken));
- // Record the latest resume token for later comparison, then return the event in BSONObj form.
- _latestSortKey = resumeToken;
+ // Return the event in BSONObj form, minus the $sortKey metadata.
return eventBSON;
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index c6a9d221c5d..de0dc25b310 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -65,13 +65,9 @@ protected:
private:
BSONObj _validateAndConvertToBSON(const Document& event);
- BSONObj _setPostBatchResumeTokenUUID(BSONObj pbrt) const;
-
std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline;
// May be null if this pipeline runs exclusively on mongos without contacting the shards at all.
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage;
-
- BSONObj _latestSortKey;
};
} // namespace mongo