33 files changed, 740 insertions, 110 deletions
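In brief, this patch adds a test-only batched multi-delete path: getExecutorDelete builds a BATCHED_DELETE plan stage for multi:true deletes against the hardcoded __internalBatchedDeletesTesting.Collection0 namespace when the internalBatchUserMultiDeletesForTest server parameter is set, and each delete batch commits inside a WriteUnitOfWork with groupOplogEntries=true, which the new OpObserver::onBatchedWriteCommit hook replicates as a single applyOps oplog entry. The mongo-shell sketch below shows roughly how the new jstests exercise that path; it assumes a mongod built from this branch, and the collection size and checks are illustrative rather than part of the patch.

    // Enable the test-only gate and target the hardcoded namespace.
    const testDB = db.getSiblingDB("__internalBatchedDeletesTesting");
    const coll = testDB["Collection0"];
    assert.commandWorked(
        testDB.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
    assert.commandWorked(coll.insertMany([...Array(1000).keys()].map(x => ({_id: x}))));

    // A multi:true delete should now pick the BATCHED_DELETE stage.
    const expl = assert.commandWorked(testDB.runCommand({
        explain: {delete: coll.getName(), deletes: [{q: {_id: {$gte: 0}}, limit: 0}]},
        verbosity: "executionStats"
    }));
    printjson(expl.queryPlanner.winningPlan.stage);  // "BATCHED_DELETE" on a non-sharded node

    // The deletion runs in batches of up to batchedDeletesTargetBatchDocs documents and is
    // reported in the new serverStatus() 'batchedDeletes' section.
    assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
    printjson(testDB.serverStatus().batchedDeletes);  // {batches: ..., docs: ...}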
diff --git a/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml index 66614e15afb..ac000fef42d 100755 --- a/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml @@ -73,6 +73,7 @@ selector: - jstests/core/bench_test2.js - jstests/core/bindata_eq.js - jstests/core/bindata_indexonly.js + - jstests/core/batched_multi_deletes.js - jstests/core/capped_update.js - jstests/core/collation.js - jstests/core/collation_find_and_modify.js diff --git a/buildscripts/resmokelib/testing/fixtures/standalone.py b/buildscripts/resmokelib/testing/fixtures/standalone.py index b6375141305..394c025d510 100644 --- a/buildscripts/resmokelib/testing/fixtures/standalone.py +++ b/buildscripts/resmokelib/testing/fixtures/standalone.py @@ -345,6 +345,11 @@ class MongodLauncher(object): if "replSet" in mongod_options and "configsvr" in mongod_options: mongod_options["storageEngine"] = "wiredTiger" + # TODO (SERVER-63044): Remove the block below. + if executable.endswith("mongod"): + if "internalBatchUserMultiDeletesForTest" not in suite_set_parameters: + suite_set_parameters["internalBatchUserMultiDeletesForTest"] = 1 + return self.fixturelib.mongod_program(logger, job_num, executable, process_kwargs, mongod_options) diff --git a/jstests/core/batched_multi_deletes.js b/jstests/core/batched_multi_deletes.js new file mode 100644 index 00000000000..894c68e8a59 --- /dev/null +++ b/jstests/core/batched_multi_deletes.js @@ -0,0 +1,60 @@ +/** + * Tests batch-deleting a large range of data. + * + * @tags: [ + * does_not_support_retryable_writes, + * # TODO (SERVER-55909): make WUOW 'groupOplogEntries' the only mode of operation. + * does_not_support_transactions, + * multiversion_incompatible, + * requires_fcv_60, + * requires_getmore, + * requires_non_retryable_writes, + * # TODO (SERVER-63044): namespace for this test is hardcoded, tenant migrations rename it. + * tenant_migration_incompatible, + * ] + */ + +(function() { +"use strict"; + +function populateAndMassDelete(queryPredicate) { + // '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches + // multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set. + // TODO (SERVER-63044): remove this special handling. + const testDB = db.getSiblingDB('__internalBatchedDeletesTesting'); + const coll = testDB['Collection0']; + + const collCount = + 54321; // Intentionally not a multiple of BatchedDeleteStageBatchParams::targetBatchDocs. + + coll.drop(); + assert.commandWorked(coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: x})))); + + assert.eq(collCount, coll.find().itcount()); + + // Verify the delete will involve the BATCHED_DELETE stage. + const expl = testDB.runCommand({ + explain: {delete: coll.getName(), deletes: [{q: {_id: {$gte: 0}}, limit: 0}]}, + verbosity: "executionStats" + }); + assert.commandWorked(expl); + + if (expl["queryPlanner"]["winningPlan"]["stage"] === "SHARD_WRITE") { + // This is a sharded cluster. Verify all shards execute the BATCHED_DELETE stage. + for (let shard of expl["queryPlanner"]["winningPlan"]["shards"]) { + assert.eq(shard["winningPlan"]["stage"], "BATCHED_DELETE"); + } + } else { + // Non-sharded + assert.eq(expl["queryPlanner"]["winningPlan"]["stage"], "BATCHED_DELETE"); + } + + // Execute and verify the deletion. 
+ assert.eq(collCount, coll.find().itcount()); + assert.commandWorked(coll.deleteMany(queryPredicate)); + assert.eq(0, coll.find().itcount()); +} + +populateAndMassDelete({_id: {$gte: 0}}); +populateAndMassDelete({a: {$gte: 0}}); +})(); diff --git a/jstests/noPassthrough/batched_multi_deletes.js b/jstests/noPassthrough/batched_multi_deletes.js index 8577c537140..98aedceddb8 100644 --- a/jstests/noPassthrough/batched_multi_deletes.js +++ b/jstests/noPassthrough/batched_multi_deletes.js @@ -2,6 +2,8 @@ * Validate basic batched multi-deletion functionality. * * @tags: [ + * # Running as a replica set requires journaling. + * requires_journaling, * ] */ @@ -9,70 +11,87 @@ "use strict"; load("jstests/libs/analyze_plan.js"); -const conn = MongoRunner.runMongod(); +function validateBatchedDeletes(conn) { + // '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches + // multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set. + // TODO (SERVER-63044): remove this special handling. + const db = conn.getDB("__internalBatchedDeletesTesting"); + const coll = db.getCollection('Collection0'); + const collName = coll.getName(); -const db = conn.getDB("__internalBatchedDeletesTesting"); -const coll = db.getCollection('Collection0'); -const collName = coll.getName(); -const ns = coll.getFullName(); + const docsPerBatchDefault = 100; // BatchedDeleteStageBatchParams::targetBatchDocs + const collCount = + 5017; // Intentionally not a multiple of BatchedDeleteStageBatchParams::targetBatchDocs. -const docsPerBatchDefault = 100; // BatchedDeleteStageBatchParams::targetBatchDocs -const collCount = - 5017; // Intentionally not a multiple of BatchedDeleteStageBatchParams::targetBatchDocs. + function validateDeletion(db, coll, docsPerBatch) { + coll.drop(); + assert.commandWorked(coll.insertMany( + [...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)})))); + + const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches']; + const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs']; + + assert.eq(collCount, coll.find().itcount()); + assert.commandWorked(coll.deleteMany({_id: {$gte: 0}})); + assert.eq(0, coll.find().itcount()); + + const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches']; + const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs']; + const serverStatusBatchesExpected = + serverStatusBatchesBefore + Math.ceil(collCount / docsPerBatch); + const serverStatusDocsExpected = serverStatusDocsBefore + collCount; + assert.eq(serverStatusBatchesAfter, serverStatusBatchesExpected); + assert.eq(serverStatusDocsAfter, serverStatusDocsExpected); + } -function validateDeletion(db, coll, docsPerBatch) { coll.drop(); assert.commandWorked( coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)})))); - const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches']; - const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs']; - - assert.eq(collCount, coll.find().itcount()); - assert.commandWorked(coll.deleteMany({_id: {$gte: 0}})); - assert.eq(0, coll.find().itcount()); - - const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches']; - const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs']; - const serverStatusBatchesExpected = - serverStatusBatchesBefore + Math.ceil(collCount / docsPerBatch); - const serverStatusDocsExpected = serverStatusDocsBefore + collCount; 
- assert.eq(serverStatusBatchesAfter, serverStatusBatchesExpected); - assert.eq(serverStatusDocsAfter, serverStatusDocsExpected); -} + assert.commandWorked( + db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1})); -coll.drop(); -assert.commandWorked( - coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)})))); + // Explain plan and executionStats. + { + const expl = db.runCommand({ + explain: {delete: collName, deletes: [{q: {_id: {$gte: 0}}, limit: 0}]}, + verbosity: "executionStats" + }); + assert.commandWorked(expl); -assert.commandWorked(db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1})); + assert(getPlanStage(expl, "BATCHED_DELETE")); + assert.eq(0, expl.executionStats.nReturned); + assert.eq(collCount, expl.executionStats.totalDocsExamined); + assert.eq(collCount, expl.executionStats.totalKeysExamined); + assert.eq(0, expl.executionStats.executionStages.nReturned); + assert.eq(1, expl.executionStats.executionStages.isEOF); + } -// Batched multi-deletion is only available for multi:true deletes. -assert.commandFailedWithCode( - db.runCommand({delete: collName, deletes: [{q: {_id: {$gte: 0}}, limit: 1}]}), 6303800); + // Actual deletion. + for (const docsPerBatch of [10, docsPerBatchDefault]) { + assert.commandWorked( + db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch})); + validateDeletion(db, coll, docsPerBatch); + } +} -// Explain plan and executionStats. +// Standalone { - const expl = db.runCommand({ - explain: {delete: collName, deletes: [{q: {_id: {$gte: 0}}, limit: 0}]}, - verbosity: "executionStats" - }); - assert.commandWorked(expl); - - assert(getPlanStage(expl, "BATCHED_DELETE")); - assert.eq(0, expl.executionStats.nReturned); - assert.eq(collCount, expl.executionStats.totalDocsExamined); - assert.eq(collCount, expl.executionStats.totalKeysExamined); - assert.eq(0, expl.executionStats.executionStages.nReturned); - assert.eq(1, expl.executionStats.executionStages.isEOF); + const conn = MongoRunner.runMongod(); + validateBatchedDeletes(conn); + MongoRunner.stopMongod(conn); } -// Actual deletion. -for (const docsPerBatch of [10, docsPerBatchDefault]) { - assert.commandWorked( - db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch})); - validateDeletion(db, coll, docsPerBatch); +// Replica set +{ + const rst = new ReplSetTest({ + name: "batched_multi_deletes_test", + nodes: 2, + }); + rst.startSet(); + rst.initiate(); + rst.awaitNodesAgreeOnPrimary(); + validateBatchedDeletes(rst.getPrimary()); + rst.stopSet(); } - -MongoRunner.stopMongod(conn); })(); diff --git a/jstests/noPassthrough/batched_multi_deletes_WC.js b/jstests/noPassthrough/batched_multi_deletes_WC.js index 911b22da1d2..a59d0549101 100644 --- a/jstests/noPassthrough/batched_multi_deletes_WC.js +++ b/jstests/noPassthrough/batched_multi_deletes_WC.js @@ -15,7 +15,9 @@ load("jstests/libs/fail_point_util.js"); // For 'configureFailPoint()' const conn = MongoRunner.runMongod(); -// This specific namespace is required to activate batched multi-delete behavior. +// '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches +// multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set. +// TODO (SERVER-63044): remove this special handling. 
const testDB = conn.getDB("__internalBatchedDeletesTesting"); const coll = testDB.getCollection("Collection0"); const collName = coll.getName(); diff --git a/jstests/noPassthrough/batched_multi_deletes_oplog.js b/jstests/noPassthrough/batched_multi_deletes_oplog.js new file mode 100644 index 00000000000..b27a1e6d6e8 --- /dev/null +++ b/jstests/noPassthrough/batched_multi_deletes_oplog.js @@ -0,0 +1,118 @@ +/** + * Validate oplog behaviour of batched multi-deletes. + * + * @tags: [ + * # Running as a replica set requires journaling. + * requires_journaling, + * ] + */ + +(function() { +"use strict"; + +// Verifies that batches replicate as applyOps entries. +function validateBatchedDeletesOplogDocsPerBatch(conn) { + // '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches + // multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set. + // TODO (SERVER-63044): remove this special handling. + const db = conn.getDB("__internalBatchedDeletesTesting"); + const coll = db.getCollection('Collection0'); + + const docsPerBatch = 100; + const collCount = 5017; // Intentionally not a multiple of docsPerBatch. + + assert.commandWorked( + db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1})); + // Disable size-based batching + assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchBytes: 0})); + // Disable time-based batching + assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0})); + // Set docs per batch target + assert.commandWorked( + db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch})); + + coll.drop(); + assert.commandWorked( + coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)})))); + + assert.eq(collCount, coll.find().itcount()); + assert.commandWorked(coll.deleteMany({_id: {$gte: 0}})); + assert.eq(0, coll.find().itcount()); + + // The deletion replicates as one applyOps per batch. + const applyOpsEntriesFull = + db.getSiblingDB('local').oplog.rs.find({'o.applyOps': {$size: docsPerBatch}}).itcount(); + const applyOpsEntriesLast = + db.getSiblingDB('local') + .oplog.rs.find({'o.applyOps': {$size: collCount % docsPerBatch}}) + .itcount(); + const expectedApplyOpsEntries = Math.ceil(collCount / docsPerBatch); + assert.eq(applyOpsEntriesFull + applyOpsEntriesLast, expectedApplyOpsEntries); +} + +// Verifies that a large batch that would result in an applyOps entry beyond the 16MB BSON limit +// generates more than one applyOps entry. +function validateBatchedDeletesOplogBatchAbove16MB(conn) { + const db = conn.getDB("__internalBatchedDeletesTesting"); + const coll = db.getCollection('Collection0'); + + // With _id's of ObjectId type, an applyOps entry reaches the 16MB BSON limit at ~140k entries. + // Create a collection >> 140k documents, make the docsPerBatch target high enough to hit the + // 16MB BSON limit with applyOps, and disable other batch tunables that could cut a batch + // earlier than expected.
+ const docsPerBatch = 500000; + const collCount = 200000; + assert.commandWorked( + db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1})); + // Disable size-based batching + assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchBytes: 0})); + // Disable time-based batching + assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0})); + // Set artificially high docs per batch target + assert.commandWorked( + db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch})); + + coll.drop(); + assert.commandWorked(coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x})))); + + assert.eq(collCount, coll.find().itcount()); + assert.commandWorked(coll.deleteMany({_id: {$gte: 0}})); + assert.eq(0, coll.find().itcount()); + + // The whole deletion replicates as two applyOps. + const oplogEntriesApplyOps = db.getSiblingDB('local') + .oplog.rs + .aggregate([ + { + $match: { + ns: "admin.$cmd", + 'o.applyOps.op': 'd', + 'o.applyOps.ns': coll.getFullName() + } + }, + {$project: {opsPerApplyOps: {$size: '$o.applyOps'}}} + ]) + .toArray(); + assert.eq(2, oplogEntriesApplyOps.length); + assert.eq( + collCount, + oplogEntriesApplyOps[0]['opsPerApplyOps'] + oplogEntriesApplyOps[1]['opsPerApplyOps']); +} + +function runTestInIsolation(testFunc) { + const rst = new ReplSetTest({ + name: "batched_multi_deletes_test", + nodes: 1, + }); + rst.startSet(); + rst.initiate(); + rst.awaitNodesAgreeOnPrimary(); + testFunc(rst.getPrimary()); + rst.stopSet(); +} + +runTestInIsolation(validateBatchedDeletesOplogDocsPerBatch); + +// TODO (SERVER-64860): re-enable this test. +// runTestInIsolation(validateBatchedDeletesOplogBatchAbove16MB); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9dc2cae6191..71b880f137b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1007,6 +1007,7 @@ env.Library( '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/grid', + 'batched_write_context', 'catalog/collection_options', 'catalog/database_holder', 'op_observer', @@ -1030,6 +1031,16 @@ env.Library( ) env.Library( + target="batched_write_context", + source=[ + "batched_write_context.cpp", + ], + LIBDEPS=[ + "$BUILD_DIR/mongo/base", + ], +) + +env.Library( target="fcv_op_observer", source=[ "fcv_op_observer.cpp", diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index a2a0183f65f..677e21e672c 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -208,6 +208,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/batched_write_context.cpp b/src/mongo/db/batched_write_context.cpp new file mode 100644 index 00000000000..2a9517d9fe3 --- /dev/null +++ b/src/mongo/db/batched_write_context.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc.
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/batched_write_context.h" +#include "mongo/db/repl/oplog_entry.h" + +namespace mongo { +const OperationContext::Decoration<BatchedWriteContext> BatchedWriteContext::get = + OperationContext::declareDecoration<BatchedWriteContext>(); + +BatchedWriteContext::BatchedWriteContext() {} + +void BatchedWriteContext::addBatchedOperation(OperationContext* opCtx, + const repl::ReplOperation& operation) { + invariant(_batchWrites); + + // Current support is only limited to delete operations, no change stream pre-images, no + // multi-doc transactions, no retryable writes. + invariant(operation.getOpType() == repl::OpTypeEnum::kDelete); + invariant(operation.getChangeStreamPreImageRecordingMode() == + repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOff); + invariant(!opCtx->inMultiDocumentTransaction()); + invariant(!opCtx->getTxnNumber()); + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + _batchedOperations.push_back(operation); +} + +std::vector<repl::ReplOperation>& BatchedWriteContext::getBatchedOperations( + OperationContext* opCtx) { + invariant(_batchWrites); + return _batchedOperations; +} + +void BatchedWriteContext::clearBatchedOperations(OperationContext* opCtx) { + invariant(_batchWrites); + _batchedOperations.clear(); +} + +bool BatchedWriteContext::writesAreBatched() const { + return _batchWrites; +} +void BatchedWriteContext::setWritesAreBatched(bool batched) { + _batchWrites = batched; +} + +} // namespace mongo diff --git a/src/mongo/db/batched_write_context.h b/src/mongo/db/batched_write_context.h new file mode 100644 index 00000000000..7aa5ec678e1 --- /dev/null +++ b/src/mongo/db/batched_write_context.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog_entry.h" + +namespace mongo { + +/** + * Group multiple writes into a single applyOps entry. + */ + +/** + * This class is a decoration on the OperationContext holding context of writes that are logically + * related with each other. It can be used to stage writes belonging to the same WriteUnitOfWork or + * multi-document transaction. Currently only supports batching deletes in a WriteUnitOfWork. + */ +class BatchedWriteContext { +public: + static const OperationContext::Decoration<BatchedWriteContext> get; + + BatchedWriteContext(); + + // No copy and no move + BatchedWriteContext(const BatchedWriteContext&) = delete; + BatchedWriteContext(BatchedWriteContext&&) = delete; + BatchedWriteContext& operator=(const BatchedWriteContext&) = delete; + BatchedWriteContext& operator=(BatchedWriteContext&&) = delete; + + bool writesAreBatched() const; + void setWritesAreBatched(bool batched); + + /** + * Adds a stored operation to the list of stored operations for the current WUOW. It is illegal + * to add operations outside of a WUOW. + */ + void addBatchedOperation(OperationContext* opCtx, const repl::ReplOperation& operation); + + // Returns a reference to the stored operations for the current WUOW. + std::vector<repl::ReplOperation>& getBatchedOperations(OperationContext* opCtx); + void clearBatchedOperations(OperationContext* opCtx); + +private: + // Whether batching writes is enabled. + bool _batchWrites = false; + + /** + * Holds oplog data for operations which have been applied in the current batched + * write context. + */ + std::vector<repl::ReplOperation> _batchedOperations; +}; + +} // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp index bc0b1059861..7c4fac03e24 100644 --- a/src/mongo/db/exec/batched_delete_stage.cpp +++ b/src/mongo/db/exec/batched_delete_stage.cpp @@ -127,7 +127,7 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, : DeleteStage::DeleteStage( kStageType.rawData(), expCtx, std::move(params), ws, collection, child), _batchParams(std::move(batchParams)) { - uassert(6303800, + tassert(6303800, "batched deletions only support multi-document deletions (multi: true)", _params->isMulti); tassert(6303801, @@ -165,14 +165,13 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { std::terminate(); } - // TODO (SERVER-63047): use a single write timestamp by grouping oplog entries. 
- opCtx()->recoveryUnit()->ignoreAllMultiTimestampConstraints(); - const auto startOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch(); unsigned int docsDeleted = 0; std::vector<RecordId> recordsThatNoLongerMatch; try { - WriteUnitOfWork wuow(opCtx()); + // Start a WUOW with 'groupOplogEntries' which groups a delete batch into a single timestamp + // and oplog entry + WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */); for (auto& [rid, snapshotId] : _ridMap) { if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) { diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl index 6c1ed0de55b..3794cc1836d 100644 --- a/src/mongo/db/exec/batched_delete_stage.idl +++ b/src/mongo/db/exec/batched_delete_stage.idl @@ -46,7 +46,7 @@ server_parameters: set_at: [startup, runtime] cpp_vartype: 'AtomicWord<long long>' cpp_varname: "gBatchedDeletesTargetBatchDocs" - default: 100 + default: 10 # TODO (SERVER-64547): re-evaluate this default. validator: gte: 0 batchedDeletesTargetBatchTimeMS: diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h index 920508eef86..0b638e96e77 100644 --- a/src/mongo/db/fcv_op_observer.h +++ b/src/mongo/db/fcv_op_observer.h @@ -179,12 +179,12 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final {} + void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final{}; - std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, @@ -201,8 +201,12 @@ public: const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final{}; + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final{}; + + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index b266555151c..40511b4360c 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -208,6 +208,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 6c725dfd21f..a426783b907 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -413,6 +413,8 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept = 0; + virtual void onBatchedWriteCommit(OperationContext* opCtx) = 0; + /** * Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image * oplog entries for a transaction. 
"applyOps" entries are not actual "applyOps" entries to be diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index fac3cd11e1e..4acebc278c6 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -37,6 +37,7 @@ #include <limits> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/batched_write_context.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" @@ -831,8 +832,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); + auto& batchedWriteContext = BatchedWriteContext::get(opCtx); + const bool inBatchedWrite = batchedWriteContext.writesAreBatched(); + OpTimeBundle opTime; - if (inMultiDocumentTransaction) { + if (inBatchedWrite) { + auto operation = + MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId()); + batchedWriteContext.addBatchedOperation(opCtx, operation); + } else if (inMultiDocumentTransaction) { const bool inRetryableInternalTransaction = isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); @@ -1565,9 +1573,11 @@ void packTransactionStatementsForApplyOps( } } -// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON -// builder object already has an 'applyOps' field appended pointing to the desired array of ops -// i.e. { "applyOps" : [op1, op2, ...] } +// Logs one applyOps entry on a prepared transaction, or an unprepared transaction's commit, or on +// committing a WUOW that is not necessarily tied to a multi-document transaction. It may update the +// transactions table on multi-document transactions. Assumes that the given BSON builder object +// already has an 'applyOps' field appended pointing to the desired array of ops i.e. { "applyOps" +// : [op1, op2, ...] } // // @param txnState the 'state' field of the transaction table entry update. @param startOpTime the // optime of the 'startOpTime' field of the transaction table entry update. If boost::none, no @@ -1576,24 +1586,26 @@ void packTransactionStatementsForApplyOps( // updated after the oplog entry is written. // // Returns the optime of the written oplog entry. 
-OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, - MutableOplogEntry* oplogEntry, - boost::optional<DurableTxnStateEnum> txnState, - boost::optional<repl::OpTime> startOpTime, - std::vector<StmtId> stmtIdsWritten, - const bool updateTxnTable) { +OpTimeBundle logApplyOps(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + boost::optional<DurableTxnStateEnum> txnState, + boost::optional<repl::OpTime> startOpTime, + std::vector<StmtId> stmtIdsWritten, + const bool updateTxnTable) { if (!stmtIdsWritten.empty()) { invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId())); } - const auto txnRetryCounter = *opCtx->getTxnRetryCounter(); + const auto txnRetryCounter = opCtx->getTxnRetryCounter(); + + invariant(bool(txnRetryCounter) == bool(TransactionParticipant::get(opCtx))); oplogEntry->setOpType(repl::OpTypeEnum::kCommand); oplogEntry->setNss({"admin", "$cmd"}); oplogEntry->setSessionId(opCtx->getLogicalSessionId()); oplogEntry->setTxnNumber(opCtx->getTxnNumber()); - if (!isDefaultTxnRetryCounter(txnRetryCounter)) { - oplogEntry->getOperationSessionInfo().setTxnRetryCounter(txnRetryCounter); + if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) { + oplogEntry->getOperationSessionInfo().setTxnRetryCounter(*txnRetryCounter); } try { @@ -1606,8 +1618,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, sessionTxnRecord.setLastWriteDate(times.wallClockTime); sessionTxnRecord.setState(txnState); sessionTxnRecord.setStartOpTime(startOpTime); - if (!isDefaultTxnRetryCounter(txnRetryCounter)) { - sessionTxnRecord.setTxnRetryCounter(txnRetryCounter); + if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) { + sessionTxnRecord.setTxnRetryCounter(*txnRetryCounter); } onWriteOpCompleted(opCtx, std::move(stmtIdsWritten), sessionTxnRecord); } @@ -1622,7 +1634,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, MONGO_UNREACHABLE; } -// Logs transaction oplog entries for preparing a transaction or committing an unprepared +// Logs applyOps oplog entries for preparing a transaction, committing an unprepared +// transaction, or committing a WUOW that is not necessarily related to a multi-document // transaction. This includes the in-progress 'partialTxn' oplog entries followed by the implicit // prepare or commit entry. If the 'prepare' argument is true, it will log entries for a prepared // transaction. Otherwise, it logs entries for an unprepared transaction. The total number of oplog @@ -1645,7 +1658,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // skipping over some reserved slots. // // The number of oplog entries written is returned. -int logOplogEntriesForTransaction( +int logOplogEntries( OperationContext* opCtx, std::vector<repl::ReplOperation>* stmts, const std::vector<OplogSlot>& oplogSlots, @@ -1669,7 +1682,9 @@ int logOplogEntriesForTransaction( // OplogSlotReserver. invariant(opCtx->lockState()->isWriteLocked()); - prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); + if (txnParticipant) { + prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); + } auto currPrePostImageOplogEntryOplogSlot = applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin(); @@ -1786,9 +1801,9 @@ int logOplogEntriesForTransaction( applyOpsBuilder.append("count", static_cast<long long>(stmts->size())); } - // For both prepared and unprepared transactions, update the transactions table on - // the first and last op. 
- auto updateTxnTable = firstOp || lastOp; + // For both prepared and unprepared transactions (but not for batched writes) update the + // transactions table on the first and last op. + auto updateTxnTable = txnParticipant && (firstOp || lastOp); // The first optime of the transaction is always the first oplog slot, except in the // case of a single prepare oplog entry. @@ -1802,14 +1817,15 @@ int logOplogEntriesForTransaction( MutableOplogEntry oplogEntry; oplogEntry.setOpTime(applyOpsEntry.oplogSlot); - oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + if (txnParticipant) { + oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + } oplogEntry.setWallClockTime(wallClockTime); oplogEntry.setObject(applyOpsBuilder.done()); auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress : (implicitPrepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted); - prevWriteOpTime = - logApplyOpsForTransaction(opCtx, + prevWriteOpTime = logApplyOps(opCtx, &oplogEntry, txnState, startOpTime, @@ -1923,15 +1939,14 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, // Log in-progress entries for the transaction along with the implicit commit. boost::optional<ImageBundle> imageToWrite; - int numOplogEntries = logOplogEntriesForTransaction(opCtx, - statements, - oplogSlots, - applyOpsOplogSlotAndOperationAssignment, - &imageToWrite, - numberOfPrePostImagesToWrite, - false /* prepare*/, - wallClockTime); - + int numOplogEntries = logOplogEntries(opCtx, + statements, + oplogSlots, + applyOpsOplogSlotAndOperationAssignment, + &imageToWrite, + numberOfPrePostImagesToWrite, + false /* prepare*/, + wallClockTime); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -1945,6 +1960,38 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime); } +void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { + if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != + repl::ReplicationCoordinator::modeReplSet || + !opCtx->writesAreReplicated()) { + return; + } + + auto& batchedWriteContext = BatchedWriteContext::get(opCtx); + auto& batchedOps = batchedWriteContext.getBatchedOperations(opCtx); + + // Reserve all the optimes in advance, so we only need to get the optime mutex once. We + // reserve enough entries for all statements in the transaction. + auto oplogSlots = repl::getNextOpTimes(opCtx, batchedOps.size()); + + auto noPrePostImage = boost::optional<ImageBundle>(boost::none); + + // Serialize batched statements to BSON and determine their assignment to "applyOps" + // entries. + const auto applyOpsOplogSlotAndOperationAssignment = + getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + opCtx, oplogSlots, 0 /*numberOfPrePostImagesToWrite*/, false /*prepare*/, batchedOps); + const auto wallClockTime = getWallClockTimeForOpLog(opCtx); + logOplogEntries(opCtx, + &batchedOps, + oplogSlots, + applyOpsOplogSlotAndOperationAssignment, + &noPrePostImage, + 0 /* numberOfPrePostImagesToWrite */, + false, + wallClockTime); +} + void OpObserverImpl::onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, @@ -2024,14 +2071,14 @@ void OpObserverImpl::onTransactionPrepare( // the last reserved slot, because the transaction participant has already used // that as the prepare time. 
boost::optional<ImageBundle> imageToWrite; - logOplogEntriesForTransaction(opCtx, - statements, - reservedSlots, - *applyOpsOperationAssignment, - &imageToWrite, - numberOfPrePostImagesToWrite, - true /* prepare */, - wallClockTime); + logOplogEntries(opCtx, + statements, + reservedSlots, + *applyOpsOperationAssignment, + &imageToWrite, + numberOfPrePostImagesToWrite, + true /* prepare */, + wallClockTime); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -2054,12 +2101,12 @@ void OpObserverImpl::onTransactionPrepare( oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); oplogEntry.setObject(applyOpsBuilder.done()); oplogEntry.setWallClockTime(wallClockTime); - logApplyOpsForTransaction(opCtx, - &oplogEntry, - DurableTxnStateEnum::kPrepared, - oplogSlot, - {}, - true /* updateTxnTable */); + logApplyOps(opCtx, + &oplogEntry, + DurableTxnStateEnum::kPrepared, + oplogSlot, + {}, + true /* updateTxnTable */); } wuow.commit(); }); diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 7d40fe84307..7843b06602f 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -187,6 +187,7 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final; + void onBatchedWriteCommit(OperationContext* opCtx) final; void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 4b2be83993e..27f497aedea 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" +#include "mongo/db/batched_write_context.h" #include "mongo/db/catalog/import_collection_oplog_entry_gen.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/locker_noop.h" @@ -41,6 +42,7 @@ #include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" @@ -2345,6 +2347,153 @@ struct DeleteTestCase { } }; +class BatchedWriteOutputsTest : public OpObserverTest { +protected: + const NamespaceString _nss{"test", "coll"}; + const UUID _uuid = UUID::gen(); +}; + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestCannotGroupInserts, + "Invariant failure.*getOpType.*repl::OpTypeEnum::kDelete") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + bwc.addBatchedOperation(opCtx, + repl::MutableOplogEntry::makeInsertOperation( + _nss, _uuid, BSON("_id" << 0), BSON("_id" << 0))); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportPreImagesInCollection, + "Invariant " + "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::" + "ChangeStreamPreImageRecordingMode::kOff") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + entry.setChangeStreamPreImageRecordingMode( + 
repl::ReplOperation::ChangeStreamPreImageRecordingMode::kPreImagesCollection); + bwc.addBatchedOperation(opCtx, entry); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportPreImagesInOplog, + "Invariant " + "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::" + "ChangeStreamPreImageRecordingMode::kOff") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + entry.setChangeStreamPreImageRecordingMode( + repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOplog); + bwc.addBatchedOperation(opCtx, entry); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportMultiDocTxn, + "Invariant failure.*!opCtx->inMultiDocumentTransaction()") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + opCtx->setInMultiDocumentTransaction(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + bwc.addBatchedOperation(opCtx, entry); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportRetryableWrites, + "Invariant failure.*!opCtx->getTxnNumber()") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + opCtx->setLogicalSessionId(LogicalSessionId(makeLogicalSessionIdForTest())); + opCtx->setTxnNumber(TxnNumber{1}); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + bwc.addBatchedOperation(opCtx, entry); +} + +// Verifies that a WriteUnitOfWork with groupOplogEntries=true replicates its writes as a single +// applyOps. Tests WUOWs batching a range of 1 to 5 deletes (inclusive). +TEST_F(BatchedWriteOutputsTest, TestApplyOpsGrouping) { + const auto nDocsToDelete = 5; + const BSONObj docsToDelete[nDocsToDelete] = { + BSON("_id" << 0), + BSON("_id" << 1), + BSON("_id" << 2), + BSON("_id" << 3), + BSON("_id" << 4), + }; + + // Setup. + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + reset(opCtx, NamespaceString::kRsOplogNamespace); + auto opObserverRegistry = std::make_unique<OpObserverRegistry>(); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); + opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry)); + + // Run the test with WUOWs grouping 1 to 5 deletions. + for (size_t docsToBeBatched = 1; docsToBeBatched <= nDocsToDelete; docsToBeBatched++) { + + // Start a WUOW with groupOplogEntries=true. Verify that it initialises the + // BatchedWriteContext. + auto& bwc = BatchedWriteContext::get(opCtx); + ASSERT(!bwc.writesAreBatched()); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + ASSERT(bwc.writesAreBatched()); + + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + + for (size_t doc = 0; doc < docsToBeBatched; doc++) { + // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect + // of setting `documentKey` on the delete for sharding purposes. + // `OpObserverImpl::onDelete` asserts its existence.
+ documentKeyDecoration(opCtx).emplace(docsToDelete[doc]["_id"].wrap(), boost::none); + const OplogDeleteEntryArgs args; + opCtx->getServiceContext()->getOpObserver()->onDelete( + opCtx, _nss, _uuid, kUninitializedStmtId, args); + } + + wuow.commit(); + + // Retrieve the oplog entries. We expect 'docsToBeBatched' oplog entries because of previous + // iterations of this loop that exercised smaller batch sizes. + std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, docsToBeBatched); + // Entries in ascending timestamp order, so fetch the last one at the back of the vector. + auto lastOplogEntry = oplogs.back(); + auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back())); + + // The batch consists of an applyOps, whose array contains all deletes issued within the + // WUOW. + ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps); + std::vector<repl::OplogEntry> innerEntries; + repl::ApplyOps::extractOperationsTo( + lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries); + ASSERT_EQ(innerEntries.size(), docsToBeBatched); + + for (size_t opIdx = 0; opIdx < docsToBeBatched; opIdx++) { + const auto innerEntry = innerEntries[opIdx]; + ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand); + ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete); + ASSERT(innerEntry.getNss() == NamespaceString("test.coll")); + ASSERT(0 == innerEntry.getObject().woCompare(docsToDelete[opIdx])); + } + } +} + class OnDeleteOutputsTest : public OpObserverTest { protected: diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index ab610264ce1..bee9db82207 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -161,6 +161,7 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 5186d6a3f47..e153d46d65e 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -430,6 +430,13 @@ public: o->onTransactionAbort(opCtx, abortOplogEntryOpTime); } + void onBatchedWriteCommit(OperationContext* opCtx) override { + ReservedTimes times{opCtx}; + for (auto& o : _observers) { + o->onBatchedWriteCommit(opCtx); + } + } + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override { for (auto& o : _observers) diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 8f526c82ade..0c12807f3e2 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1647,7 +1647,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele deleteStageParams->canonicalQuery = cq.get(); if (MONGO_unlikely(gInternalBatchUserMultiDeletesForTest.load() && - nss.ns() == "__internalBatchedDeletesTesting.Collection0")) { + nss.ns() == "__internalBatchedDeletesTesting.Collection0" && + deleteStageParams->isMulti)) { root = std::make_unique<BatchedDeleteStage>(cq->getExpCtxRaw(), std::move(deleteStageParams), diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h index a81b66e04cd..63a93f69bb9 100644 ---
a/src/mongo/db/repl/primary_only_service_op_observer.h +++ b/src/mongo/db/repl/primary_only_service_op_observer.h @@ -210,6 +210,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h index 6ab805ee967..2f977b38c52 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h @@ -207,6 +207,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final; diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h index b7b0a88ca96..3a6d9e95922 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h @@ -209,6 +209,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 776ce97af3d..ddb36cfbd4c 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -210,6 +210,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override; diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h index be4dfc18505..35630ed875a 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.h +++ b/src/mongo/db/s/resharding/resharding_op_observer.h @@ -230,6 +230,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 684ef8153bf..82449c72136 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -209,6 +209,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h index 27b05527cce..f55a526e389 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.h +++ 
b/src/mongo/db/serverless/shard_split_donor_op_observer.h @@ -206,6 +206,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final; diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 22c96f38cba..b216260dde0 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -370,6 +370,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/base", + '$BUILD_DIR/mongo/db/batched_write_context', "$BUILD_DIR/mongo/db/storage/storage_options", '$BUILD_DIR/mongo/util/fail_point', 'recovery_unit_base', diff --git a/src/mongo/db/storage/write_unit_of_work.cpp b/src/mongo/db/storage/write_unit_of_work.cpp index 157a0b50b2a..29b02e560b9 100644 --- a/src/mongo/db/storage/write_unit_of_work.cpp +++ b/src/mongo/db/storage/write_unit_of_work.cpp @@ -33,7 +33,9 @@ #include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/db/batched_write_context.h" #include "mongo/db/catalog/uncommitted_collections.h" +#include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" @@ -43,11 +45,21 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(sleepBeforeCommit); -WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx) - : _opCtx(opCtx), _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork) { +WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx, bool groupOplogEntries) + : _opCtx(opCtx), + _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork), + _groupOplogEntries(groupOplogEntries) { uassert(ErrorCodes::IllegalOperation, "Cannot execute a write operation in read-only mode", !storageGlobalParams.readOnly); + // Grouping oplog entries doesn't support WUOW nesting (e.g. multi-doc transactions). 
+ invariant(_toplevel || !_groupOplogEntries); + + if (_groupOplogEntries) { + auto& batchedWriteContext = BatchedWriteContext::get(_opCtx); + batchedWriteContext.setWritesAreBatched(true); + } + _opCtx->lockState()->beginWriteUnitOfWork(); if (_toplevel) { _opCtx->recoveryUnit()->beginUnitOfWork(_opCtx); @@ -70,6 +82,12 @@ WriteUnitOfWork::~WriteUnitOfWork() { } _opCtx->lockState()->endWriteUnitOfWork(); } + + if (_groupOplogEntries) { + auto& batchedWriteContext = BatchedWriteContext::get(_opCtx); + batchedWriteContext.clearBatchedOperations(_opCtx); + batchedWriteContext.setWritesAreBatched(false); + } } std::unique_ptr<WriteUnitOfWork> WriteUnitOfWork::createForSnapshotResume( @@ -107,6 +125,12 @@ void WriteUnitOfWork::commit() { invariant(!_committed); invariant(!_released); invariant(_opCtx->_ruState == RecoveryUnitState::kActiveUnitOfWork); + + if (_groupOplogEntries) { + const auto opObserver = _opCtx->getServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onBatchedWriteCommit(_opCtx); + } if (_toplevel) { if (MONGO_unlikely(sleepBeforeCommit.shouldFail())) { sleepFor(Milliseconds(100)); diff --git a/src/mongo/db/storage/write_unit_of_work.h b/src/mongo/db/storage/write_unit_of_work.h index 3d8130c290e..3bb8e461563 100644 --- a/src/mongo/db/storage/write_unit_of_work.h +++ b/src/mongo/db/storage/write_unit_of_work.h @@ -59,7 +59,7 @@ public: kFailedUnitOfWork // in a unit of work that has failed and must be aborted }; - WriteUnitOfWork(OperationContext* opCtx); + WriteUnitOfWork(OperationContext* opCtx, bool groupOplogEntries = false); ~WriteUnitOfWork(); @@ -101,6 +101,7 @@ private: OperationContext* _opCtx; bool _toplevel; + bool _groupOplogEntries; bool _committed = false; bool _prepared = false; diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h index adef807ed97..a3cfd78c518 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.h +++ b/src/mongo/db/user_write_block_mode_op_observer.h @@ -217,6 +217,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h index 932e99f44a3..b2c87f83fb2 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.h +++ b/src/mongo/idl/cluster_server_parameter_op_observer.h @@ -195,6 +195,8 @@ public: std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, |
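On the replication side, the new groupOplogEntries flag on WriteUnitOfWork stages every delete in a BatchedWriteContext and, at commit, OpObserverImpl::onBatchedWriteCommit writes the whole batch as one applyOps entry under a single timestamp (replacing the earlier ignoreAllMultiTimestampConstraints workaround removed from BatchedDeleteStage). A rough shell sketch of the corresponding oplog check, mirroring what batched_multi_deletes_oplog.js and BatchedWriteOutputsTest assert, assuming the batched deletes above were run against a replica-set primary built from this branch:

    // Each committed batch should surface as one applyOps whose inner ops are 'd' entries on the
    // test namespace; batch size is capped by batchedDeletesTargetBatchDocs, and an oversized
    // batch splits into multiple applyOps to stay under the 16MB BSON limit.
    const oplog = db.getSiblingDB("local").oplog.rs;
    const batches = oplog
        .aggregate([
            {$match: {
                ns: "admin.$cmd",
                "o.applyOps.op": "d",
                "o.applyOps.ns": "__internalBatchedDeletesTesting.Collection0"
            }},
            {$project: {ts: 1, deletesInBatch: {$size: "$o.applyOps"}}}
        ])
        .toArray();
    printjson(batches);  // one result per batch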