diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-05 14:47:48 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-11 20:02:07 -0400 |
commit | 43de192f45e15fefed5b5c033feee35848b1f066 (patch) | |
tree | 862460c0681d2fe4bce8ae6e5dc83dcc2fbd8d24 | |
parent | 4054c2b07cb658a44fc51d145a1688483e18d666 (diff) | |
download | mongo-43de192f45e15fefed5b5c033feee35848b1f066.tar.gz |
SERVER-39302 Remove all high-water-mark code necessary to support default collation inheritance for single-collection streams
14 files changed, 13 insertions, 272 deletions
diff --git a/jstests/noPassthrough/change_streams_single_ns_high_water_mark_includes_uuid.js b/jstests/noPassthrough/change_streams_single_ns_high_water_mark_includes_uuid.js deleted file mode 100644 index 11b55171c02..00000000000 --- a/jstests/noPassthrough/change_streams_single_ns_high_water_mark_includes_uuid.js +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Tests that single-collection high water mark tokens on a sharded cluster always contain the - * collection's UUID, even if the collection is not present on all shards. - * @tags: [requires_replication, requires_journaling] - */ -(function() { - "use strict"; - - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - - // Create a 2-shard cluster. Disable 'writePeriodicNoops' on the shards, since we want to - // manually control which shard advances and when. - const st = - new ShardingTest({shards: 2, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}}); - - // Obtain a connection to the mongoS and one direct connection to each shard. - const shard0 = st.rs0.getPrimary(); - const shard1 = st.rs1.getPrimary(); - const mongos = st.s; - - const mongosDB = mongos.getDB(jsTestName()); - const mongosColl = mongosDB.test; - - const shard1DB = shard1.getDB(jsTestName()); - const shard1Coll = shard1DB.test; - - const shard1UnrelatedColl = shard1.getCollection("otherdb.unrelated_collection"); - - const shardNames = [st.rs0.name, st.rs1.name]; - - function runHighWaterMarkTest(testCollOnlyOnShard0) { - // Open a stream on the test collection, and get the first available high-water-mark. - const csCursor = testCollOnlyOnShard0.watch(); - const firstHWM = csCursor.getResumeToken(); - assert(!csCursor.hasNext()); - // Write a document to the unrelated collection on shard1 to push its clusterTime forward. - assert.commandWorked(shard1UnrelatedColl.insert({})); - assert(!csCursor.hasNext()); - // Write a document to the test collection on shard0, advancing its optime to the present. - assert.commandWorked(testCollOnlyOnShard0.insert({})); - assert(!csCursor.hasNext()); - // Wait for the high water mark token to advance, and confirm that we do not see any events. - // This token is guaranteed to be from shard1, which does not have the collection, because - // otherwise we would have been able to retrieve the document we just wrote. - assert.soon(() => { - assert(!shard1Coll.exists()); - assert(!csCursor.hasNext()); - return bsonWoCompare(csCursor.getResumeToken(), firstHWM) > 0; - }); - // Confirm that we can resumeAfter a token from the shard that does not have the collection. - const hwmFromShardWithoutCollection = csCursor.getResumeToken(); - assert.commandWorked(testCollOnlyOnShard0.runCommand({ - aggregate: testCollOnlyOnShard0.getName(), - pipeline: [{$changeStream: {resumeAfter: hwmFromShardWithoutCollection}}], - cursor: {} - })); - } - - // Enable sharding on the the test database and ensure that the primary is shard0. - assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); - st.ensurePrimaryShard(mongosDB.getName(), shard0.name); - - // Create an unsharded collection on shard0 and confirm that HWM tokens return from a stream on - // this shard have the correct UUIDs, despite the fact that the stream is opened on all shards. - assertCreateCollection(mongosDB, mongosColl.getName()); - runHighWaterMarkTest(mongosColl); - - // Shard the collection on {_id: 1}, keep it on shard0 only, and re-run the test. - st.shardColl(mongosColl, {_id: 1}, false, false); - runHighWaterMarkTest(mongosColl); - - st.stop(); -})();
\ No newline at end of file diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp index 2b79f6dd0fe..7750beeaf86 100644 --- a/src/mongo/db/exec/change_stream_proxy.cpp +++ b/src/mongo/db/exec/change_stream_proxy.cpp @@ -70,8 +70,7 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() { // at the current clusterTime. auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); if (highWaterMark > _latestOplogTimestamp) { - auto token = - ResumeToken::makeHighWaterMarkToken(highWaterMark, _pipeline->getContext()->uuid); + auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark); _postBatchResumeToken = token.toDocument().toBson(); _latestOplogTimestamp = highWaterMark; _setSpeculativeReadTimestamp(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index cda26fc3e84..2480340ea2d 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -403,7 +403,6 @@ pipelineeEnv.Library( 'document_source_sort_by_count.cpp', 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', - 'document_source_watch_for_uuid.cpp', 'pipeline.cpp', 'sequential_document_cache.cpp', 'stage_constraints.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 4984207ae4c..c20dccfe277 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -44,7 +44,6 @@ #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/document_source_watch_for_uuid.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" @@ -360,21 +359,15 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression if (expCtx->initialPostBatchResumeToken.isEmpty()) { Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)}; expCtx->initialPostBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(startTime, expCtx->uuid).toDocument().toBson(); + ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson(); } } + // Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage. const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); stages.push_back( DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject())); - // If this is a single-collection stream but we don't have a UUID set on the expression context, - // then the stream was opened before the collection exists. Add a stage which will populate the - // UUID using the first change stream result observed by the pipeline during execution. - if (!expCtx->uuid && expCtx->isSingleNamespaceAggregation()) { - stages.push_back(DocumentSourceWatchForUUID::create(expCtx)); - } - // The resume stage must come after the check invalidate stage so that the former can determine // whether the event that matches the resume token should be followed by an "invalidate" event. stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); @@ -419,13 +412,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // to be merged, do not add a close cursor stage, since the mongos will already have one. stages.push_back(DocumentSourceCloseCursor::create(expCtx)); - // If this is a single-collection stream but we do not have a UUID set on the expression - // context, then the stream was opened before the collection exists. Add a stage on mongoS - // which will watch for and populate the UUID using the first result seen by the pipeline. - if (expCtx->inMongos && !expCtx->uuid && expCtx->isSingleNamespaceAggregation()) { - stages.push_back(DocumentSourceWatchForUUID::create(expCtx)); - } - // There should be only one post-image lookup stage. If we're on the shards and producing // input to be merged, the lookup is done on the mongos. if (shouldLookupPostImage) { diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 3c6eb438ad8..97ee1b74505 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -222,7 +222,7 @@ Value DocumentSourceShardCheckResumability::serialize( intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) { // We are resuming from a point in time, not an event. Seed the stage with a high water mark. - return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData()); + return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData()); } intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index cbc345fef7f..b038d1dd418 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -148,8 +148,7 @@ protected: return shardCheckResumability; } intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) { - return createShardCheckResumability( - ResumeToken::makeHighWaterMarkToken(ts, testUuid()).getData()); + return createShardCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); } }; diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp deleted file mode 100644 index 1c26e4681ba..00000000000 --- a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/pipeline/document_source_watch_for_uuid.h" - -#include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/resume_token.h" - -namespace mongo { - -DocumentSource::GetNextResult DocumentSourceWatchForUUID::getNext() { - pExpCtx->checkForInterrupt(); - - auto nextInput = pSource->getNext(); - if (!nextInput.isAdvanced()) - return nextInput; - - // This single-collection stream was opened before the collection was created, and the pipeline - // does not know its UUID. When we see the first event, we update our expression context with - // the UUID drawn from that event's resume token. - if (!pExpCtx->uuid) { - auto resumeToken = ResumeToken::parse( - nextInput.getDocument()[DocumentSourceChangeStream::kIdField].getDocument()); - pExpCtx->uuid = resumeToken.getData().uuid; - } - - // Forward the result without modification. - return nextInput; -} - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h deleted file mode 100644 index 6570454fb05..00000000000 --- a/src/mongo/db/pipeline/document_source_watch_for_uuid.h +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/pipeline/document_source.h" - -namespace mongo { - -/** - * In the event that a single-collection $changeStream is opened on a namespace whose UUID is not - * known, this stage will be added to the pipeline on both mongoD and mongoS. When the first event - * is observed by the pipeline, DSWatchForUUID will extract the collection's UUID from the event's - * resume token, and will use it to populate the pipeline's ExpressionContext::uuid. - */ -class DocumentSourceWatchForUUID final : public DocumentSource { -public: - GetNextResult getNext() final; - - const char* getSourceName() const final { - return "$watchForUUID"; - } - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - return {StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed, - ChangeStreamRequirement::kChangeStreamStage}; - } - - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { - // This stage is created by the DocumentSourceChangeStream stage, so serializing it - // here would result in it being created twice. - return Value(); - } - - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - - static boost::intrusive_ptr<DocumentSourceWatchForUUID> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - // Only created for a single-collection stream where the UUID does not exist. - invariant(expCtx->isSingleNamespaceAggregation()); - invariant(!expCtx->uuid); - return new DocumentSourceWatchForUUID(expCtx); - } - -private: - /** - * The static 'create' method must be used to build a DocumentSourceWatchForUUID. - */ - DocumentSourceWatchForUUID(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) {} -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index f64b1764adb..e9b06a74e9e 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -230,8 +230,8 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) { return ResumeToken(resumeDoc); } -ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, boost::optional<UUID> uuid) { - return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, uuid)); +ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime) { + return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, boost::none)); } bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 19399123150..52ae1b1ab1e 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -117,9 +117,9 @@ public: static ResumeToken parse(const Document& document); /** - * Generate a high-water-mark token for 'clusterTime', with an optional UUID and no documentKey. + * Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey. */ - static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime, boost::optional<UUID> uuid); + static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime); /** * Returns true if the given token data represents a valid high-water-mark resume token; that diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index 682cbf462ab..6a3a2229813 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -337,7 +337,7 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) { TEST(CursorResponseTest, serializePostBatchResumeToken) { std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; auto postBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2), boost::none).toDocument().toBson(); + ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson(); CursorResponse response(NamespaceString("db.coll"), CursorId(123), batch, diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index db9c70b2c87..30d3a182c42 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -54,7 +54,7 @@ LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { } BSONObj makePostBatchResumeToken(Timestamp clusterTime) { - auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime, boost::none).toDocument().toBson(); + auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime).toDocument().toBson(); invariant(pbrt.firstElement().type() == BSONType::String); return pbrt; } diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 5b75eb43880..ce1c56c103b 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -87,26 +87,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const { } BSONObj RouterStagePipeline::getPostBatchResumeToken() const { - auto pbrt = _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); - return pbrt.isEmpty() ? pbrt : _setPostBatchResumeTokenUUID(pbrt); -} - -BSONObj RouterStagePipeline::_setPostBatchResumeTokenUUID(BSONObj pbrt) const { - // If the PBRT does not match the sort key of the latest document, it is a high water mark. - const bool isHighWaterMark = !pbrt.binaryEqual(_latestSortKey); - - // If this stream is on a single collection and the token is a high water mark, then it may have - // come from a shard that does not have the collection. If so, we must fill in the correct UUID. - if (isHighWaterMark && _mergePipeline->getContext()->uuid) { - auto tokenData = ResumeToken::parse(pbrt).getData(); - // Check whether the UUID is missing before regenerating the token. - if (!tokenData.uuid) { - invariant(tokenData.tokenType == ResumeTokenData::kHighWaterMarkToken); - tokenData.uuid = _mergePipeline->getContext()->uuid; - pbrt = ResumeToken(tokenData).toDocument().toBson(); - } - } - return pbrt; + return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); } BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) { @@ -130,8 +111,7 @@ BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) { << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), idField.binaryEqual(resumeToken)); - // Record the latest resume token for later comparison, then return the event in BSONObj form. - _latestSortKey = resumeToken; + // Return the event in BSONObj form, minus the $sortKey metadata. return eventBSON; } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index c6a9d221c5d..de0dc25b310 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -65,13 +65,9 @@ protected: private: BSONObj _validateAndConvertToBSON(const Document& event); - BSONObj _setPostBatchResumeTokenUUID(BSONObj pbrt) const; - std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline; // May be null if this pipeline runs exclusively on mongos without contacting the shards at all. boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage; - - BSONObj _latestSortKey; }; } // namespace mongo |