summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-11-13 10:35:23 -0500
committerMatthew Russotto <matthew.russotto@10gen.com>2017-11-13 16:19:00 -0500
commit634857247b51b1d8ce68530636579480c67f4d19 (patch)
treef098e91e682772502f6273a829fda71033c0cab3 /src/mongo/db/pipeline
parent7e59d2482a83b48ccf503f617dcacc8c37d9b36a (diff)
downloadmongo-634857247b51b1d8ce68530636579480c67f4d19.tar.gz
SERVER-31685 Sharded change streams can miss notifications
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp28
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp21
-rw-r--r--src/mongo/db/pipeline/document_sources.idl23
3 files changed, 63 insertions, 9 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 5b850f8e6e3..b9bd8564527 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/bson/bson_helper.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/commands/feature_compatibility_version_command_parser.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
@@ -326,6 +327,16 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token));
}
}
+ if (auto resumeAfterClusterTime = spec.getResumeAfterClusterTime()) {
+ uassert(50656,
+ str::stream() << "Do not specify both "
+ << DocumentSourceChangeStreamSpec::kResumeAfterFieldName
+ << " and "
+ << DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName
+ << " in a $changeStream stage.",
+ !resumeStage);
+ startFrom = resumeAfterClusterTime->getTimestamp();
+ }
const bool changeStreamIsResuming = (resumeStage != nullptr);
auto fullDocOption = spec.getFullDocument();
@@ -505,7 +516,22 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
Document DocumentSourceChangeStream::Transformation::serializeStageOptions(
boost::optional<ExplainOptions::Verbosity> explain) const {
- return Document(_changeStreamSpec);
+ Document changeStreamOptions(_changeStreamSpec);
+ // If we're on a mongos and no other start time is specified, we want to start at the current
+ // cluster time on the mongos. This ensures all shards use the same start time.
+ if (_expCtx->inMongos &&
+ changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName].missing() &&
+ changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName]
+ .missing()) {
+ MutableDocument newChangeStreamOptions(changeStreamOptions);
+ newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName]
+ [ResumeTokenClusterTime::kTimestampFieldName] =
+ Value(LogicalClock::get(_expCtx->opCtx)
+ ->getClusterTime()
+ .asTimestamp());
+ changeStreamOptions = newChangeStreamOptions.freeze();
+ }
+ return changeStreamOptions;
}
DocumentSource::GetDepsReturn DocumentSourceChangeStream::Transformation::addDependencies(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index f90e5724eb3..e70544085c1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -33,6 +33,8 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/json.h"
+#include "mongo/db/catalog/collection_mock.h"
+#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
@@ -248,6 +250,25 @@ TEST_F(ChangeStreamStageTest, ShouldRejectUnrecognizedFullDocumentOption) {
40575);
}
+TEST_F(ChangeStreamStageTest, ShouldRejectBothResumeAfterClusterTimeAndResumeAfterOptions) {
+ auto expCtx = getExpCtx();
+
+ // Need to put the collection in the UUID catalog so the resume token is valid.
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid());
+
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON(
+ "resumeAfter" << makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))
+ << "$_resumeAfterClusterTime"
+ << BSON("ts" << ts)))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ 50656);
+}
+
TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) {
const auto spec = fromjson("{$changeStream: {}}");
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
index ef427a711f4..529f8b8b165 100644
--- a/src/mongo/db/pipeline/document_sources.idl
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -56,6 +56,14 @@ types:
structs:
+ ResumeTokenClusterTime:
+ description: The IDL type of cluster time
+ fields:
+ ts:
+ cpp_name: timestamp
+ type: timestamp
+ description: The timestamp of the logical time
+
DocumentSourceChangeStreamSpec:
description: A document used to specify the $changeStream stage of an aggregation
pipeline.
@@ -66,6 +74,13 @@ structs:
optional: true
description: An object representing the point at which we should resume reporting
changes from.
+ $_resumeAfterClusterTime:
+ cpp_name: resumeAfterClusterTime
+ type: ResumeTokenClusterTime
+ optional: true
+ description: The cluster time after which we should start reporting changes.
+ Only one of resumeAfter and _resumeAfterClusterTime should be
+ specified. For internal use only.
fullDocument:
cpp_name: fullDocument
type: string
@@ -73,14 +88,6 @@ structs:
description: A string '"updateLookup"' or '"default"', indicating whether or not we
should return a full document or just changes for an update.
- ResumeTokenClusterTime:
- description: The IDL type of cluster time
- fields:
- ts:
- cpp_name: timestamp
- type: timestamp
- description: The timestamp of the logical time
-
ListSessionsUser:
description: "A struct representing a $listSessions/$listLocalSessions User"
strict: true