diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-07-18 19:07:31 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-08-03 18:26:21 -0400 |
commit | a73070d49470cd0ff1f7d2f5189b59d231e5c9ff (patch) | |
tree | 44cd4b32e3c017f28c657782fc6b422380836619 | |
parent | 225c4d1a70b7756ee77fd404aab131bf1f1c9fce (diff) | |
download | mongo-a73070d49470cd0ff1f7d2f5189b59d231e5c9ff.tar.gz |
SERVER-29140 Close cursor for invalidate change notification entries.
7 files changed, 286 insertions, 28 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification_invalidation.js b/jstests/aggregation/sources/changeNotification/change_notification_invalidation.js new file mode 100644 index 00000000000..74b313d1e85 --- /dev/null +++ b/jstests/aggregation/sources/changeNotification/change_notification_invalidation.js @@ -0,0 +1,78 @@ +// Tests of $changeNotification invalidate entries. + +(function() { + "use strict"; + + // Strip the oplog fields we aren't testing. + const oplogProjection = {$project: {"_id.ts": 0}}; + + // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges + // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result + // of startWatchingChanges and the expected set of results. + function startWatchingChanges(pipeline, collection) { + // Strip the oplog fields we aren't testing. + pipeline.push(oplogProjection); + // Waiting for replication assures no previous operations will be included. + replTest.awaitReplication(); + let res = assert.commandWorked( + db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}})); + assert.neq(res.cursor.id, 0); + return res.cursor; + } + + const replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1}); + const nodes = replTest.startSet(); + replTest.initiate(); + replTest.awaitReplication(); + + db = replTest.getPrimary().getDB('test'); + db.getMongo().forceReadMode('commands'); + + // Write a document to the collection and test that the change stream returns it + // and getMore command closes the cursor afterwards. + const collGetMore = db.change_stream_getmore_invalidations; + assert.writeOK(collGetMore.insert({_id: 0, a: 1})); + replTest.awaitReplication(); + // We awaited the replicaiton of the first write, so the change stream shouldn't return it. + let aggcursor = startWatchingChanges([{$changeNotification: {}}], collGetMore); + assert.neq(aggcursor.id, 0); + assert.eq(aggcursor.firstBatch.length, 0); + + // Drop the collection and test that we return "invalidate" entry and close the cursor. + jsTestLog("Testing getMore command closes cursor for invalidate entries"); + collGetMore.drop(); + let res = assert.commandWorked( + db.runCommand({getMore: aggcursor.id, collection: collGetMore.getName()})); + aggcursor = res.cursor; + assert.eq(aggcursor.id, 0, "expected invalidation to cause the cursor to be closed"); + assert.eq(aggcursor.nextBatch.length, 1); + assert.docEq(aggcursor.nextBatch[0], + {_id: {ns: "test.$cmd"}, fullDocument: null, operationType: "invalidate"}); + + jsTestLog("Testing aggregate command closes cursor for invalidate entries"); + const collAgg = db.change_stream_agg_invalidations; + // Get a valid resume token that the next aggregate command can use. + assert.writeOK(collAgg.insert({_id: 1})); + replTest.awaitReplication(); + res = assert.commandWorked(db.runCommand( + {aggregate: collAgg.getName(), "pipeline": [{$changeNotification: {}}], cursor: {}})); + aggcursor = res.cursor; + assert.neq(aggcursor.id, 0); + assert.eq(aggcursor.firstBatch.length, 1); + const resumeToken = aggcursor.firstBatch[0]._id; + + assert(collAgg.drop()); + replTest.awaitReplication(); + res = assert.commandWorked(db.runCommand({ + aggregate: collAgg.getName(), + pipeline: [{$changeNotification: {resumeAfter: resumeToken}}, oplogProjection], + cursor: {} + })); + aggcursor = res.cursor; + assert.eq(aggcursor.id, 0, "expected invalidation to cause the cursor to be closed"); + assert.eq(aggcursor.firstBatch.length, 1); + assert.docEq(aggcursor.firstBatch, + [{_id: {ns: "test.$cmd"}, fullDocument: null, operationType: "invalidate"}]); + + replTest.stopSet(); +}()); diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index fa980506671..0306def5da5 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -218,6 +218,7 @@ error_code("IncompleteTransactionHistory", 217); error_code("UpdateOperationFailed", 218) error_code("FTDCPathNotSet", 219) error_code("FTDCPathAlreadySet", 220) +error_code("CloseChangeStream", 221) # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 277d8f2e660..77afd559cb6 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -43,6 +43,7 @@ #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find.h" #include "mongo/db/query/find_common.h" @@ -446,6 +447,10 @@ public: nextBatch->append(obj); (*numResults)++; } + } catch (const CloseChangeStreamException& ex) { + // FAILURE state will make getMore command close the cursor even if it's tailable. + *state = PlanExecutor::FAILURE; + return Status::OK(); } catch (const UserException& except) { if (isAwaitData && except.getCode() == ErrorCodes::ExceededTimeLimit) { // We ignore exceptions from interrupt points due to max time expiry for diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 6d20c1b9684..cc353243763 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -43,6 +43,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" @@ -99,7 +100,18 @@ bool handleCursorCommand(OperationContext* opCtx, // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; - if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) { + + try { + state = cursor->getExecutor()->getNext(&next, nullptr); + } catch (const CloseChangeStreamException& ex) { + // This exception is thrown when a $changeNotification stage encounters an event + // that invalidates the cursor. We should close the cursor and return without + // error. + cursor = nullptr; + break; + } + + if (state == PlanExecutor::IS_EOF) { if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; diff --git a/src/mongo/db/pipeline/close_change_stream_exception.h b/src/mongo/db/pipeline/close_change_stream_exception.h new file mode 100644 index 00000000000..862d9bbf67a --- /dev/null +++ b/src/mongo/db/pipeline/close_change_stream_exception.h @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/base/error_codes.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +/** + * An exception used to signal to the aggregate and getMore commands that a change stream has been + * invalidated, and the cursor should not be left open. + */ +class CloseChangeStreamException : public DBException { +public: + CloseChangeStreamException() + : DBException("CloseChangeStream", ErrorCodes::CloseChangeStream) {} +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index c463c3574c9..4b6df241a10 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/document_source_change_notification.h" #include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" @@ -116,6 +117,77 @@ private: DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) : DocumentSourceMatch(std::move(filter), expCtx) {} }; + +void checkValueType(const Value v, const StringData filedName, BSONType expectedType) { + uassert(40532, + str::stream() << "Entry field \"" << filedName << "\" should be " + << typeName(expectedType) + << ", found: " + << typeName(v.getType()), + (v.getType() == expectedType)); +} + +/** + * This stage is used internally for change notifications to close cursor after returning + * "invalidate" entries. + * It is not intended to be created by the user. + */ +class DocumentSourceCloseCursor final : public DocumentSource { +public: + GetNextResult getNext() final; + + const char* getSourceName() const final { + // This is used in error reporting. + return "$changeNotification"; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + // This stage is created by the DocumentSourceChangeNotification stage, so serializing it + // here would result in it being created twice. + return Value(); + } + + static boost::intrusive_ptr<DocumentSourceCloseCursor> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceCloseCursor(expCtx); + } + +private: + /** + * Use the create static method to create a DocumentSourceCheckResumeToken. + */ + DocumentSourceCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx) {} + + bool _shouldCloseCursor = false; +}; + +DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { + pExpCtx->checkForInterrupt(); + + // Close cursor if we have returned an invalidate entry. + if (_shouldCloseCursor) { + throw CloseChangeStreamException(); + } + + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) + return nextInput; + + auto doc = nextInput.getDocument(); + const auto& kOperationTypeField = DocumentSourceChangeNotification::kOperationTypeField; + checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String); + if (doc[kOperationTypeField].getString() == + DocumentSourceChangeNotification::kInvalidateOpType) { + // Pass the invalidation forward, so that it can be included in the results, or + // filtered/transformed by further stages in the pipeline, then throw an exception + // to close the cursor on the next call to getNext(). + _shouldCloseCursor = true; + } + + return nextInput; +} + } // namespace BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss, @@ -193,6 +265,8 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFrom if (resumeStage) { stages.push_back(resumeStage); } + auto closeCursorSource = DocumentSourceCloseCursor::create(expCtx); + stages.push_back(closeCursorSource); if (shouldLookupPostImage) { stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); } @@ -205,17 +279,6 @@ intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransforma expCtx, stdx::make_unique<Transformation>(changeNotificationSpec), kStageName.toString())); } -namespace { -void checkValueType(Value v, StringData filedName, BSONType expectedType) { - uassert(40532, - str::stream() << "Oplog entry field \"" << filedName << "\" should be " - << typeName(expectedType) - << ", found: " - << typeName(v.getType()), - (v.getType() == expectedType)); -} -} // namespace - Document DocumentSourceChangeNotification::Transformation::applyTransformation( const Document& input) { diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index ffc9460c96d..c0b73d5e691 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source_change_notification.h" #include "mongo/db/pipeline/document_source_limit.h" @@ -77,27 +78,46 @@ public: } void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) { + vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry); + auto transform = stages[2].get(); + + auto next = transform->getNext(); + // Match stage should pass the doc down if expectedDoc is given. + ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc)); + if (expectedDoc) { + ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedDoc); + } + } + + /** + * Returns a list of stages expanded from a $changeNotification specification, starting with a + * DocumentSourceMock which contains a single document representing 'entry'. + */ + vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { const auto spec = fromjson("{$changeNotification: {}}"); list<intrusive_ptr<DocumentSource>> result = DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); + vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); - auto match = dynamic_cast<DocumentSourceMatch*>(result.front().get()); + auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get()); ASSERT(match); auto mock = DocumentSourceMock::create(D(entry.toBSON())); match->setSource(mock.get()); // Check the oplog entry is transformed correctly. - auto transform = result.back().get(); + auto transform = stages[1].get(); ASSERT(transform); ASSERT_EQ(string(transform->getSourceName()), DSChangeNotification::kStageName); transform->setSource(match); - auto next = transform->getNext(); - // Match stage should pass the doc down if expectedDoc is given. - ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc)); - if (expectedDoc) { - ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedDoc); - } + auto closeCursor = stages.back().get(); + ASSERT(closeCursor); + closeCursor->setSource(transform); + + // Include the mock stage in the "stages" so it won't get destroyed outside the function + // scope. + stages.insert(stages.begin(), mock); + return stages; } OplogEntry createCommand(const BSONObj& oField) { @@ -153,11 +173,12 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { list<intrusive_ptr<DocumentSource>> result = DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); - - ASSERT_EQUALS(result.size(), 2UL); - ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result.front().get())); - ASSERT_EQUALS(string(result.front()->getSourceName()), DSChangeNotification::kStageName); - ASSERT_EQUALS(string(result.back()->getSourceName()), DSChangeNotification::kStageName); + vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); + ASSERT_EQUALS(stages.size(), 3UL); + ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(stages.front().get())); + ASSERT_EQUALS(string(stages[0]->getSourceName()), DSChangeNotification::kStageName); + ASSERT_EQUALS(string(stages[1]->getSourceName()), DSChangeNotification::kStageName); + ASSERT_EQUALS(string(stages[2]->getSourceName()), DSChangeNotification::kStageName); // TODO: Check explain result. } @@ -295,9 +316,10 @@ TEST_F(ChangeNotificationStageTest, TransformationShouldBeAbleToReParseSerialize auto expCtx = getExpCtx(); auto originalSpec = BSON(DSChangeNotification::kStageName << BSONObj()); - auto allStages = DSChangeNotification::createFromBson(originalSpec.firstElement(), expCtx); - ASSERT_EQ(allStages.size(), 2UL); - auto stage = allStages.back(); + auto result = DSChangeNotification::createFromBson(originalSpec.firstElement(), expCtx); + vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); + ASSERT_EQ(allStages.size(), 3UL); + auto stage = allStages[1]; ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(stage.get())); // @@ -324,5 +346,36 @@ TEST_F(ChangeNotificationStageTest, TransformationShouldBeAbleToReParseSerialize ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); } +TEST_F(ChangeNotificationStageTest, CloseCursorOnInvalidateEntries) { + OplogEntry dropColl = createCommand(BSON("drop" << nss.coll())); + auto stages = makeStages(dropColl); + auto closeCursor = stages.back(); + + Document expectedInvalidate{ + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.getCommandNS().ns()}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInvalidateOpType}, + {DSChangeNotification::kFullDocumentField, BSONNULL}, + }; + + auto next = closeCursor->getNext(); + // Transform into invalidate entry. + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate); + // Then throw an exception on the next call of getNext(). + ASSERT_THROWS_CODE( + closeCursor->getNext(), CloseChangeStreamException, ErrorCodes::CloseChangeStream); +} + +TEST_F(ChangeNotificationStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) { + OplogEntry dropColl = createCommand(BSON("drop" << nss.coll())); + auto stages = makeStages(dropColl); + auto closeCursor = stages.back(); + // Add a match stage after change stream to filter out the invalidate entries. + auto match = DocumentSourceMatch::create(fromjson("{operationType: 'insert'}"), getExpCtx()); + match->setSource(closeCursor.get()); + + // Throw an exception on the call of getNext(). + ASSERT_THROWS_CODE(match->getNext(), CloseChangeStreamException, ErrorCodes::CloseChangeStream); +} + } // namespace } // namespace mongo |