diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-13 10:35:23 -0500 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-13 16:19:00 -0500 |
commit | 634857247b51b1d8ce68530636579480c67f4d19 (patch) | |
tree | f098e91e682772502f6273a829fda71033c0cab3 /src/mongo/db/pipeline | |
parent | 7e59d2482a83b48ccf503f617dcacc8c37d9b36a (diff) | |
download | mongo-634857247b51b1d8ce68530636579480c67f4d19.tar.gz |
SERVER-31685 Sharded change streams can miss notifications
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_sources.idl | 23 |
3 files changed, 63 insertions, 9 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 5b850f8e6e3..b9bd8564527 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -34,6 +34,7 @@ #include "mongo/db/bson/bson_helper.h" #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/commands/feature_compatibility_version_command_parser.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" @@ -326,6 +327,16 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); } } + if (auto resumeAfterClusterTime = spec.getResumeAfterClusterTime()) { + uassert(50656, + str::stream() << "Do not specify both " + << DocumentSourceChangeStreamSpec::kResumeAfterFieldName + << " and " + << DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName + << " in a $changeStream stage.", + !resumeStage); + startFrom = resumeAfterClusterTime->getTimestamp(); + } const bool changeStreamIsResuming = (resumeStage != nullptr); auto fullDocOption = spec.getFullDocument(); @@ -505,7 +516,22 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D Document DocumentSourceChangeStream::Transformation::serializeStageOptions( boost::optional<ExplainOptions::Verbosity> explain) const { - return Document(_changeStreamSpec); + Document changeStreamOptions(_changeStreamSpec); + // If we're on a mongos and no other start time is specified, we want to start at the current + // cluster time on the mongos. This ensures all shards use the same start time. + if (_expCtx->inMongos && + changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName].missing() && + changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName] + .missing()) { + MutableDocument newChangeStreamOptions(changeStreamOptions); + newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName] + [ResumeTokenClusterTime::kTimestampFieldName] = + Value(LogicalClock::get(_expCtx->opCtx) + ->getClusterTime() + .asTimestamp()); + changeStreamOptions = newChangeStreamOptions.freeze(); + } + return changeStreamOptions; } DocumentSource::GetDepsReturn DocumentSourceChangeStream::Transformation::addDependencies( diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index f90e5724eb3..e70544085c1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -33,6 +33,8 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/json.h" +#include "mongo/db/catalog/collection_mock.h" +#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" @@ -248,6 +250,25 @@ TEST_F(ChangeStreamStageTest, ShouldRejectUnrecognizedFullDocumentOption) { 40575); } +TEST_F(ChangeStreamStageTest, ShouldRejectBothResumeAfterClusterTimeAndResumeAfterOptions) { + auto expCtx = getExpCtx(); + + // Need to put the collection in the UUID catalog so the resume token is valid. + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid()); + + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON( + "resumeAfter" << makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1)) + << "$_resumeAfterClusterTime" + << BSON("ts" << ts))) + .firstElement(), + expCtx), + AssertionException, + 50656); +} + TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) { const auto spec = fromjson("{$changeStream: {}}"); diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl index ef427a711f4..529f8b8b165 100644 --- a/src/mongo/db/pipeline/document_sources.idl +++ b/src/mongo/db/pipeline/document_sources.idl @@ -56,6 +56,14 @@ types: structs: + ResumeTokenClusterTime: + description: The IDL type of cluster time + fields: + ts: + cpp_name: timestamp + type: timestamp + description: The timestamp of the logical time + DocumentSourceChangeStreamSpec: description: A document used to specify the $changeStream stage of an aggregation pipeline. @@ -66,6 +74,13 @@ structs: optional: true description: An object representing the point at which we should resume reporting changes from. + $_resumeAfterClusterTime: + cpp_name: resumeAfterClusterTime + type: ResumeTokenClusterTime + optional: true + description: The cluster time after which we should start reporting changes. + Only one of resumeAfter and _resumeAfterClusterTime should be + specified. For internal use only. fullDocument: cpp_name: fullDocument type: string @@ -73,14 +88,6 @@ structs: description: A string '"updateLookup"' or '"default"', indicating whether or not we should return a full document or just changes for an update. - ResumeTokenClusterTime: - description: The IDL type of cluster time - fields: - ts: - cpp_name: timestamp - type: timestamp - description: The timestamp of the logical time - ListSessionsUser: description: "A struct representing a $listSessions/$listLocalSessions User" strict: true |