diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-10 10:25:31 -0500 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-14 15:50:03 -0500 |
commit | d4a526fdcfa3f740220940b8bf6767da959d4b3d (patch) | |
tree | 7bf2a5a1985c1388554390bc73eb3f06fa9e1b40 /src/mongo/db/pipeline/document_source_change_stream.h | |
parent | 714b2f9c4c0db34ad9a678a2be538f05ba0d9c41 (diff) | |
download | mongo-d4a526fdcfa3f740220940b8bf6767da959d4b3d.tar.gz |
SERVER-30834 Make mongos reload the shard registry and re-establish changeStream cursors when encountering a 'retryNeeded' entry
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 944a50e66b0..b4baa40dddc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -141,8 +141,8 @@ public: static constexpr StringData kReplaceOpType = "replace"_sd; static constexpr StringData kInsertOpType = "insert"_sd; static constexpr StringData kInvalidateOpType = "invalidate"_sd; - // Internal op type to close the cursor. - static constexpr StringData kRetryNeededOpType = "retryNeeded"_sd; + // Internal op type to signal mongos to open cursors on new shards. + static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd; /** * Produce the BSON object representing the filter for the $match stage to filter oplog entries @@ -162,6 +162,15 @@ public: static boost::intrusive_ptr<DocumentSource> createTransformationStage( BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + /** + * Given a BSON object containing an aggregation command with a $changeStream stage, and a + * resume token, returns a new BSON object with the same command except with the addition of a + * resumeAfter: option containing the resume token. If there was a previous resumeAfter: + * option, it is removed. + */ + static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj, + const BSONObj resumeToken); + private: // It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson() // instead. |