diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-10 10:25:31 -0500 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-14 15:50:03 -0500 |
commit | d4a526fdcfa3f740220940b8bf6767da959d4b3d (patch) | |
tree | 7bf2a5a1985c1388554390bc73eb3f06fa9e1b40 /src/mongo/db | |
parent | 714b2f9c4c0db34ad9a678a2be538f05ba0d9c41 (diff) | |
download | mongo-d4a526fdcfa3f740220940b8bf6767da959d4b3d.tar.gz |
SERVER-30834 Make mongos reload the shard registry and re-establish changeStream cursors when encountering a 'retryNeeded' entry
Diffstat (limited to 'src/mongo/db')
3 files changed, 56 insertions, 34 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index b9bd8564527..9dbad7ae96d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -78,7 +78,7 @@ constexpr StringData DocumentSourceChangeStream::kDeleteOpType; constexpr StringData DocumentSourceChangeStream::kReplaceOpType; constexpr StringData DocumentSourceChangeStream::kInsertOpType; constexpr StringData DocumentSourceChangeStream::kInvalidateOpType; -constexpr StringData DocumentSourceChangeStream::kRetryNeededOpType; +constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType; const BSONObj DocumentSourceChangeStream::kSortSpec = BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1); @@ -218,8 +218,7 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField; checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String); auto operationType = doc[kOperationTypeField].getString(); - if (operationType == DocumentSourceChangeStream::kInvalidateOpType || - operationType == DocumentSourceChangeStream::kRetryNeededOpType) { + if (operationType == DocumentSourceChangeStream::kInvalidateOpType) { // Pass the invalidation forward, so that it can be included in the results, or // filtered/transformed by further stages in the pipeline, then throw an exception // to close the cursor on the next call to getNext(). @@ -377,6 +376,28 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( return stages; } +BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj originalCmdObj, + const BSONObj resumeToken) { + Document originalCmd(originalCmdObj); + auto pipeline = originalCmd[AggregationRequest::kPipelineName].getArray(); + // A $changeStream must be the first element of the pipeline in order to be able + // to replace (or add) a resume token. + invariant(!pipeline[0][DocumentSourceChangeStream::kStageName].missing()); + + MutableDocument changeStreamStage( + pipeline[0][DocumentSourceChangeStream::kStageName].getDocument()); + changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken); + + // If the command was initially specified with a resumeAfterClusterTime, we need to remove it + // to use the new resume token. + changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName] = Value(); + pipeline[0] = + Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}}); + MutableDocument newCmd(originalCmd); + newCmd[AggregationRequest::kPipelineName] = Value(pipeline); + return newCmd.freeze().toBson(); +} + intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage( BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) { return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation( @@ -463,9 +484,9 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D break; } case repl::OpTypeEnum::kNoop: { - operationType = kRetryNeededOpType; - // Generate a fake document Id for RetryNeeded operation so that we can resume after - // this operation. + operationType = kNewShardDetectedOpType; + // Generate a fake document Id for NewShardDetected operation so that we can resume + // after this operation. documentKey = Value(Document{{kIdField, input[repl::OplogEntry::kObject2FieldName]}}); break; } @@ -499,8 +520,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); } - // "invalidate" and "retryNeeded" entries have fewer fields. - if (operationType == kInvalidateOpType || operationType == kRetryNeededOpType) { + // "invalidate" and "newShardDetected" entries have fewer fields. + if (operationType == kInvalidateOpType || operationType == kNewShardDetectedOpType) { return doc.freeze(); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 944a50e66b0..b4baa40dddc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -141,8 +141,8 @@ public: static constexpr StringData kReplaceOpType = "replace"_sd; static constexpr StringData kInsertOpType = "insert"_sd; static constexpr StringData kInvalidateOpType = "invalidate"_sd; - // Internal op type to close the cursor. - static constexpr StringData kRetryNeededOpType = "retryNeeded"_sd; + // Internal op type to signal mongos to open cursors on new shards. + static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd; /** * Produce the BSON object representing the filter for the $match stage to filter oplog entries @@ -162,6 +162,15 @@ public: static boost::intrusive_ptr<DocumentSource> createTransformationStage( BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + /** + * Given a BSON object containing an aggregation command with a $changeStream stage, and a + * resume token, returns a new BSON object with the same command except with the addition of a + * resumeAfter: option containing the resume token. If there was a previous resumeAfter: + * option, it is removed. + */ + static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj, + const BSONObj resumeToken); + private: // It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson() // instead. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index e70544085c1..d7ce589deb4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -560,6 +560,22 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) { checkTransformation(rename, expectedInvalidate); } +TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { + auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}}; + auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop, + nss, + testUuid(), + boost::none, // fromMigrate + BSONObj(), + o2Field.toBson()); + + Document expectedNewShardDetected{ + {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType}, + }; + checkTransformation(newShardDetected, expectedNewShardDetected); +} + TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) { auto collSpec = D{{"create", "foo"_sd}, @@ -671,29 +687,5 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); } -TEST_F(ChangeStreamStageTest, CloseCursorOnRetryNeededEntries) { - auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}}; - auto retryNeeded = makeOplogEntry(OpTypeEnum::kNoop, // op type - nss, // namespace - testUuid(), // uuid - boost::none, // fromMigrate - {}, // o - o2Field.toBson()); // o2 - - auto stages = makeStages(retryNeeded); - auto closeCursor = stages.back(); - - Document expectedRetryNeeded{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kRetryNeededOpType}, - }; - - auto next = closeCursor->getNext(); - // Transform into RetryNeeded entry. - ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedRetryNeeded); - // Then throw an exception on the next call of getNext(). - ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); -} - } // namespace } // namespace mongo |