summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream.h
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-11-10 10:25:31 -0500
committerMatthew Russotto <matthew.russotto@10gen.com>2017-11-14 15:50:03 -0500
commitd4a526fdcfa3f740220940b8bf6767da959d4b3d (patch)
tree7bf2a5a1985c1388554390bc73eb3f06fa9e1b40 /src/mongo/db/pipeline/document_source_change_stream.h
parent714b2f9c4c0db34ad9a678a2be538f05ba0d9c41 (diff)
downloadmongo-d4a526fdcfa3f740220940b8bf6767da959d4b3d.tar.gz
SERVER-30834 Make mongos reload the shard registry and re-establish changeStream cursors when encountering a 'retryNeeded' entry
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.h')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h13
1 files changed, 11 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 944a50e66b0..b4baa40dddc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -141,8 +141,8 @@ public:
static constexpr StringData kReplaceOpType = "replace"_sd;
static constexpr StringData kInsertOpType = "insert"_sd;
static constexpr StringData kInvalidateOpType = "invalidate"_sd;
- // Internal op type to close the cursor.
- static constexpr StringData kRetryNeededOpType = "retryNeeded"_sd;
+ // Internal op type to signal mongos to open cursors on new shards.
+ static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
/**
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
@@ -162,6 +162,15 @@ public:
static boost::intrusive_ptr<DocumentSource> createTransformationStage(
BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ /**
+ * Given a BSON object containing an aggregation command with a $changeStream stage, and a
+ * resume token, returns a new BSON object with the same command except with the addition of a
+ * resumeAfter: option containing the resume token. If there was a previous resumeAfter:
+ * option, it is removed.
+ */
+ static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj,
+ const BSONObj resumeToken);
+
private:
// It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
// instead.