diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-12-13 20:54:06 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-03 15:06:46 +0000 |
commit | 065f3ef77de57609d92fce482c1e4e36b15cf29c (patch) | |
tree | 52a3735f31b7b5632e7e04d4ed8657f75e12c1f9 /src | |
parent | fbb9daeb791d16c49b861d82c097cf9bd9daf07e (diff) | |
download | mongo-065f3ef77de57609d92fce482c1e4e36b15cf29c.tar.gz |
SERVER-37786 Reject change stream pipelines which modify or project-out the resume token
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/exec/change_stream_proxy.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/exec/change_stream_proxy.h | 8 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.cpp | 29 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.h | 6 |
4 files changed, 58 insertions, 10 deletions
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp index 65c52bff0cd..46e5beb60f4 100644 --- a/src/mongo/db/exec/change_stream_proxy.cpp +++ b/src/mongo/db/exec/change_stream_proxy.cpp @@ -56,7 +56,7 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() { if (auto next = _pipeline->getNext()) { // While we have more results to return, we track both the timestamp and the resume token of // the latest event observed in the oplog, the latter via its sort key metadata field. - auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson()); + auto nextBSON = _validateAndConvertToBSON(*next); _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get()); _postBatchResumeToken = next->getSortKeyMetaField(); _setSpeculativeReadOpTime(); @@ -79,6 +79,29 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() { return boost::none; } +BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event) const { + // If we are producing output to be merged on mongoS, then no stages can have modified the _id. + if (_includeMetaData) { + return event.toBsonWithMetaData(); + } + // Confirm that the document _id field matches the original resume token in the sort key field. + auto eventBSON = event.toBson(); + auto resumeToken = event.getSortKeyMetaField(); + auto idField = eventBSON.getObjectField("_id"); + invariant(!resumeToken.isEmpty()); + uassert(51059, + str::stream() << "Encountered an event whose _id field, which contains the resume " + "token, was modified by the pipeline. Modifying the _id field of an " + "event makes it impossible to resume the stream from that point. Only " + "transformations that retain the unmodified _id field are allowed. " + "Expected: " + << BSON("_id" << resumeToken) + << " but found: " + << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), + idField.binaryEqual(resumeToken)); + return eventBSON; +} + void ChangeStreamProxyStage::_setSpeculativeReadOpTime() { repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo = repl::SpeculativeMajorityReadInfo::get(_pipeline->getContext()->opCtx); diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h index 380e5c4a09d..8a9a3dcc40a 100644 --- a/src/mongo/db/exec/change_stream_proxy.h +++ b/src/mongo/db/exec/change_stream_proxy.h @@ -81,8 +81,12 @@ protected: private: /** - * Set the speculative majority read optime if we have scanned the oplog up to a certain - * timestamp. + * Verifies that the docs's resume token has not been modified, then converts the doc to BSON. + */ + BSONObj _validateAndConvertToBSON(const Document& event) const; + + /** + * Set the speculative majority read optime if we have scanned up to a certain oplog timestamp. */ void _setSpeculativeReadOpTime(); diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 0811c130abf..e9b333a0152 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -55,8 +55,7 @@ StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecCo // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. if (auto result = _mergePipeline->getNext()) { - _validateAndRecordSortKey(*result); - return {result->toBson()}; + return _validateAndConvertToBSON(*result); } // If we reach this point, we have hit EOF. @@ -111,8 +110,30 @@ BSONObj RouterStagePipeline::_setPostBatchResumeTokenUUID(BSONObj pbrt) const { return pbrt; } -void RouterStagePipeline::_validateAndRecordSortKey(const Document& doc) { - _latestSortKey = doc.getSortKeyMetaField(); +BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) { + // If this is not a change stream pipeline, we have nothing to do except return the BSONObj. + if (!_mergePipeline->getContext()->isTailableAwaitData()) { + return event.toBson(); + } + // Confirm that the document _id field matches the original resume token in the sort key field. + auto eventBSON = event.toBson(); + auto resumeToken = event.getSortKeyMetaField(); + auto idField = eventBSON.getObjectField("_id"); + invariant(!resumeToken.isEmpty()); + uassert(51060, + str::stream() << "Encountered an event whose _id field, which contains the resume " + "token, was modified by the pipeline. Modifying the _id field of an " + "event makes it impossible to resume the stream from that point. Only " + "transformations that retain the unmodified _id field are allowed. " + "Expected: " + << BSON("_id" << resumeToken) + << " but found: " + << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), + idField.binaryEqual(resumeToken)); + + // Record the latest resume token for later comparison, then return the event in BSONObj form. + _latestSortKey = resumeToken; + return eventBSON; } bool RouterStagePipeline::remotesExhausted() { diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 167016ad103..6241ede25b1 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -64,13 +64,13 @@ protected: void doDetachFromOperationContext() final; private: + BSONObj _validateAndConvertToBSON(const Document& event); + BSONObj _setPostBatchResumeTokenUUID(BSONObj pbrt) const; - void _validateAndRecordSortKey(const Document& doc); std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline; - // May be null if this pipeline is executing exclusively on mongos and will not contact the - // shards at all. + // May be null if this pipeline runs exclusively on mongos without contacting the shards at all. boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage; BSONObj _latestSortKey; |