summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-12-13 20:54:06 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-02-03 15:06:46 +0000
commit065f3ef77de57609d92fce482c1e4e36b15cf29c (patch)
tree52a3735f31b7b5632e7e04d4ed8657f75e12c1f9 /src
parentfbb9daeb791d16c49b861d82c097cf9bd9daf07e (diff)
downloadmongo-065f3ef77de57609d92fce482c1e4e36b15cf29c.tar.gz
SERVER-37786 Reject change stream pipelines which modify or project-out the resume token
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/exec/change_stream_proxy.cpp25
-rw-r--r--src/mongo/db/exec/change_stream_proxy.h8
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp29
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h6
4 files changed, 58 insertions, 10 deletions
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
index 65c52bff0cd..46e5beb60f4 100644
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -56,7 +56,7 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
if (auto next = _pipeline->getNext()) {
// While we have more results to return, we track both the timestamp and the resume token of
// the latest event observed in the oplog, the latter via its sort key metadata field.
- auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson());
+ auto nextBSON = _validateAndConvertToBSON(*next);
_latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
_postBatchResumeToken = next->getSortKeyMetaField();
_setSpeculativeReadOpTime();
@@ -79,6 +79,29 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
return boost::none;
}
+BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event) const {
+ // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
+ if (_includeMetaData) {
+ return event.toBsonWithMetaData();
+ }
+ // Confirm that the document _id field matches the original resume token in the sort key field.
+ auto eventBSON = event.toBson();
+ auto resumeToken = event.getSortKeyMetaField();
+ auto idField = eventBSON.getObjectField("_id");
+ invariant(!resumeToken.isEmpty());
+ uassert(51059,
+ str::stream() << "Encountered an event whose _id field, which contains the resume "
+ "token, was modified by the pipeline. Modifying the _id field of an "
+ "event makes it impossible to resume the stream from that point. Only "
+ "transformations that retain the unmodified _id field are allowed. "
+ "Expected: "
+ << BSON("_id" << resumeToken)
+ << " but found: "
+ << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
+ idField.binaryEqual(resumeToken));
+ return eventBSON;
+}
+
void ChangeStreamProxyStage::_setSpeculativeReadOpTime() {
repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
repl::SpeculativeMajorityReadInfo::get(_pipeline->getContext()->opCtx);
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
index 380e5c4a09d..8a9a3dcc40a 100644
--- a/src/mongo/db/exec/change_stream_proxy.h
+++ b/src/mongo/db/exec/change_stream_proxy.h
@@ -81,8 +81,12 @@ protected:
private:
/**
- * Set the speculative majority read optime if we have scanned the oplog up to a certain
- * timestamp.
+ * Verifies that the docs's resume token has not been modified, then converts the doc to BSON.
+ */
+ BSONObj _validateAndConvertToBSON(const Document& event) const;
+
+ /**
+ * Set the speculative majority read optime if we have scanned up to a certain oplog timestamp.
*/
void _setSpeculativeReadOpTime();
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 0811c130abf..e9b333a0152 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -55,8 +55,7 @@ StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecCo
// Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
if (auto result = _mergePipeline->getNext()) {
- _validateAndRecordSortKey(*result);
- return {result->toBson()};
+ return _validateAndConvertToBSON(*result);
}
// If we reach this point, we have hit EOF.
@@ -111,8 +110,30 @@ BSONObj RouterStagePipeline::_setPostBatchResumeTokenUUID(BSONObj pbrt) const {
return pbrt;
}
-void RouterStagePipeline::_validateAndRecordSortKey(const Document& doc) {
- _latestSortKey = doc.getSortKeyMetaField();
+BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) {
+ // If this is not a change stream pipeline, we have nothing to do except return the BSONObj.
+ if (!_mergePipeline->getContext()->isTailableAwaitData()) {
+ return event.toBson();
+ }
+ // Confirm that the document _id field matches the original resume token in the sort key field.
+ auto eventBSON = event.toBson();
+ auto resumeToken = event.getSortKeyMetaField();
+ auto idField = eventBSON.getObjectField("_id");
+ invariant(!resumeToken.isEmpty());
+ uassert(51060,
+ str::stream() << "Encountered an event whose _id field, which contains the resume "
+ "token, was modified by the pipeline. Modifying the _id field of an "
+ "event makes it impossible to resume the stream from that point. Only "
+ "transformations that retain the unmodified _id field are allowed. "
+ "Expected: "
+ << BSON("_id" << resumeToken)
+ << " but found: "
+ << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
+ idField.binaryEqual(resumeToken));
+
+ // Record the latest resume token for later comparison, then return the event in BSONObj form.
+ _latestSortKey = resumeToken;
+ return eventBSON;
}
bool RouterStagePipeline::remotesExhausted() {
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 167016ad103..6241ede25b1 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -64,13 +64,13 @@ protected:
void doDetachFromOperationContext() final;
private:
+ BSONObj _validateAndConvertToBSON(const Document& event);
+
BSONObj _setPostBatchResumeTokenUUID(BSONObj pbrt) const;
- void _validateAndRecordSortKey(const Document& doc);
std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline;
- // May be null if this pipeline is executing exclusively on mongos and will not contact the
- // shards at all.
+ // May be null if this pipeline runs exclusively on mongos without contacting the shards at all.
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage;
BSONObj _latestSortKey;