diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-08-03 13:55:23 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-08-03 14:32:12 -0400 |
commit | 0686b1ed2d042ad5adb671912e7d6b9cc89d0c4f (patch) | |
tree | 5fe3f0a6d4e8445eaec46b564b55cfc6753cbcac /src/mongo/db | |
parent | 07d4d94b06c6899699410312e20ef33d954ddbd1 (diff) | |
download | mongo-0686b1ed2d042ad5adb671912e7d6b9cc89d0c4f.tar.gz |
SERVER-29131 Support resumeAfter option to control where to start returning notifications from, which always errors if no entry with the given resumeToken exists
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 19 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.h | 79 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 191 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_sources.idl | 103 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 79 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.h | 75 | ||||
-rw-r--r-- | src/mongo/db/pipeline/value.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/value.h | 5 |
14 files changed, 725 insertions, 55 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 58a1622c49b..277d8f2e660 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -66,6 +66,9 @@ namespace mongo { namespace { MONGO_FP_DECLARE(rsStopGetMoreCmd); +// Failpoint for making getMore not wait for an awaitdata cursor. Allows us to avoid waiting during +// tests. +MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd); } // namespace /** @@ -281,6 +284,8 @@ public: const bool hasOwnMaxTime = opCtx->hasDeadline(); + const bool disableAwaitDataFailpointActive = + MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) { @@ -288,7 +293,7 @@ public: // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. - if (isCursorAwaitData(cursor)) { + if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) { opCtx->setDeadlineAfterNowBy(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); @@ -329,7 +334,7 @@ public: Explain::getSummaryStats(*exec, &preExecutionStats); // Mark this as an AwaitData operation if appropriate. - if (isCursorAwaitData(cursor)) { + if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); shouldWaitForInserts(opCtx) = true; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 093cbb5a5c9..e548ff4f464 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -130,6 +130,7 @@ env.CppUnitTest( 'document_source_bucket_auto_test.cpp', 'document_source_bucket_test.cpp', 'document_source_change_notification_test.cpp', + 'document_source_check_resume_token_test.cpp', 'document_source_count_test.cpp', 'document_source_current_op_test.cpp', 'document_source_geo_near_test.cpp', @@ -157,6 +158,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/util/clock_source_mock', @@ -264,6 +266,7 @@ docSourceEnv.Library( '$BUILD_DIR/mongo/db/matcher/expressions', '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/top', '$BUILD_DIR/mongo/db/storage/encryption_hooks', @@ -272,6 +275,7 @@ docSourceEnv.Library( '$BUILD_DIR/third_party/shim_snappy', 'accumulator', 'dependencies', + 'document_sources_idl', 'document_value', 'expression', 'granularity_rounder', @@ -312,6 +316,7 @@ env.Library( target='document_source_lookup', source=[ 'document_source_change_notification.cpp', + 'document_source_check_resume_token.cpp', 'document_source_graph_lookup.cpp', 'document_source_lookup.cpp', 'document_source_lookup_change_post_image.cpp', @@ -391,6 +396,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/collation/collator_interface_mock', '$BUILD_DIR/mongo/db/query/query_test_service_context', + '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/s/is_mongos', @@ -491,3 +497,16 @@ env.Library( '$BUILD_DIR/mongo/db/stats/serveronly', ], ) + +env.Library( + target='document_sources_idl', + source=[ + env.Idlc('document_sources.idl')[0], + 'resume_token.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/idl/idl_parser', + 'document_value', + ], +) diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index d173d1183d5..c463c3574c9 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -31,14 +31,17 @@ #include "mongo/db/pipeline/document_source_change_notification.h" #include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/pipeline/resume_token.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/util/log.h" namespace mongo { @@ -115,7 +118,9 @@ private: }; } // namespace -BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) { +BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss, + Timestamp startFrom, + bool isResume) { auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. @@ -140,7 +145,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString auto opMatch = BSON("ns" << target); // Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops. - return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); + // Include the resume token if resuming, so we can verify it was still present in the oplog. + return BSON("ts" << (isResume ? GTE : GT) << startFrom << "$or" + << BSON_ARRAY(opMatch << commandMatch)); } list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( @@ -153,44 +160,39 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFrom "Only default collation is allowed when using a $changeNotification stage.", !expCtx->getCollator()); - uassert(40573, - str::stream() << "the $changeNotification stage must be specified as an object, got " - << typeName(elem.type()), - elem.type() == BSONType::Object); - - bool shouldLookupPostImage = false; - for (auto&& option : elem.embeddedObject()) { - auto optionName = option.fieldNameStringData(); - if (optionName == "fullDocument"_sd) { - uassert(40574, - str::stream() << "the 'fullDocument' option to the $changeNotification stage " - "must be a string, got " - << typeName(option.type()), - option.type() == BSONType::String); - auto fullDocOption = option.valueStringData(); - uassert(40575, - str::stream() << "unrecognized value for the 'fullDocument' option to the " - "$changeNotification stage. Expected \"none\" or " - "\"fullDocument\", got \"" - << option.String() - << "\"", - fullDocOption == "lookup"_sd || fullDocOption == "none"_sd); - shouldLookupPostImage = (fullDocOption == "lookup"_sd); - } else if (optionName == "resumeAfter"_sd) { - uasserted( - 40576, - "the 'resumeAfter' option to the $changeNotification stage is not yet supported"); - } else { - uasserted(40577, - str::stream() << "unrecognized option to $changeNotification stage: \"" - << optionName - << "\""); - } + auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); + uassert(40573, "The $changeNotification stage is only supported on replica sets", replCoord); + Timestamp startFrom = replCoord->getLastCommittedOpTime().getTimestamp(); + + intrusive_ptr<DocumentSourceCheckResumeToken> resumeStage = nullptr; + auto spec = DocumentSourceChangeNotificationSpec::parse( + IDLParserErrorContext("$changeNotification"), elem.embeddedObject()); + if (auto resumeAfter = spec.getResumeAfter()) { + ResumeToken token = resumeAfter.get(); + startFrom = token.getTimestamp(); + DocumentSourceCheckResumeTokenSpec spec; + spec.setResumeToken(std::move(token)); + resumeStage = DocumentSourceCheckResumeToken::create(expCtx, std::move(spec)); } - - auto oplogMatch = DocumentSourceOplogMatch::create(buildMatchFilter(expCtx->ns), expCtx); + const bool changeStreamIsResuming = resumeStage != nullptr; + + auto fullDocOption = spec.getFullDocument(); + uassert(40575, + str::stream() << "unrecognized value for the 'fullDocument' option to the " + "$changeNotification stage. Expected \"none\" or " + "\"lookup\", got \"" + << fullDocOption + << "\"", + fullDocOption == "lookup"_sd || fullDocOption == "none"_sd); + const bool shouldLookupPostImage = (fullDocOption == "lookup"_sd); + + auto oplogMatch = DocumentSourceOplogMatch::create( + buildMatchFilter(expCtx->ns, startFrom, changeStreamIsResuming), expCtx); auto transformation = createTransformationStage(elem.embeddedObject(), expCtx); list<intrusive_ptr<DocumentSource>> stages = {oplogMatch, transformation}; + if (resumeStage) { + stages.push_back(resumeStage); + } if (shouldLookupPostImage) { stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); } diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h index ad10a5ad210..036c958888f 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ b/src/mongo/db/pipeline/document_source_change_notification.h @@ -30,6 +30,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" +#include "mongo/db/pipeline/document_sources_gen.h" namespace mongo { @@ -119,7 +120,7 @@ public: * Produce the BSON object representing the filter for the $match stage to filter oplog entries * to only those relevant for this $changeNotification stage. */ - static BSONObj buildMatchFilter(const NamespaceString& nss); + static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume); /** * Parses a $changeNotification stage from 'elem' and produces the $match and transformation diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index 9c08bd91637..ffc9460c96d 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -65,9 +66,15 @@ static const Timestamp ts(100, 1); static const repl::OpTime optime(ts, 1); static const NamespaceString nss("unittests.change_notification"); +using ChangeNotificationStageTestNoSetup = AggregationContextFixture; + class ChangeNotificationStageTest : public AggregationContextFixture { public: - ChangeNotificationStageTest() : AggregationContextFixture(nss) {} + ChangeNotificationStageTest() : AggregationContextFixture(nss) { + repl::ReplicationCoordinator::set(getExpCtx()->opCtx->getServiceContext(), + stdx::make_unique<repl::ReplicationCoordinatorMock>( + getExpCtx()->opCtx->getServiceContext())); + } void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) { const auto spec = fromjson("{$changeNotification: {}}"); @@ -106,19 +113,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedOption) { BSON(DSChangeNotification::kStageName << BSON("unexpected" << 4)).firstElement(), expCtx), UserException, - 40577); -} - -TEST_F(ChangeNotificationStageTest, ShouldRejectResumeAfterOption) { - // TODO SERVER-29131 change this test to accept the option. - auto expCtx = getExpCtx(); - - ASSERT_THROWS_CODE( - DSChangeNotification::createFromBson( - BSON(DSChangeNotification::kStageName << BSON("resumeAfter" << ts)).firstElement(), - expCtx), - UserException, - 40576); + 40415); } TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) { @@ -129,7 +124,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) { BSON(DSChangeNotification::kStageName << BSON("fullDocument" << true)).firstElement(), expCtx), UserException, - 40574); + ErrorCodes::TypeMismatch); } TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) { @@ -144,6 +139,15 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) 40575); } +TEST_F(ChangeNotificationStageTestNoSetup, FailsWithNoReplicationCoordinator) { + const auto spec = fromjson("{$changeNotification: {}}"); + + ASSERT_THROWS_CODE( + DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()), + UserException, + 40573); +} + TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { const auto spec = fromjson("{$changeNotification: {}}"); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp new file mode 100644 index 00000000000..beaa8844dcb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_check_resume_token.h" + +using boost::intrusive_ptr; +namespace mongo { +const char* DocumentSourceCheckResumeToken::getSourceName() const { + return "$_checkResumeToken"; +} + +Value DocumentSourceCheckResumeToken::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + // This stage is created by the DocumentSourceChangeNotification stage, so serializing it here + // would result in it being created twice. + return Value(); +} + +intrusive_ptr<DocumentSourceCheckResumeToken> DocumentSourceCheckResumeToken::create( + const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec) { + return new DocumentSourceCheckResumeToken(expCtx, std::move(spec)); +} + +DocumentSourceCheckResumeToken::DocumentSourceCheckResumeToken( + const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec) + : DocumentSource(expCtx), _token(spec.getResumeToken()), _seenDoc(false) {} + +DocumentSource::GetNextResult DocumentSourceCheckResumeToken::getNext() { + pExpCtx->checkForInterrupt(); + + auto nextInput = pSource->getNext(); + uassert(40584, + "resume of change notification was not possible, as no change data was found. ", + _seenDoc || !nextInput.isEOF()); + + if (_seenDoc || !nextInput.isAdvanced()) + return nextInput; + + _seenDoc = true; + auto doc = nextInput.getDocument(); + + ResumeToken receivedToken(doc["_id"]); + uassert( + 40585, + str::stream() + << "resume of change notification was not possible, as the resume token was not found. " + << receivedToken.toDocument().toString(), + receivedToken == _token); + // Don't return the document which has the token; the user has already seen it. + return pSource->getNext(); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h new file mode 100644 index 00000000000..13706b937b9 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/resume_token.h" + +namespace mongo { + +/** + * This stage is used internally for change notifications to ensure that the resume token is in the + * stream. It is not intended to be created by the user. + */ +class DocumentSourceCheckResumeToken final : public DocumentSource, + public SplittableDocumentSource { +public: + GetNextResult getNext() final; + const char* getSourceName() const final; + + /** + * SplittableDocumentSource methods; this has to run on the merger, since the resume point could + * be at any shard. + */ + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + }; + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + }; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + static boost::intrusive_ptr<DocumentSourceCheckResumeToken> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + DocumentSourceCheckResumeTokenSpec spec); + + const ResumeToken& getTokenForTest() { + return _token; + } + +private: + /** + * Use the create static method to create a DocumentSourceCheckResumeToken. + */ + DocumentSourceCheckResumeToken(const boost::intrusive_ptr<ExpressionContext>& expCtx, + DocumentSourceCheckResumeTokenSpec spec); + + ResumeToken _token; + bool _seenDoc; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp new file mode 100644 index 00000000000..a9f057ee353 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/intrusive_ptr.hpp> +#include <memory> + +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/resume_token.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" + +using boost::intrusive_ptr; + +namespace mongo { +namespace { +static constexpr StringData kTestNs = "test.ns"_sd; + +class CheckResumeTokenTest : public AggregationContextFixture { +public: + CheckResumeTokenTest() : _mock(DocumentSourceMock::create()) {} + +protected: + /** + * Puts an arbitrary document with resume token corresponding to the given timestamp, id, and + * namespace in the mock queue. + */ + void addDocument(Timestamp ts, std::string id, StringData ns = kTestNs) { + _mock->queue.push_back( + Document({{"_id", Document({{"ts", ts}, {"ns", ns}, {"_id", id}})}})); + } + + void addPause() { + _mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); + } + + /** + * Convenience method to create the class under test with a given timestamp, id, and namespace. + */ + intrusive_ptr<DocumentSourceCheckResumeToken> createCheckResumeToken(Timestamp ts, + StringData id, + StringData ns = kTestNs) { + auto token = ResumeToken::parse(BSON("ts" << ts << "_id" << id << "ns" << ns)); + DocumentSourceCheckResumeTokenSpec spec; + spec.setResumeToken(token); + auto checkResumeToken = DocumentSourceCheckResumeToken::create(getExpCtx(), spec); + checkResumeToken->setSource(_mock.get()); + return checkResumeToken; + } + +private: + intrusive_ptr<DocumentSourceMock> _mock; +}; + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + addDocument(resumeTimestamp, "1"); + // We should not see the resume token. + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + addPause(); + addDocument(resumeTimestamp, "1"); + + // We see the pause we inserted, but not the resume token. + ASSERT_TRUE(checkResumeToken->getNext().isPaused()); + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { + Timestamp resumeTimestamp(100, 1); + Timestamp doc1Timestamp(100, 2); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + addDocument(resumeTimestamp, "1"); + addPause(); + addDocument(doc1Timestamp, "2"); + + // Pause added explicitly. + ASSERT_TRUE(checkResumeToken->getNext().isPaused()); + // The document after the resume token should be the first. + auto result1 = checkResumeToken->getNext(); + ASSERT_TRUE(result1.isAdvanced()); + auto& doc1 = result1.getDocument(); + ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]); + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + addDocument(resumeTimestamp, "0"); + + Timestamp doc1Timestamp(100, 2); + Timestamp doc2Timestamp(101, 1); + addDocument(doc1Timestamp, "1"); + addDocument(doc2Timestamp, "2"); + + auto result1 = checkResumeToken->getNext(); + ASSERT_TRUE(result1.isAdvanced()); + auto& doc1 = result1.getDocument(); + ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]); + auto result2 = checkResumeToken->getNext(); + ASSERT_TRUE(result2.isAdvanced()); + auto& doc2 = result2.getDocument(); + ASSERT_VALUE_EQ(Value(doc2Timestamp), doc2["_id"].getDocument()["ts"]); + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + + Timestamp doc1Timestamp(100, 2); + Timestamp doc2Timestamp(101, 1); + addDocument(doc1Timestamp, "1"); + addDocument(doc2Timestamp, "2"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585); +} + +TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongDocumentId) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + addDocument(resumeTimestamp, "1"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585); +} + +TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", "test1.ns"); + addDocument(resumeTimestamp, "1", "test2.ns"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585); +} + +/** + * We should _error_ on the no-document case, because that means the resume token was not found. + */ +TEST_F(CheckResumeTokenTest, ShouldFailWithNoDocuments) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40584); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl new file mode 100644 index 00000000000..de67f25bb02 --- /dev/null +++ b/src/mongo/db/pipeline/document_sources.idl @@ -0,0 +1,103 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +# Document source pipeline stage IDL file + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/db/pipeline/resume_token.h" + +imports: + - "mongo/idl/basic_types.idl" + +types: + # A resume token could be parsed as a struct, but since we may make it opaque in the future, we + # parse it as a type with a custom class now. + resumeToken: + bson_serialization_type: object + description: An object representing a resume token for change notification + cpp_type: ResumeToken + serializer: ResumeToken::toBSON + deserializer: ResumeToken::parse + + # The _id element in a resume token can be any BSON element, so we need a custom type which + # leaves it as a BSONElement + resumeTokenOpaqueId: + bson_serialization_type: any + description: The document id contained within a resume token + cpp_type: Value + serializer: Value::serializeForIDL + deserializer: Value::deserializeForIDL + +structs: + DocumentSourceChangeNotificationSpec: + description: A document used to specify the $changeNotification stage of an aggregation + pipeline. + fields: + resumeAfter: + cpp_name: resumeAfter + type: resumeToken + optional: true + description: An object representing the point at which we should resume reporting + changes from. + fullDocument: + cpp_name: fullDocument + type: string + default: '"none"' + description: A string '"lookup"' or '"none"', indicating whether or not we should + return a full document or just changes for an update. + + + DocumentSourceCheckResumeTokenSpec: + description: A document used to specify the internal stage which checks the presence of the + resume token. + fields: + resumeToken: + cpp_name: resumeToken + type: resumeToken + description: The resume token which is required to be present in the pipeline. + + + ResumeTokenInternal: + description: The internal format of a resume token. For use by the ResumeToken class + only. + fields: + ts: + cpp_name: timestamp + type: timestamp + description: The timestamp of the oplog entry represented by this resume token. + + ns: + cpp_name: ns + type: string + description: The namespace of the oplog entry represented by this resume token. + + _id: + cpp_name: documentId + type: resumeTokenOpaqueId + description: The document key of the document in the oplog entry represented by this + resume token. diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 9909895feb3..91d04bdf2a2 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -48,6 +48,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/query/query_test_service_context.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/dbtests/dbtests.h" namespace PipelineTests { @@ -58,6 +59,14 @@ using std::vector; const NamespaceString kTestNss = NamespaceString("a.collection"); +namespace { +void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) { + repl::ReplicationCoordinator::set( + opCtx->getServiceContext(), + stdx::make_unique<repl::ReplicationCoordinatorMock>(opCtx->getServiceContext())); +} +} // namespace + namespace Optimizations { using namespace mongo; @@ -974,6 +983,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupSwapsWithIndependentMatch intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeNotification" << BSON("fullDocument" << "lookup")); @@ -998,6 +1008,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupDoesNotSwapWithMatchOnPos intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeNotification" << BSON("fullDocument" << "lookup")); @@ -1472,6 +1483,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) { const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")}; auto ctx = getExpCtx(); + setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); } @@ -1480,6 +1492,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), fromjson("{$changeNotification: {}}")}; auto ctx = getExpCtx(); + setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus(); ASSERT_EQ(parseStatus, ErrorCodes::BadValue); @@ -1490,6 +1503,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageI const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), fromjson("{$changeNotification: {}}")}; auto ctx = getExpCtx(); + setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus(); ASSERT_EQ(parseStatus, ErrorCodes::BadValue); diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp new file mode 100644 index 00000000000..5b409f1f113 --- /dev/null +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/resume_token.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/value_comparator.h" + +namespace mongo { + +ResumeToken::ResumeToken(const BSONObj& resumeBson) { + auto token = ResumeTokenInternal::parse( + IDLParserErrorContext("$changeNotification.resumeAfter"), resumeBson); + _timestamp = token.getTimestamp(); + _namespace = token.getNs().toString(); + _documentId = token.getDocumentId(); +} + +ResumeToken::ResumeToken(const Value& resumeValue) { + Document resumeTokenDoc = resumeValue.getDocument(); + Value timestamp = resumeTokenDoc[ResumeTokenInternal::kTimestampFieldName]; + _timestamp = timestamp.getTimestamp(); + Value ns = resumeTokenDoc[ResumeTokenInternal::kNsFieldName]; + _namespace = ns.getString(); + _documentId = resumeTokenDoc[ResumeTokenInternal::kDocumentIdFieldName]; +} + +bool ResumeToken::operator==(const ResumeToken& other) { + return _timestamp == other._timestamp && _namespace == other._namespace && + ValueComparator::kInstance.evaluate(_documentId == other._documentId); +} + +Document ResumeToken::toDocument() const { + return Document({{ResumeTokenInternal::kTimestampFieldName, _timestamp}, + {{ResumeTokenInternal::kNsFieldName}, _namespace}, + {{ResumeTokenInternal::kDocumentIdFieldName}, _documentId}}); +} + +BSONObj ResumeToken::toBSON() const { + return BSON( + ResumeTokenInternal::kTimestampFieldName << _timestamp << ResumeTokenInternal::kNsFieldName + << _namespace + << ResumeTokenInternal::kDocumentIdFieldName + << _documentId); +} + +ResumeToken ResumeToken::parse(const BSONObj& resumeBson) { + return ResumeToken(resumeBson); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h new file mode 100644 index 00000000000..733e285e599 --- /dev/null +++ b/src/mongo/db/pipeline/resume_token.h @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/pipeline/value.h" + +namespace mongo { +/** + * A token passed in by the user to indicate where in the oplog we should start for + * $changeNotification. + */ +class ResumeToken { +public: + /** + * The default no-argument constructor is required by the IDL for types used as non-optional + * fields. + */ + ResumeToken() = default; + explicit ResumeToken(const Value& resumeValue); + bool operator==(const ResumeToken&); + + Timestamp getTimestamp() const { + return _timestamp; + } + + Document toDocument() const; + + BSONObj toBSON() const; + + /** + * Parse a resume token from a BSON object; used as an interface to the IDL parser. + */ + static ResumeToken parse(const BSONObj& obj); + +private: + /** + * Construct from a BSON object. + * External callers should use the static ResumeToken::parse(const BSONObj&) method instead. + */ + explicit ResumeToken(const BSONObj& resumeBson); + + Timestamp _timestamp; + std::string _namespace; + Value _documentId; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp index d70c6b65cc9..a7bce3fc777 100644 --- a/src/mongo/db/pipeline/value.cpp +++ b/src/mongo/db/pipeline/value.cpp @@ -1323,4 +1323,17 @@ Value Value::deserializeForSorter(BufReader& buf, const SorterDeserializeSetting } verify(false); } + +void Value::serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const { + addToBsonObj(builder, fieldName); +} + +void Value::serializeForIDL(BSONArrayBuilder* builder) const { + addToBsonArray(builder); } + +Value Value::deserializeForIDL(const BSONElement& element) { + return Value(element); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h index 04e396dc58d..40c05d89b9a 100644 --- a/src/mongo/db/pipeline/value.h +++ b/src/mongo/db/pipeline/value.h @@ -328,6 +328,11 @@ public: return *this; } + /// Members to support parsing/deserialization from IDL generated code. + void serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const; + void serializeForIDL(BSONArrayBuilder* builder) const; + static Value deserializeForIDL(const BSONElement& element); + private: /** This is a "honeypot" to prevent unexpected implicit conversions to the accepted argument * types. bool is especially bad since without this it will accept any pointer. |