diff options
-rw-r--r-- | src/mongo/db/cursor_manager.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/cursor_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 26 |
4 files changed, 47 insertions, 30 deletions
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index d58e25bbbc7..5f6d5d754b8 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -757,15 +757,6 @@ void CursorManager::unpin(OperationContext* opCtx, cursor.release(); } -void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const { - auto allPartitions = _cursorMap->lockAllPartitions(); - for (auto&& partition : allPartitions) { - for (auto&& entry : partition) { - openCursors->insert(entry.first); - } - } -} - void CursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) const { auto allPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allPartitions) { diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h index 6817455ec0d..350e23b62d3 100644 --- a/src/mongo/db/cursor_manager.h +++ b/src/mongo/db/cursor_manager.h @@ -212,8 +212,6 @@ public: */ Status checkAuthForKillCursors(OperationContext* opCtx, CursorId id); - void getCursorIds(std::set<CursorId>* openCursors) const; - /** * Appends sessions that have open cursors in this cursor manager to the given set of lsids. */ diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 846186da260..5616e829b9e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" @@ -704,6 +705,45 @@ TEST_F(ChangeStreamStageTest, TransformEmptyApplyOps) { ASSERT_EQ(results.size(), 0u); } +DEATH_TEST_F(ChangeStreamStageTest, ShouldCrashWithNoopInsideApplyOps, "Unexpected noop") { + Document applyOpsDoc = + Document{{"applyOps", + Value{std::vector<Document>{ + Document{{"op", "n"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}}; + LogicalSessionFromClient lsid = testLsid(); + getApplyOpsResults(applyOpsDoc, lsid); // Should crash. +} + +DEATH_TEST_F(ChangeStreamStageTest, + ShouldCrashWithEntryWithoutOpFieldInsideApplyOps, + "Unexpected format for entry") { + Document applyOpsDoc = + Document{{"applyOps", + Value{std::vector<Document>{ + Document{{"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}}; + LogicalSessionFromClient lsid = testLsid(); + getApplyOpsResults(applyOpsDoc, lsid); // Should crash. +} + +DEATH_TEST_F(ChangeStreamStageTest, + ShouldCrashWithEntryWithNonStringOpFieldInsideApplyOps, + "Unexpected format for entry") { + Document applyOpsDoc = + Document{{"applyOps", + Value{std::vector<Document>{ + Document{{"op", 2}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}}; + LogicalSessionFromClient lsid = testLsid(); + getApplyOpsResults(applyOpsDoc, lsid); // Should crash. +} + TEST_F(ChangeStreamStageTest, TransformNonTransactionApplyOps) { BSONObj applyOpsObj = Document{{"applyOps", Value{std::vector<Document>{Document{ diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 99729a4d0de..cb666b1ec8b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -65,22 +65,6 @@ using std::vector; namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; - -bool isOpTypeRelevant(const Document& d) { - Value op = d["op"]; - invariant(!op.missing()); - - if (op.getString() != "n") { - return true; - } - - Value type = d.getNestedField("o2.type"); - if (!type.missing() && type.getString() == "migrateChunkToNewShard") { - return true; - } - - return false; -} } // namespace DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( @@ -413,9 +397,13 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { } bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) { - if (!isOpTypeRelevant(d)) { - return false; - } + invariant( + d["op"].getType() == BSONType::String, + str::stream() + << "Unexpected format for entry within a transaction oplog entry: 'op' field was type " + << typeName(d["op"].getType())); + invariant(ValueComparator::kInstance.evaluate(d["op"] != Value("n"_sd)), + "Unexpected noop entry within a transaction"); Value nsField = d["ns"]; invariant(!nsField.missing()); |