summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/cursor_manager.cpp9
-rw-r--r--src/mongo/db/cursor_manager.h2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp40
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp26
4 files changed, 47 insertions, 30 deletions
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp
index d58e25bbbc7..5f6d5d754b8 100644
--- a/src/mongo/db/cursor_manager.cpp
+++ b/src/mongo/db/cursor_manager.cpp
@@ -757,15 +757,6 @@ void CursorManager::unpin(OperationContext* opCtx,
cursor.release();
}
-void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
- auto allPartitions = _cursorMap->lockAllPartitions();
- for (auto&& partition : allPartitions) {
- for (auto&& entry : partition) {
- openCursors->insert(entry.first);
- }
- }
-}
-
void CursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) const {
auto allPartitions = _cursorMap->lockAllPartitions();
for (auto&& partition : allPartitions) {
diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h
index 6817455ec0d..350e23b62d3 100644
--- a/src/mongo/db/cursor_manager.h
+++ b/src/mongo/db/cursor_manager.h
@@ -212,8 +212,6 @@ public:
*/
Status checkAuthForKillCursors(OperationContext* opCtx, CursorId id);
- void getCursorIds(std::set<CursorId>* openCursors) const;
-
/**
* Appends sessions that have open cursors in this cursor manager to the given set of lsids.
*/
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 846186da260..5616e829b9e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/uuid.h"
@@ -704,6 +705,45 @@ TEST_F(ChangeStreamStageTest, TransformEmptyApplyOps) {
ASSERT_EQ(results.size(), 0u);
}
+DEATH_TEST_F(ChangeStreamStageTest, ShouldCrashWithNoopInsideApplyOps, "Unexpected noop") {
+ Document applyOpsDoc =
+ Document{{"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", "n"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}};
+ LogicalSessionFromClient lsid = testLsid();
+ getApplyOpsResults(applyOpsDoc, lsid); // Should crash.
+}
+
+DEATH_TEST_F(ChangeStreamStageTest,
+ ShouldCrashWithEntryWithoutOpFieldInsideApplyOps,
+ "Unexpected format for entry") {
+ Document applyOpsDoc =
+ Document{{"applyOps",
+ Value{std::vector<Document>{
+ Document{{"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}};
+ LogicalSessionFromClient lsid = testLsid();
+ getApplyOpsResults(applyOpsDoc, lsid); // Should crash.
+}
+
+DEATH_TEST_F(ChangeStreamStageTest,
+ ShouldCrashWithEntryWithNonStringOpFieldInsideApplyOps,
+ "Unexpected format for entry") {
+ Document applyOpsDoc =
+ Document{{"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", 2},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}};
+ LogicalSessionFromClient lsid = testLsid();
+ getApplyOpsResults(applyOpsDoc, lsid); // Should crash.
+}
+
TEST_F(ChangeStreamStageTest, TransformNonTransactionApplyOps) {
BSONObj applyOpsObj = Document{{"applyOps",
Value{std::vector<Document>{Document{
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 99729a4d0de..cb666b1ec8b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -65,22 +65,6 @@ using std::vector;
namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
-
-bool isOpTypeRelevant(const Document& d) {
- Value op = d["op"];
- invariant(!op.missing());
-
- if (op.getString() != "n") {
- return true;
- }
-
- Value type = d.getNestedField("o2.type");
- if (!type.missing() && type.getString() == "migrateChunkToNewShard") {
- return true;
- }
-
- return false;
-}
} // namespace
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
@@ -413,9 +397,13 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
}
bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) {
- if (!isOpTypeRelevant(d)) {
- return false;
- }
+ invariant(
+ d["op"].getType() == BSONType::String,
+ str::stream()
+ << "Unexpected format for entry within a transaction oplog entry: 'op' field was type "
+ << typeName(d["op"].getType()));
+ invariant(ValueComparator::kInstance.evaluate(d["op"] != Value("n"_sd)),
+ "Unexpected noop entry within a transaction");
Value nsField = d["ns"];
invariant(!nsField.missing());