summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2017-07-18 19:07:31 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2017-08-03 18:26:21 -0400
commita73070d49470cd0ff1f7d2f5189b59d231e5c9ff (patch)
tree44cd4b32e3c017f28c657782fc6b422380836619
parent225c4d1a70b7756ee77fd404aab131bf1f1c9fce (diff)
downloadmongo-a73070d49470cd0ff1f7d2f5189b59d231e5c9ff.tar.gz
SERVER-29140 Close cursor for invalidate change notification entries.
-rw-r--r--jstests/aggregation/sources/changeNotification/change_notification_invalidation.js78
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp5
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
-rw-r--r--src/mongo/db/pipeline/close_change_stream_exception.h46
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp85
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp85
7 files changed, 286 insertions, 28 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification_invalidation.js b/jstests/aggregation/sources/changeNotification/change_notification_invalidation.js
new file mode 100644
index 00000000000..74b313d1e85
--- /dev/null
+++ b/jstests/aggregation/sources/changeNotification/change_notification_invalidation.js
@@ -0,0 +1,78 @@
+// Tests of $changeNotification invalidate entries.
+
+(function() {
+ "use strict";
+
+ // Strip the oplog fields we aren't testing.
+ const oplogProjection = {$project: {"_id.ts": 0}};
+
+ // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges
+ // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result
+ // of startWatchingChanges and the expected set of results.
+ function startWatchingChanges(pipeline, collection) {
+ // Strip the oplog fields we aren't testing.
+ pipeline.push(oplogProjection);
+        // Waiting for replication ensures no previous operations will be included.
+ replTest.awaitReplication();
+ let res = assert.commandWorked(
+ db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}}));
+ assert.neq(res.cursor.id, 0);
+ return res.cursor;
+ }
+
+ const replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
+ const nodes = replTest.startSet();
+ replTest.initiate();
+ replTest.awaitReplication();
+
+ db = replTest.getPrimary().getDB('test');
+ db.getMongo().forceReadMode('commands');
+
+ // Write a document to the collection and test that the change stream returns it
+ // and getMore command closes the cursor afterwards.
+ const collGetMore = db.change_stream_getmore_invalidations;
+ assert.writeOK(collGetMore.insert({_id: 0, a: 1}));
+ replTest.awaitReplication();
+    // We awaited the replication of the first write, so the change stream shouldn't return it.
+ let aggcursor = startWatchingChanges([{$changeNotification: {}}], collGetMore);
+ assert.neq(aggcursor.id, 0);
+ assert.eq(aggcursor.firstBatch.length, 0);
+
+ // Drop the collection and test that we return "invalidate" entry and close the cursor.
+ jsTestLog("Testing getMore command closes cursor for invalidate entries");
+ collGetMore.drop();
+ let res = assert.commandWorked(
+ db.runCommand({getMore: aggcursor.id, collection: collGetMore.getName()}));
+ aggcursor = res.cursor;
+ assert.eq(aggcursor.id, 0, "expected invalidation to cause the cursor to be closed");
+ assert.eq(aggcursor.nextBatch.length, 1);
+ assert.docEq(aggcursor.nextBatch[0],
+ {_id: {ns: "test.$cmd"}, fullDocument: null, operationType: "invalidate"});
+
+ jsTestLog("Testing aggregate command closes cursor for invalidate entries");
+ const collAgg = db.change_stream_agg_invalidations;
+ // Get a valid resume token that the next aggregate command can use.
+ assert.writeOK(collAgg.insert({_id: 1}));
+ replTest.awaitReplication();
+ res = assert.commandWorked(db.runCommand(
+ {aggregate: collAgg.getName(), "pipeline": [{$changeNotification: {}}], cursor: {}}));
+ aggcursor = res.cursor;
+ assert.neq(aggcursor.id, 0);
+ assert.eq(aggcursor.firstBatch.length, 1);
+ const resumeToken = aggcursor.firstBatch[0]._id;
+
+ assert(collAgg.drop());
+ replTest.awaitReplication();
+ res = assert.commandWorked(db.runCommand({
+ aggregate: collAgg.getName(),
+ pipeline: [{$changeNotification: {resumeAfter: resumeToken}}, oplogProjection],
+ cursor: {}
+ }));
+ aggcursor = res.cursor;
+ assert.eq(aggcursor.id, 0, "expected invalidation to cause the cursor to be closed");
+ assert.eq(aggcursor.firstBatch.length, 1);
+ assert.docEq(aggcursor.firstBatch,
+ [{_id: {ns: "test.$cmd"}, fullDocument: null, operationType: "invalidate"}]);
+
+ replTest.stopSet();
+}());
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index fa980506671..0306def5da5 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -218,6 +218,7 @@ error_code("IncompleteTransactionHistory", 217);
error_code("UpdateOperationFailed", 218)
error_code("FTDCPathNotSet", 219)
error_code("FTDCPathAlreadySet", 220)
+error_code("CloseChangeStream", 221)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 277d8f2e660..77afd559cb6 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/pipeline/close_change_stream_exception.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
@@ -446,6 +447,10 @@ public:
nextBatch->append(obj);
(*numResults)++;
}
+ } catch (const CloseChangeStreamException& ex) {
+ // FAILURE state will make getMore command close the cursor even if it's tailable.
+ *state = PlanExecutor::FAILURE;
+ return Status::OK();
} catch (const UserException& except) {
if (isAwaitData && except.getCode() == ErrorCodes::ExceededTimeLimit) {
// We ignore exceptions from interrupt points due to max time expiry for
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 6d20c1b9684..cc353243763 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/close_change_stream_exception.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
@@ -99,7 +100,18 @@ bool handleCursorCommand(OperationContext* opCtx,
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
- if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
+
+ try {
+ state = cursor->getExecutor()->getNext(&next, nullptr);
+ } catch (const CloseChangeStreamException& ex) {
+ // This exception is thrown when a $changeNotification stage encounters an event
+ // that invalidates the cursor. We should close the cursor and return without
+ // error.
+ cursor = nullptr;
+ break;
+ }
+
+ if (state == PlanExecutor::IS_EOF) {
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;
diff --git a/src/mongo/db/pipeline/close_change_stream_exception.h b/src/mongo/db/pipeline/close_change_stream_exception.h
new file mode 100644
index 00000000000..862d9bbf67a
--- /dev/null
+++ b/src/mongo/db/pipeline/close_change_stream_exception.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/error_codes.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+/**
+ * An exception used to signal to the aggregate and getMore commands that a change stream has been
+ * invalidated, and the cursor should not be left open.
+ */
+class CloseChangeStreamException : public DBException {
+public:
+ CloseChangeStreamException()
+ : DBException("CloseChangeStream", ErrorCodes::CloseChangeStream) {}
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index c463c3574c9..4b6df241a10 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source_change_notification.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/db/pipeline/close_change_stream_exception.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
@@ -116,6 +117,77 @@ private:
DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(std::move(filter), expCtx) {}
};
+
+void checkValueType(const Value v, const StringData filedName, BSONType expectedType) {
+ uassert(40532,
+ str::stream() << "Entry field \"" << filedName << "\" should be "
+ << typeName(expectedType)
+ << ", found: "
+ << typeName(v.getType()),
+ (v.getType() == expectedType));
+}
+
+/**
+ * This stage is used internally for change notifications to close the cursor after returning
+ * "invalidate" entries.
+ * It is not intended to be created by the user.
+ */
+class DocumentSourceCloseCursor final : public DocumentSource {
+public:
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final {
+ // This is used in error reporting.
+ return "$changeNotification";
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ // This stage is created by the DocumentSourceChangeNotification stage, so serializing it
+ // here would result in it being created twice.
+ return Value();
+ }
+
+ static boost::intrusive_ptr<DocumentSourceCloseCursor> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceCloseCursor(expCtx);
+ }
+
+private:
+    /**
+     * Use the create static method to create a DocumentSourceCloseCursor.
+     */
+ DocumentSourceCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx) {}
+
+ bool _shouldCloseCursor = false;
+};
+
+DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ // Close cursor if we have returned an invalidate entry.
+ if (_shouldCloseCursor) {
+ throw CloseChangeStreamException();
+ }
+
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced())
+ return nextInput;
+
+ auto doc = nextInput.getDocument();
+ const auto& kOperationTypeField = DocumentSourceChangeNotification::kOperationTypeField;
+ checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String);
+ if (doc[kOperationTypeField].getString() ==
+ DocumentSourceChangeNotification::kInvalidateOpType) {
+ // Pass the invalidation forward, so that it can be included in the results, or
+ // filtered/transformed by further stages in the pipeline, then throw an exception
+ // to close the cursor on the next call to getNext().
+ _shouldCloseCursor = true;
+ }
+
+ return nextInput;
+}
+
} // namespace
BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss,
@@ -193,6 +265,8 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFrom
if (resumeStage) {
stages.push_back(resumeStage);
}
+ auto closeCursorSource = DocumentSourceCloseCursor::create(expCtx);
+ stages.push_back(closeCursorSource);
if (shouldLookupPostImage) {
stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
}
@@ -205,17 +279,6 @@ intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransforma
expCtx, stdx::make_unique<Transformation>(changeNotificationSpec), kStageName.toString()));
}
-namespace {
-void checkValueType(Value v, StringData filedName, BSONType expectedType) {
- uassert(40532,
- str::stream() << "Oplog entry field \"" << filedName << "\" should be "
- << typeName(expectedType)
- << ", found: "
- << typeName(v.getType()),
- (v.getType() == expectedType));
-}
-} // namespace
-
Document DocumentSourceChangeNotification::Transformation::applyTransformation(
const Document& input) {
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index ffc9460c96d..c0b73d5e691 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/close_change_stream_exception.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source_change_notification.h"
#include "mongo/db/pipeline/document_source_limit.h"
@@ -77,27 +78,46 @@ public:
}
void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) {
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry);
+ auto transform = stages[2].get();
+
+ auto next = transform->getNext();
+ // Match stage should pass the doc down if expectedDoc is given.
+ ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc));
+ if (expectedDoc) {
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedDoc);
+ }
+ }
+
+ /**
+ * Returns a list of stages expanded from a $changeNotification specification, starting with a
+ * DocumentSourceMock which contains a single document representing 'entry'.
+ */
+ vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
const auto spec = fromjson("{$changeNotification: {}}");
list<intrusive_ptr<DocumentSource>> result =
DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
+ vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
- auto match = dynamic_cast<DocumentSourceMatch*>(result.front().get());
+ auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get());
ASSERT(match);
auto mock = DocumentSourceMock::create(D(entry.toBSON()));
match->setSource(mock.get());
// Check the oplog entry is transformed correctly.
- auto transform = result.back().get();
+ auto transform = stages[1].get();
ASSERT(transform);
ASSERT_EQ(string(transform->getSourceName()), DSChangeNotification::kStageName);
transform->setSource(match);
- auto next = transform->getNext();
- // Match stage should pass the doc down if expectedDoc is given.
- ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc));
- if (expectedDoc) {
- ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedDoc);
- }
+ auto closeCursor = stages.back().get();
+ ASSERT(closeCursor);
+ closeCursor->setSource(transform);
+
+ // Include the mock stage in the "stages" so it won't get destroyed outside the function
+ // scope.
+ stages.insert(stages.begin(), mock);
+ return stages;
}
OplogEntry createCommand(const BSONObj& oField) {
@@ -153,11 +173,12 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
list<intrusive_ptr<DocumentSource>> result =
DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
-
- ASSERT_EQUALS(result.size(), 2UL);
- ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result.front().get()));
- ASSERT_EQUALS(string(result.front()->getSourceName()), DSChangeNotification::kStageName);
- ASSERT_EQUALS(string(result.back()->getSourceName()), DSChangeNotification::kStageName);
+ vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
+ ASSERT_EQUALS(stages.size(), 3UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(stages.front().get()));
+ ASSERT_EQUALS(string(stages[0]->getSourceName()), DSChangeNotification::kStageName);
+ ASSERT_EQUALS(string(stages[1]->getSourceName()), DSChangeNotification::kStageName);
+ ASSERT_EQUALS(string(stages[2]->getSourceName()), DSChangeNotification::kStageName);
// TODO: Check explain result.
}
@@ -295,9 +316,10 @@ TEST_F(ChangeNotificationStageTest, TransformationShouldBeAbleToReParseSerialize
auto expCtx = getExpCtx();
auto originalSpec = BSON(DSChangeNotification::kStageName << BSONObj());
- auto allStages = DSChangeNotification::createFromBson(originalSpec.firstElement(), expCtx);
- ASSERT_EQ(allStages.size(), 2UL);
- auto stage = allStages.back();
+ auto result = DSChangeNotification::createFromBson(originalSpec.firstElement(), expCtx);
+ vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
+ ASSERT_EQ(allStages.size(), 3UL);
+ auto stage = allStages[1];
ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(stage.get()));
//
@@ -324,5 +346,36 @@ TEST_F(ChangeNotificationStageTest, TransformationShouldBeAbleToReParseSerialize
ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
}
+TEST_F(ChangeNotificationStageTest, CloseCursorOnInvalidateEntries) {
+ OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()));
+ auto stages = makeStages(dropColl);
+ auto closeCursor = stages.back();
+
+ Document expectedInvalidate{
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.getCommandNS().ns()}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInvalidateOpType},
+ {DSChangeNotification::kFullDocumentField, BSONNULL},
+ };
+
+ auto next = closeCursor->getNext();
+ // Transform into invalidate entry.
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate);
+ // Then throw an exception on the next call of getNext().
+ ASSERT_THROWS_CODE(
+ closeCursor->getNext(), CloseChangeStreamException, ErrorCodes::CloseChangeStream);
+}
+
+TEST_F(ChangeNotificationStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) {
+ OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()));
+ auto stages = makeStages(dropColl);
+ auto closeCursor = stages.back();
+ // Add a match stage after change stream to filter out the invalidate entries.
+ auto match = DocumentSourceMatch::create(fromjson("{operationType: 'insert'}"), getExpCtx());
+ match->setSource(closeCursor.get());
+
+ // Throw an exception on the call of getNext().
+ ASSERT_THROWS_CODE(match->getNext(), CloseChangeStreamException, ErrorCodes::CloseChangeStream);
+}
+
} // namespace
} // namespace mongo