summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-08-03 13:55:23 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-08-03 14:32:12 -0400
commit0686b1ed2d042ad5adb671912e7d6b9cc89d0c4f (patch)
tree5fe3f0a6d4e8445eaec46b564b55cfc6753cbcac /src/mongo/db/pipeline
parent07d4d94b06c6899699410312e20ef33d954ddbd1 (diff)
downloadmongo-0686b1ed2d042ad5adb671912e7d6b9cc89d0c4f.tar.gz
SERVER-29131 Support resumeAfter option to control where to start returning notifications from, which always errors if no entry with the given resumeToken exists
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript19
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp76
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp34
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp80
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h79
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp191
-rw-r--r--src/mongo/db/pipeline/document_sources.idl103
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp14
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp79
-rw-r--r--src/mongo/db/pipeline/resume_token.h75
-rw-r--r--src/mongo/db/pipeline/value.cpp13
-rw-r--r--src/mongo/db/pipeline/value.h5
13 files changed, 718 insertions, 53 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 093cbb5a5c9..e548ff4f464 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -130,6 +130,7 @@ env.CppUnitTest(
'document_source_bucket_auto_test.cpp',
'document_source_bucket_test.cpp',
'document_source_change_notification_test.cpp',
+ 'document_source_check_resume_token_test.cpp',
'document_source_count_test.cpp',
'document_source_current_op_test.cpp',
'document_source_geo_near_test.cpp',
@@ -157,6 +158,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/util/clock_source_mock',
@@ -264,6 +266,7 @@ docSourceEnv.Library(
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/top',
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
@@ -272,6 +275,7 @@ docSourceEnv.Library(
'$BUILD_DIR/third_party/shim_snappy',
'accumulator',
'dependencies',
+ 'document_sources_idl',
'document_value',
'expression',
'granularity_rounder',
@@ -312,6 +316,7 @@ env.Library(
target='document_source_lookup',
source=[
'document_source_change_notification.cpp',
+ 'document_source_check_resume_token.cpp',
'document_source_graph_lookup.cpp',
'document_source_lookup.cpp',
'document_source_lookup_change_post_image.cpp',
@@ -391,6 +396,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/query/collation/collator_interface_mock',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/s/is_mongos',
@@ -491,3 +497,16 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/serveronly',
],
)
+
+env.Library(
+ target='document_sources_idl',
+ source=[
+ env.Idlc('document_sources.idl')[0],
+ 'resume_token.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ 'document_value',
+ ],
+)
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index d173d1183d5..c463c3574c9 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -31,14 +31,17 @@
#include "mongo/db/pipeline/document_source_change_notification.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -115,7 +118,9 @@ private:
};
} // namespace
-BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) {
+BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss,
+ Timestamp startFrom,
+ bool isResume) {
auto target = nss.ns();
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
@@ -140,7 +145,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString
auto opMatch = BSON("ns" << target);
// Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops.
- return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
+ // Include the resume token if resuming, so we can verify it was still present in the oplog.
+ return BSON("ts" << (isResume ? GTE : GT) << startFrom << "$or"
+ << BSON_ARRAY(opMatch << commandMatch));
}
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
@@ -153,44 +160,39 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFrom
"Only default collation is allowed when using a $changeNotification stage.",
!expCtx->getCollator());
- uassert(40573,
- str::stream() << "the $changeNotification stage must be specified as an object, got "
- << typeName(elem.type()),
- elem.type() == BSONType::Object);
-
- bool shouldLookupPostImage = false;
- for (auto&& option : elem.embeddedObject()) {
- auto optionName = option.fieldNameStringData();
- if (optionName == "fullDocument"_sd) {
- uassert(40574,
- str::stream() << "the 'fullDocument' option to the $changeNotification stage "
- "must be a string, got "
- << typeName(option.type()),
- option.type() == BSONType::String);
- auto fullDocOption = option.valueStringData();
- uassert(40575,
- str::stream() << "unrecognized value for the 'fullDocument' option to the "
- "$changeNotification stage. Expected \"none\" or "
- "\"fullDocument\", got \""
- << option.String()
- << "\"",
- fullDocOption == "lookup"_sd || fullDocOption == "none"_sd);
- shouldLookupPostImage = (fullDocOption == "lookup"_sd);
- } else if (optionName == "resumeAfter"_sd) {
- uasserted(
- 40576,
- "the 'resumeAfter' option to the $changeNotification stage is not yet supported");
- } else {
- uasserted(40577,
- str::stream() << "unrecognized option to $changeNotification stage: \""
- << optionName
- << "\"");
- }
+ auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
+ uassert(40573, "The $changeNotification stage is only supported on replica sets", replCoord);
+ Timestamp startFrom = replCoord->getLastCommittedOpTime().getTimestamp();
+
+ intrusive_ptr<DocumentSourceCheckResumeToken> resumeStage = nullptr;
+ auto spec = DocumentSourceChangeNotificationSpec::parse(
+ IDLParserErrorContext("$changeNotification"), elem.embeddedObject());
+ if (auto resumeAfter = spec.getResumeAfter()) {
+ ResumeToken token = resumeAfter.get();
+ startFrom = token.getTimestamp();
+ DocumentSourceCheckResumeTokenSpec spec;
+ spec.setResumeToken(std::move(token));
+ resumeStage = DocumentSourceCheckResumeToken::create(expCtx, std::move(spec));
}
-
- auto oplogMatch = DocumentSourceOplogMatch::create(buildMatchFilter(expCtx->ns), expCtx);
+ const bool changeStreamIsResuming = resumeStage != nullptr;
+
+ auto fullDocOption = spec.getFullDocument();
+ uassert(40575,
+ str::stream() << "unrecognized value for the 'fullDocument' option to the "
+ "$changeNotification stage. Expected \"none\" or "
+ "\"lookup\", got \""
+ << fullDocOption
+ << "\"",
+ fullDocOption == "lookup"_sd || fullDocOption == "none"_sd);
+ const bool shouldLookupPostImage = (fullDocOption == "lookup"_sd);
+
+ auto oplogMatch = DocumentSourceOplogMatch::create(
+ buildMatchFilter(expCtx->ns, startFrom, changeStreamIsResuming), expCtx);
auto transformation = createTransformationStage(elem.embeddedObject(), expCtx);
list<intrusive_ptr<DocumentSource>> stages = {oplogMatch, transformation};
+ if (resumeStage) {
+ stages.push_back(resumeStage);
+ }
if (shouldLookupPostImage) {
stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
}
diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h
index ad10a5ad210..036c958888f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.h
+++ b/src/mongo/db/pipeline/document_source_change_notification.h
@@ -30,6 +30,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
namespace mongo {
@@ -119,7 +120,7 @@ public:
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
* to only those relevant for this $changeNotification stage.
*/
- static BSONObj buildMatchFilter(const NamespaceString& nss);
+ static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume);
/**
* Parses a $changeNotification stage from 'elem' and produces the $match and transformation
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 9c08bd91637..ffc9460c96d 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -65,9 +66,15 @@ static const Timestamp ts(100, 1);
static const repl::OpTime optime(ts, 1);
static const NamespaceString nss("unittests.change_notification");
+using ChangeNotificationStageTestNoSetup = AggregationContextFixture;
+
class ChangeNotificationStageTest : public AggregationContextFixture {
public:
- ChangeNotificationStageTest() : AggregationContextFixture(nss) {}
+ ChangeNotificationStageTest() : AggregationContextFixture(nss) {
+ repl::ReplicationCoordinator::set(getExpCtx()->opCtx->getServiceContext(),
+ stdx::make_unique<repl::ReplicationCoordinatorMock>(
+ getExpCtx()->opCtx->getServiceContext()));
+ }
void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) {
const auto spec = fromjson("{$changeNotification: {}}");
@@ -106,19 +113,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedOption) {
BSON(DSChangeNotification::kStageName << BSON("unexpected" << 4)).firstElement(),
expCtx),
UserException,
- 40577);
-}
-
-TEST_F(ChangeNotificationStageTest, ShouldRejectResumeAfterOption) {
- // TODO SERVER-29131 change this test to accept the option.
- auto expCtx = getExpCtx();
-
- ASSERT_THROWS_CODE(
- DSChangeNotification::createFromBson(
- BSON(DSChangeNotification::kStageName << BSON("resumeAfter" << ts)).firstElement(),
- expCtx),
- UserException,
- 40576);
+ 40415);
}
TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) {
@@ -129,7 +124,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) {
BSON(DSChangeNotification::kStageName << BSON("fullDocument" << true)).firstElement(),
expCtx),
UserException,
- 40574);
+ ErrorCodes::TypeMismatch);
}
TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) {
@@ -144,6 +139,15 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption)
40575);
}
+TEST_F(ChangeNotificationStageTestNoSetup, FailsWithNoReplicationCoordinator) {
+ const auto spec = fromjson("{$changeNotification: {}}");
+
+ ASSERT_THROWS_CODE(
+ DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()),
+ UserException,
+ 40573);
+}
+
TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
const auto spec = fromjson("{$changeNotification: {}}");
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
new file mode 100644
index 00000000000..beaa8844dcb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
+
+using boost::intrusive_ptr;
+namespace mongo {
+const char* DocumentSourceCheckResumeToken::getSourceName() const {
+ return "$_checkResumeToken";
+}
+
+Value DocumentSourceCheckResumeToken::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ // This stage is created by the DocumentSourceChangeNotification stage, so serializing it here
+ // would result in it being created twice.
+ return Value();
+}
+
+intrusive_ptr<DocumentSourceCheckResumeToken> DocumentSourceCheckResumeToken::create(
+ const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec) {
+ return new DocumentSourceCheckResumeToken(expCtx, std::move(spec));
+}
+
+DocumentSourceCheckResumeToken::DocumentSourceCheckResumeToken(
+ const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec)
+ : DocumentSource(expCtx), _token(spec.getResumeToken()), _seenDoc(false) {}
+
+DocumentSource::GetNextResult DocumentSourceCheckResumeToken::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ auto nextInput = pSource->getNext();
+ uassert(40584,
+ "resume of change notification was not possible, as no change data was found. ",
+ _seenDoc || !nextInput.isEOF());
+
+ if (_seenDoc || !nextInput.isAdvanced())
+ return nextInput;
+
+ _seenDoc = true;
+ auto doc = nextInput.getDocument();
+
+ ResumeToken receivedToken(doc["_id"]);
+ uassert(
+ 40585,
+ str::stream()
+ << "resume of change notification was not possible, as the resume token was not found. "
+ << receivedToken.toDocument().toString(),
+ receivedToken == _token);
+ // Don't return the document which has the token; the user has already seen it.
+ return pSource->getNext();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
new file mode 100644
index 00000000000..13706b937b9
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+/**
+ * This stage is used internally for change notifications to ensure that the resume token is in the
+ * stream. It is not intended to be created by the user.
+ */
+class DocumentSourceCheckResumeToken final : public DocumentSource,
+ public SplittableDocumentSource {
+public:
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+
+ /**
+ * SplittableDocumentSource methods; this has to run on the merger, since the resume point could
+ * be at any shard.
+ */
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ };
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ };
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ static boost::intrusive_ptr<DocumentSourceCheckResumeToken> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ DocumentSourceCheckResumeTokenSpec spec);
+
+ const ResumeToken& getTokenForTest() {
+ return _token;
+ }
+
+private:
+ /**
+ * Use the create static method to create a DocumentSourceCheckResumeToken.
+ */
+ DocumentSourceCheckResumeToken(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ DocumentSourceCheckResumeTokenSpec spec);
+
+ ResumeToken _token;
+ bool _seenDoc;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
new file mode 100644
index 00000000000..a9f057ee353
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -0,0 +1,191 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <memory>
+
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+
+using boost::intrusive_ptr;
+
+namespace mongo {
+namespace {
+static constexpr StringData kTestNs = "test.ns"_sd;
+
+class CheckResumeTokenTest : public AggregationContextFixture {
+public:
+ CheckResumeTokenTest() : _mock(DocumentSourceMock::create()) {}
+
+protected:
+ /**
+ * Puts an arbitrary document with resume token corresponding to the given timestamp, id, and
+ * namespace in the mock queue.
+ */
+ void addDocument(Timestamp ts, std::string id, StringData ns = kTestNs) {
+ _mock->queue.push_back(
+ Document({{"_id", Document({{"ts", ts}, {"ns", ns}, {"_id", id}})}}));
+ }
+
+ void addPause() {
+ _mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ }
+
+ /**
+ * Convenience method to create the class under test with a given timestamp, id, and namespace.
+ */
+ intrusive_ptr<DocumentSourceCheckResumeToken> createCheckResumeToken(Timestamp ts,
+ StringData id,
+ StringData ns = kTestNs) {
+ auto token = ResumeToken::parse(BSON("ts" << ts << "_id" << id << "ns" << ns));
+ DocumentSourceCheckResumeTokenSpec spec;
+ spec.setResumeToken(token);
+ auto checkResumeToken = DocumentSourceCheckResumeToken::create(getExpCtx(), spec);
+ checkResumeToken->setSource(_mock.get());
+ return checkResumeToken;
+ }
+
+private:
+ intrusive_ptr<DocumentSourceMock> _mock;
+};
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addDocument(resumeTimestamp, "1");
+ // We should not see the resume token.
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addPause();
+ addDocument(resumeTimestamp, "1");
+
+ // We see the pause we inserted, but not the resume token.
+ ASSERT_TRUE(checkResumeToken->getNext().isPaused());
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp doc1Timestamp(100, 2);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addDocument(resumeTimestamp, "1");
+ addPause();
+ addDocument(doc1Timestamp, "2");
+
+ // Pause added explicitly.
+ ASSERT_TRUE(checkResumeToken->getNext().isPaused());
+ // The document after the resume token should be the first.
+ auto result1 = checkResumeToken->getNext();
+ ASSERT_TRUE(result1.isAdvanced());
+ auto& doc1 = result1.getDocument();
+ ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]);
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ addDocument(resumeTimestamp, "0");
+
+ Timestamp doc1Timestamp(100, 2);
+ Timestamp doc2Timestamp(101, 1);
+ addDocument(doc1Timestamp, "1");
+ addDocument(doc2Timestamp, "2");
+
+ auto result1 = checkResumeToken->getNext();
+ ASSERT_TRUE(result1.isAdvanced());
+ auto& doc1 = result1.getDocument();
+ ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]);
+ auto result2 = checkResumeToken->getNext();
+ ASSERT_TRUE(result2.isAdvanced());
+ auto& doc2 = result2.getDocument();
+ ASSERT_VALUE_EQ(Value(doc2Timestamp), doc2["_id"].getDocument()["ts"]);
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+
+ Timestamp doc1Timestamp(100, 2);
+ Timestamp doc2Timestamp(101, 1);
+ addDocument(doc1Timestamp, "1");
+ addDocument(doc2Timestamp, "2");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongDocumentId) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ addDocument(resumeTimestamp, "1");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", "test1.ns");
+ addDocument(resumeTimestamp, "1", "test2.ns");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585);
+}
+
+/**
+ * We should _error_ on the no-document case, because that means the resume token was not found.
+ */
+TEST_F(CheckResumeTokenTest, ShouldFailWithNoDocuments) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40584);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
new file mode 100644
index 00000000000..de67f25bb02
--- /dev/null
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -0,0 +1,103 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# Document source pipeline stage IDL file
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/db/pipeline/resume_token.h"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+types:
+ # A resume token could be parsed as a struct, but since we may make it opaque in the future, we
+ # parse it as a type with a custom class now.
+ resumeToken:
+ bson_serialization_type: object
+ description: An object representing a resume token for change notification
+ cpp_type: ResumeToken
+ serializer: ResumeToken::toBSON
+ deserializer: ResumeToken::parse
+
+ # The _id element in a resume token can be any BSON element, so we need a custom type which
+ # leaves it as a BSONElement
+ resumeTokenOpaqueId:
+ bson_serialization_type: any
+ description: The document id contained within a resume token
+ cpp_type: Value
+ serializer: Value::serializeForIDL
+ deserializer: Value::deserializeForIDL
+
+structs:
+ DocumentSourceChangeNotificationSpec:
+ description: A document used to specify the $changeNotification stage of an aggregation
+ pipeline.
+ fields:
+ resumeAfter:
+ cpp_name: resumeAfter
+ type: resumeToken
+ optional: true
+ description: An object representing the point at which we should resume reporting
+ changes from.
+ fullDocument:
+ cpp_name: fullDocument
+ type: string
+ default: '"none"'
+ description: A string '"lookup"' or '"none"', indicating whether or not we should
+ return a full document or just changes for an update.
+
+
+ DocumentSourceCheckResumeTokenSpec:
+ description: A document used to specify the internal stage which checks the presence of the
+ resume token.
+ fields:
+ resumeToken:
+ cpp_name: resumeToken
+ type: resumeToken
+ description: The resume token which is required to be present in the pipeline.
+
+
+ ResumeTokenInternal:
+ description: The internal format of a resume token. For use by the ResumeToken class
+ only.
+ fields:
+ ts:
+ cpp_name: timestamp
+ type: timestamp
+ description: The timestamp of the oplog entry represented by this resume token.
+
+ ns:
+ cpp_name: ns
+ type: string
+ description: The namespace of the oplog entry represented by this resume token.
+
+ _id:
+ cpp_name: documentId
+ type: resumeTokenOpaqueId
+ description: The document key of the document in the oplog entry represented by this
+ resume token.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 9909895feb3..91d04bdf2a2 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/query/query_test_service_context.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/dbtests/dbtests.h"
namespace PipelineTests {
@@ -58,6 +59,14 @@ using std::vector;
const NamespaceString kTestNss = NamespaceString("a.collection");
+namespace {
+void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) {
+ repl::ReplicationCoordinator::set(
+ opCtx->getServiceContext(),
+ stdx::make_unique<repl::ReplicationCoordinatorMock>(opCtx->getServiceContext()));
+}
+} // namespace
+
namespace Optimizations {
using namespace mongo;
@@ -974,6 +983,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupSwapsWithIndependentMatch
intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
expCtx->opCtx = opCtx.get();
+ setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
auto spec = BSON("$changeNotification" << BSON("fullDocument"
<< "lookup"));
@@ -998,6 +1008,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupDoesNotSwapWithMatchOnPos
intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
expCtx->opCtx = opCtx.get();
+ setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
auto spec = BSON("$changeNotification" << BSON("fullDocument"
<< "lookup"));
@@ -1472,6 +1483,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles
TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")};
auto ctx = getExpCtx();
+ setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
}
@@ -1480,6 +1492,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage)
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
fromjson("{$changeNotification: {}}")};
auto ctx = getExpCtx();
+ setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus();
ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
@@ -1490,6 +1503,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageI
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
fromjson("{$changeNotification: {}}")};
auto ctx = getExpCtx();
+ setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus();
ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
new file mode 100644
index 00000000000..5b409f1f113
--- /dev/null
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/resume_token.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/value_comparator.h"
+
+namespace mongo {
+
+ResumeToken::ResumeToken(const BSONObj& resumeBson) {
+ auto token = ResumeTokenInternal::parse(
+ IDLParserErrorContext("$changeNotification.resumeAfter"), resumeBson);
+ _timestamp = token.getTimestamp();
+ _namespace = token.getNs().toString();
+ _documentId = token.getDocumentId();
+}
+
+ResumeToken::ResumeToken(const Value& resumeValue) {
+ Document resumeTokenDoc = resumeValue.getDocument();
+ Value timestamp = resumeTokenDoc[ResumeTokenInternal::kTimestampFieldName];
+ _timestamp = timestamp.getTimestamp();
+ Value ns = resumeTokenDoc[ResumeTokenInternal::kNsFieldName];
+ _namespace = ns.getString();
+ _documentId = resumeTokenDoc[ResumeTokenInternal::kDocumentIdFieldName];
+}
+
+bool ResumeToken::operator==(const ResumeToken& other) {
+ return _timestamp == other._timestamp && _namespace == other._namespace &&
+ ValueComparator::kInstance.evaluate(_documentId == other._documentId);
+}
+
+Document ResumeToken::toDocument() const {
+ return Document({{ResumeTokenInternal::kTimestampFieldName, _timestamp},
+ {{ResumeTokenInternal::kNsFieldName}, _namespace},
+ {{ResumeTokenInternal::kDocumentIdFieldName}, _documentId}});
+}
+
+BSONObj ResumeToken::toBSON() const {
+ return BSON(
+ ResumeTokenInternal::kTimestampFieldName << _timestamp << ResumeTokenInternal::kNsFieldName
+ << _namespace
+ << ResumeTokenInternal::kDocumentIdFieldName
+ << _documentId);
+}
+
+ResumeToken ResumeToken::parse(const BSONObj& resumeBson) {
+ return ResumeToken(resumeBson);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
new file mode 100644
index 00000000000..733e285e599
--- /dev/null
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/string_data.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/pipeline/value.h"
+
+namespace mongo {
+/**
+ * A token passed in by the user to indicate where in the oplog we should start for
+ * $changeNotification.
+ */
+class ResumeToken {
+public:
+ /**
+ * The default no-argument constructor is required by the IDL for types used as non-optional
+ * fields.
+ */
+ ResumeToken() = default;
+ explicit ResumeToken(const Value& resumeValue);
+ bool operator==(const ResumeToken&);
+
+ Timestamp getTimestamp() const {
+ return _timestamp;
+ }
+
+ Document toDocument() const;
+
+ BSONObj toBSON() const;
+
+ /**
+ * Parse a resume token from a BSON object; used as an interface to the IDL parser.
+ */
+ static ResumeToken parse(const BSONObj& obj);
+
+private:
+ /**
+ * Construct from a BSON object.
+ * External callers should use the static ResumeToken::parse(const BSONObj&) method instead.
+ */
+ explicit ResumeToken(const BSONObj& resumeBson);
+
+ Timestamp _timestamp;
+ std::string _namespace;
+ Value _documentId;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp
index d70c6b65cc9..a7bce3fc777 100644
--- a/src/mongo/db/pipeline/value.cpp
+++ b/src/mongo/db/pipeline/value.cpp
@@ -1323,4 +1323,17 @@ Value Value::deserializeForSorter(BufReader& buf, const SorterDeserializeSetting
}
verify(false);
}
+
+void Value::serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const {
+ addToBsonObj(builder, fieldName);
+}
+
+void Value::serializeForIDL(BSONArrayBuilder* builder) const {
+ addToBsonArray(builder);
}
+
+Value Value::deserializeForIDL(const BSONElement& element) {
+ return Value(element);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h
index 04e396dc58d..40c05d89b9a 100644
--- a/src/mongo/db/pipeline/value.h
+++ b/src/mongo/db/pipeline/value.h
@@ -328,6 +328,11 @@ public:
return *this;
}
+ /// Members to support parsing/deserialization from IDL generated code.
+ void serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const;
+ void serializeForIDL(BSONArrayBuilder* builder) const;
+ static Value deserializeForIDL(const BSONElement& element);
+
private:
/** This is a "honeypot" to prevent unexpected implicit conversions to the accepted argument
* types. bool is especially bad since without this it will accept any pointer.