summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-11-10 10:25:31 -0500
committerMatthew Russotto <matthew.russotto@10gen.com>2017-11-14 15:50:03 -0500
commitd4a526fdcfa3f740220940b8bf6767da959d4b3d (patch)
tree7bf2a5a1985c1388554390bc73eb3f06fa9e1b40 /src/mongo/db
parent714b2f9c4c0db34ad9a678a2be538f05ba0d9c41 (diff)
downloadmongo-d4a526fdcfa3f740220940b8bf6767da959d4b3d.tar.gz
SERVER-30834 Make mongos reload the shard registry and re-establish changeStream cursors when encountering a 'retryNeeded' entry
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp37
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h13
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp40
3 files changed, 56 insertions, 34 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b9bd8564527..9dbad7ae96d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -78,7 +78,7 @@ constexpr StringData DocumentSourceChangeStream::kDeleteOpType;
constexpr StringData DocumentSourceChangeStream::kReplaceOpType;
constexpr StringData DocumentSourceChangeStream::kInsertOpType;
constexpr StringData DocumentSourceChangeStream::kInvalidateOpType;
-constexpr StringData DocumentSourceChangeStream::kRetryNeededOpType;
+constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
const BSONObj DocumentSourceChangeStream::kSortSpec =
BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1);
@@ -218,8 +218,7 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField;
checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String);
auto operationType = doc[kOperationTypeField].getString();
- if (operationType == DocumentSourceChangeStream::kInvalidateOpType ||
- operationType == DocumentSourceChangeStream::kRetryNeededOpType) {
+ if (operationType == DocumentSourceChangeStream::kInvalidateOpType) {
// Pass the invalidation forward, so that it can be included in the results, or
// filtered/transformed by further stages in the pipeline, then throw an exception
// to close the cursor on the next call to getNext().
@@ -377,6 +376,28 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
return stages;
}
+BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj originalCmdObj,
+ const BSONObj resumeToken) {
+ Document originalCmd(originalCmdObj);
+ auto pipeline = originalCmd[AggregationRequest::kPipelineName].getArray();
+ // A $changeStream must be the first element of the pipeline in order to be able
+ // to replace (or add) a resume token.
+ invariant(!pipeline[0][DocumentSourceChangeStream::kStageName].missing());
+
+ MutableDocument changeStreamStage(
+ pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
+ changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
+
+ // If the command was initially specified with a resumeAfterClusterTime, we need to remove it
+ // to use the new resume token.
+ changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName] = Value();
+ pipeline[0] =
+ Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
+ MutableDocument newCmd(originalCmd);
+ newCmd[AggregationRequest::kPipelineName] = Value(pipeline);
+ return newCmd.freeze().toBson();
+}
+
intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage(
BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation(
@@ -463,9 +484,9 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
break;
}
case repl::OpTypeEnum::kNoop: {
- operationType = kRetryNeededOpType;
- // Generate a fake document Id for RetryNeeded operation so that we can resume after
- // this operation.
+ operationType = kNewShardDetectedOpType;
+ // Generate a fake document Id for NewShardDetected operation so that we can resume
+ // after this operation.
documentKey = Value(Document{{kIdField, input[repl::OplogEntry::kObject2FieldName]}});
break;
}
@@ -499,8 +520,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
- // "invalidate" and "retryNeeded" entries have fewer fields.
- if (operationType == kInvalidateOpType || operationType == kRetryNeededOpType) {
+ // "invalidate" and "newShardDetected" entries have fewer fields.
+ if (operationType == kInvalidateOpType || operationType == kNewShardDetectedOpType) {
return doc.freeze();
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 944a50e66b0..b4baa40dddc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -141,8 +141,8 @@ public:
static constexpr StringData kReplaceOpType = "replace"_sd;
static constexpr StringData kInsertOpType = "insert"_sd;
static constexpr StringData kInvalidateOpType = "invalidate"_sd;
- // Internal op type to close the cursor.
- static constexpr StringData kRetryNeededOpType = "retryNeeded"_sd;
+ // Internal op type to signal mongos to open cursors on new shards.
+ static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
/**
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
@@ -162,6 +162,15 @@ public:
static boost::intrusive_ptr<DocumentSource> createTransformationStage(
BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ /**
+ * Given a BSON object containing an aggregation command with a $changeStream stage, and a
+ * resume token, returns a new BSON object with the same command except with the addition of a
+ * resumeAfter: option containing the resume token. If there was a previous resumeAfter:
+ * option, it is removed.
+ */
+ static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj,
+ const BSONObj resumeToken);
+
private:
// It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
// instead.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index e70544085c1..d7ce589deb4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -560,6 +560,22 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) {
checkTransformation(rename, expectedInvalidate);
}
+TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
+ auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}};
+ auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop,
+ nss,
+ testUuid(),
+ boost::none, // fromMigrate
+ BSONObj(),
+ o2Field.toBson());
+
+ Document expectedNewShardDetected{
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
+ };
+ checkTransformation(newShardDetected, expectedNewShardDetected);
+}
+
TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) {
auto collSpec =
D{{"create", "foo"_sd},
@@ -671,29 +687,5 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
}
-TEST_F(ChangeStreamStageTest, CloseCursorOnRetryNeededEntries) {
- auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}};
- auto retryNeeded = makeOplogEntry(OpTypeEnum::kNoop, // op type
- nss, // namespace
- testUuid(), // uuid
- boost::none, // fromMigrate
- {}, // o
- o2Field.toBson()); // o2
-
- auto stages = makeStages(retryNeeded);
- auto closeCursor = stages.back();
-
- Document expectedRetryNeeded{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kRetryNeededOpType},
- };
-
- auto next = closeCursor->getNext();
- // Transform into RetryNeeded entry.
- ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedRetryNeeded);
- // Then throw an exception on the next call of getNext().
- ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
-}
-
} // namespace
} // namespace mongo