author     Josef Ahmad <josef.ahmad@mongodb.com>             2022-03-24 16:07:41 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-03-24 18:58:47 +0000
commit     16227df086c840630ded6e4304fd25c04b7fac6b
tree       d5220bc4c091773de111ed8bf97178fa857640d9
parent     7671b3cca3c5fb1024b8affcafe92c9dfed37280
SERVER-63047 Make delete batches fully transactional
-rwxr-xr-x  buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/standalone.py | 5
-rw-r--r--  jstests/core/batched_multi_deletes.js | 60
-rw-r--r--  jstests/noPassthrough/batched_multi_deletes.js | 121
-rw-r--r--  jstests/noPassthrough/batched_multi_deletes_WC.js | 4
-rw-r--r--  jstests/noPassthrough/batched_multi_deletes_oplog.js | 118
-rw-r--r--  src/mongo/db/SConscript | 11
-rw-r--r--  src/mongo/db/auth/auth_op_observer.h | 2
-rw-r--r--  src/mongo/db/batched_write_context.cpp | 73
-rw-r--r--  src/mongo/db/batched_write_context.h | 82
-rw-r--r--  src/mongo/db/exec/batched_delete_stage.cpp | 9
-rw-r--r--  src/mongo/db/exec/batched_delete_stage.idl | 2
-rw-r--r--  src/mongo/db/fcv_op_observer.h | 6
-rw-r--r--  src/mongo/db/free_mon/free_mon_op_observer.h | 2
-rw-r--r--  src/mongo/db/op_observer.h | 2
-rw-r--r--  src/mongo/db/op_observer_impl.cpp | 141
-rw-r--r--  src/mongo/db/op_observer_impl.h | 1
-rw-r--r--  src/mongo/db/op_observer_impl_test.cpp | 149
-rw-r--r--  src/mongo/db/op_observer_noop.h | 1
-rw-r--r--  src/mongo/db/op_observer_registry.h | 7
-rw-r--r--  src/mongo/db/query/get_executor.cpp | 3
-rw-r--r--  src/mongo/db/repl/primary_only_service_op_observer.h | 2
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_op_observer.h | 2
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_op_observer.h | 2
-rw-r--r--  src/mongo/db/s/config_server_op_observer.h | 2
-rw-r--r--  src/mongo/db/s/resharding/resharding_op_observer.h | 2
-rw-r--r--  src/mongo/db/s/shard_server_op_observer.h | 2
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer.h | 2
-rw-r--r--  src/mongo/db/storage/SConscript | 1
-rw-r--r--  src/mongo/db/storage/write_unit_of_work.cpp | 28
-rw-r--r--  src/mongo/db/storage/write_unit_of_work.h | 3
-rw-r--r--  src/mongo/db/user_write_block_mode_op_observer.h | 2
-rw-r--r--  src/mongo/idl/cluster_server_parameter_op_observer.h | 2
33 files changed, 740 insertions, 110 deletions
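
At a high level the patch wires batched deletes through a new path: the BATCHED_DELETE plan stage opens a WriteUnitOfWork with groupOplogEntries=true, OpObserverImpl::onDelete() stages each delete into the new BatchedWriteContext decoration instead of logging a per-document oplog entry, and WriteUnitOfWork::commit() invokes the new OpObserver::onBatchedWriteCommit() hook, which replicates the staged operations as a single applyOps entry (split only if it would exceed the 16MB BSON limit). The self-contained C++ sketch below models just that hand-off; all types are simplified stand-ins, not the server's actual classes.

    // Simplified, standalone model of the staging/commit hand-off added by this patch.
    // The real server types (OperationContext, BatchedWriteContext, WriteUnitOfWork,
    // OpObserver) are far richer; only the control flow is mirrored here.
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    struct ReplOperation {  // stand-in for repl::ReplOperation (deletes only)
        std::string ns;
        int id;
    };

    struct BatchedWriteContext {  // stand-in for the OperationContext decoration
        bool batched = false;
        std::vector<ReplOperation> ops;
    };

    // Stand-in for OpObserverImpl::onBatchedWriteCommit(): one "applyOps" per batch.
    void onBatchedWriteCommit(const BatchedWriteContext& bwc) {
        std::cout << "applyOps with " << bwc.ops.size() << " delete(s) on " << bwc.ops[0].ns << "\n";
    }

    class WriteUnitOfWork {  // stand-in for the storage-layer WUOW
    public:
        WriteUnitOfWork(BatchedWriteContext& bwc, bool groupOplogEntries)
            : _bwc(bwc), _group(groupOplogEntries) {
            if (_group)
                _bwc.batched = true;
        }
        ~WriteUnitOfWork() {  // always clears the staged batch, committed or not
            if (_group) {
                _bwc.ops.clear();
                _bwc.batched = false;
            }
        }
        // What OpObserverImpl::onDelete() does when batching: stage the op, do not log it yet.
        void onDelete(std::string ns, int id) {
            if (_bwc.batched)
                _bwc.ops.push_back({std::move(ns), id});
        }
        void commit() {
            if (_group && !_bwc.ops.empty())
                onBatchedWriteCommit(_bwc);  // a single applyOps for the whole batch
        }
    private:
        BatchedWriteContext& _bwc;
        bool _group;
    };

    int main() {
        BatchedWriteContext bwc;
        WriteUnitOfWork wuow(bwc, /*groupOplogEntries=*/true);
        for (int id = 0; id < 3; ++id)
            wuow.onDelete("__internalBatchedDeletesTesting.Collection0", id);
        wuow.commit();  // prints: applyOps with 3 delete(s) on __internalBatchedDeletesTesting.Collection0
    }
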
diff --git a/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml
index 66614e15afb..ac000fef42d 100755
--- a/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/cst_jscore_passthrough.yml
@@ -73,6 +73,7 @@ selector:
- jstests/core/bench_test2.js
- jstests/core/bindata_eq.js
- jstests/core/bindata_indexonly.js
+ - jstests/core/batched_multi_deletes.js
- jstests/core/capped_update.js
- jstests/core/collation.js
- jstests/core/collation_find_and_modify.js
diff --git a/buildscripts/resmokelib/testing/fixtures/standalone.py b/buildscripts/resmokelib/testing/fixtures/standalone.py
index b6375141305..394c025d510 100644
--- a/buildscripts/resmokelib/testing/fixtures/standalone.py
+++ b/buildscripts/resmokelib/testing/fixtures/standalone.py
@@ -345,6 +345,11 @@ class MongodLauncher(object):
if "replSet" in mongod_options and "configsvr" in mongod_options:
mongod_options["storageEngine"] = "wiredTiger"
+ # TODO (SERVER-63044): Remove the block below.
+ if executable.endswith("mongod"):
+     if "internalBatchUserMultiDeletesForTest" not in suite_set_parameters:
+         suite_set_parameters["internalBatchUserMultiDeletesForTest"] = 1
+
return self.fixturelib.mongod_program(logger, job_num, executable, process_kwargs,
mongod_options)
diff --git a/jstests/core/batched_multi_deletes.js b/jstests/core/batched_multi_deletes.js
new file mode 100644
index 00000000000..894c68e8a59
--- /dev/null
+++ b/jstests/core/batched_multi_deletes.js
@@ -0,0 +1,60 @@
+/**
+ * Tests batch-deleting a large range of data.
+ *
+ * @tags: [
+ * does_not_support_retryable_writes,
+ * # TODO (SERVER-55909): make WUOW 'groupOplogEntries' the only mode of operation.
+ * does_not_support_transactions,
+ * multiversion_incompatible,
+ * requires_fcv_60,
+ * requires_getmore,
+ * requires_non_retryable_writes,
+ * # TODO (SERVER-63044): namespace for this test is hardcoded, tenant migrations rename it.
+ * tenant_migration_incompatible,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+function populateAndMassDelete(queryPredicate) {
+ // '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches
+ // multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set.
+ // TODO (SERVER-63044): remove this special handling.
+ const testDB = db.getSiblingDB('__internalBatchedDeletesTesting');
+ const coll = testDB['Collection0'];
+
+ const collCount =
+ 54321; // Intentionally not a multiple of BatchedDeleteStageBatchParams::targetBatchDocs.
+
+ coll.drop();
+ assert.commandWorked(coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: x}))));
+
+ assert.eq(collCount, coll.find().itcount());
+
+ // Verify the delete will involve the BATCHED_DELETE stage.
+ const expl = testDB.runCommand({
+ explain: {delete: coll.getName(), deletes: [{q: {_id: {$gte: 0}}, limit: 0}]},
+ verbosity: "executionStats"
+ });
+ assert.commandWorked(expl);
+
+ if (expl["queryPlanner"]["winningPlan"]["stage"] === "SHARD_WRITE") {
+ // This is a sharded cluster. Verify all shards execute the BATCHED_DELETE stage.
+ for (let shard of expl["queryPlanner"]["winningPlan"]["shards"]) {
+ assert.eq(shard["winningPlan"]["stage"], "BATCHED_DELETE");
+ }
+ } else {
+ // Non-sharded
+ assert.eq(expl["queryPlanner"]["winningPlan"]["stage"], "BATCHED_DELETE");
+ }
+
+ // Execute and verify the deletion.
+ assert.eq(collCount, coll.find().itcount());
+ assert.commandWorked(coll.deleteMany(queryPredicate));
+ assert.eq(0, coll.find().itcount());
+}
+
+populateAndMassDelete({_id: {$gte: 0}});
+populateAndMassDelete({a: {$gte: 0}});
+})();
diff --git a/jstests/noPassthrough/batched_multi_deletes.js b/jstests/noPassthrough/batched_multi_deletes.js
index 8577c537140..98aedceddb8 100644
--- a/jstests/noPassthrough/batched_multi_deletes.js
+++ b/jstests/noPassthrough/batched_multi_deletes.js
@@ -2,6 +2,8 @@
* Validate basic batched multi-deletion functionality.
*
* @tags: [
+ * # Running as a replica set requires journaling.
+ * requires_journaling,
* ]
*/
@@ -9,70 +11,87 @@
"use strict";
load("jstests/libs/analyze_plan.js");
-const conn = MongoRunner.runMongod();
+function validateBatchedDeletes(conn) {
+ // '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches
+ // multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set.
+ // TODO (SERVER-63044): remove this special handling.
+ const db = conn.getDB("__internalBatchedDeletesTesting");
+ const coll = db.getCollection('Collection0');
+ const collName = coll.getName();
-const db = conn.getDB("__internalBatchedDeletesTesting");
-const coll = db.getCollection('Collection0');
-const collName = coll.getName();
-const ns = coll.getFullName();
+ const docsPerBatchDefault = 100; // BatchedDeleteStageBatchParams::targetBatchDocs
+ const collCount =
+ 5017; // Intentionally not a multiple of BatchedDeleteStageBatchParams::targetBatchDocs.
-const docsPerBatchDefault = 100; // BatchedDeleteStageBatchParams::targetBatchDocs
-const collCount =
- 5017; // Intentionally not a multiple of BatchedDeleteStageBatchParams::targetBatchDocs.
+ function validateDeletion(db, coll, docsPerBatch) {
+ coll.drop();
+ assert.commandWorked(coll.insertMany(
+ [...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)}))));
+
+ const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches'];
+ const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs'];
+
+ assert.eq(collCount, coll.find().itcount());
+ assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
+ assert.eq(0, coll.find().itcount());
+
+ const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches'];
+ const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs'];
+ const serverStatusBatchesExpected =
+ serverStatusBatchesBefore + Math.ceil(collCount / docsPerBatch);
+ const serverStatusDocsExpected = serverStatusDocsBefore + collCount;
+ assert.eq(serverStatusBatchesAfter, serverStatusBatchesExpected);
+ assert.eq(serverStatusDocsAfter, serverStatusDocsExpected);
+ }
-function validateDeletion(db, coll, docsPerBatch) {
coll.drop();
assert.commandWorked(
coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)}))));
- const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches'];
- const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs'];
-
- assert.eq(collCount, coll.find().itcount());
- assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
- assert.eq(0, coll.find().itcount());
-
- const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches'];
- const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs'];
- const serverStatusBatchesExpected =
- serverStatusBatchesBefore + Math.ceil(collCount / docsPerBatch);
- const serverStatusDocsExpected = serverStatusDocsBefore + collCount;
- assert.eq(serverStatusBatchesAfter, serverStatusBatchesExpected);
- assert.eq(serverStatusDocsAfter, serverStatusDocsExpected);
-}
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
-coll.drop();
-assert.commandWorked(
- coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)}))));
+ // Explain plan and executionStats.
+ {
+ const expl = db.runCommand({
+ explain: {delete: collName, deletes: [{q: {_id: {$gte: 0}}, limit: 0}]},
+ verbosity: "executionStats"
+ });
+ assert.commandWorked(expl);
-assert.commandWorked(db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
+ assert(getPlanStage(expl, "BATCHED_DELETE"));
+ assert.eq(0, expl.executionStats.nReturned);
+ assert.eq(collCount, expl.executionStats.totalDocsExamined);
+ assert.eq(collCount, expl.executionStats.totalKeysExamined);
+ assert.eq(0, expl.executionStats.executionStages.nReturned);
+ assert.eq(1, expl.executionStats.executionStages.isEOF);
+ }
-// Batched multi-deletion is only available for multi:true deletes.
-assert.commandFailedWithCode(
- db.runCommand({delete: collName, deletes: [{q: {_id: {$gte: 0}}, limit: 1}]}), 6303800);
+ // Actual deletion.
+ for (const docsPerBatch of [10, docsPerBatchDefault]) {
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));
+ validateDeletion(db, coll, docsPerBatch);
+ }
+}
-// Explain plan and executionStats.
+// Standalone
{
- const expl = db.runCommand({
- explain: {delete: collName, deletes: [{q: {_id: {$gte: 0}}, limit: 0}]},
- verbosity: "executionStats"
- });
- assert.commandWorked(expl);
-
- assert(getPlanStage(expl, "BATCHED_DELETE"));
- assert.eq(0, expl.executionStats.nReturned);
- assert.eq(collCount, expl.executionStats.totalDocsExamined);
- assert.eq(collCount, expl.executionStats.totalKeysExamined);
- assert.eq(0, expl.executionStats.executionStages.nReturned);
- assert.eq(1, expl.executionStats.executionStages.isEOF);
+ const conn = MongoRunner.runMongod();
+ validateBatchedDeletes(conn);
+ MongoRunner.stopMongod(conn);
}
-// Actual deletion.
-for (const docsPerBatch of [10, docsPerBatchDefault]) {
- assert.commandWorked(
- db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));
- validateDeletion(db, coll, docsPerBatch);
+// Replica set
+{
+ const rst = new ReplSetTest({
+ name: "batched_multi_deletes_test",
+ nodes: 2,
+ });
+ rst.startSet();
+ rst.initiate();
+ rst.awaitNodesAgreeOnPrimary();
+ validateBatchedDeletes(rst.getPrimary());
+ rst.stopSet();
}
-
-MongoRunner.stopMongod(conn);
})();
diff --git a/jstests/noPassthrough/batched_multi_deletes_WC.js b/jstests/noPassthrough/batched_multi_deletes_WC.js
index 911b22da1d2..a59d0549101 100644
--- a/jstests/noPassthrough/batched_multi_deletes_WC.js
+++ b/jstests/noPassthrough/batched_multi_deletes_WC.js
@@ -15,7 +15,9 @@ load("jstests/libs/fail_point_util.js"); // For 'configureFailPoint()'
const conn = MongoRunner.runMongod();
-// This specific namespace is required to activate batched multi-delete behavior.
+// '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches
+// multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set.
+// TODO (SERVER-63044): remove this special handling.
const testDB = conn.getDB("__internalBatchedDeletesTesting");
const coll = testDB.getCollection("Collection0");
const collName = coll.getName();
diff --git a/jstests/noPassthrough/batched_multi_deletes_oplog.js b/jstests/noPassthrough/batched_multi_deletes_oplog.js
new file mode 100644
index 00000000000..b27a1e6d6e8
--- /dev/null
+++ b/jstests/noPassthrough/batched_multi_deletes_oplog.js
@@ -0,0 +1,118 @@
+/**
+ * Validate oplog behaviour of batched multi-deletes.
+ *
+ * @tags: [
+ * # Running as a replica set requires journaling.
+ * requires_journaling,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+// Verifies that batches replicate as applyOps entries.
+function validateBatchedDeletesOplogDocsPerBatch(conn) {
+ // '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches
+ // multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set.
+ // TODO (SERVER-63044): remove this special handling.
+ const db = conn.getDB("__internalBatchedDeletesTesting");
+ const coll = db.getCollection('Collection0');
+
+ const docsPerBatch = 100;
+ const collCount = 5017; // Intentionally not a multiple of docsPerBatch.
+
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
+ // Disable size-based batching
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchBytes: 0}));
+ // Disable time-based batching
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
+ // Set docs per batch target
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));
+
+ coll.drop();
+ assert.commandWorked(
+ coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: "a".repeat(1024)}))));
+
+ assert.eq(collCount, coll.find().itcount());
+ assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
+ assert.eq(0, coll.find().itcount());
+
+ // The deletion replicates as one applyOps per batch
+ const applyOpsEntriesFull =
+ db.getSiblingDB('local').oplog.rs.find({'o.applyOps': {$size: docsPerBatch}}).itcount();
+ const applyOpsEntriesLast =
+ db.getSiblingDB('local')
+ .oplog.rs.find({'o.applyOps': {$size: collCount % docsPerBatch}})
+ .itcount();
+ const expectedApplyOpsEntries = Math.ceil(collCount / docsPerBatch);
+ assert.eq(applyOpsEntriesFull + applyOpsEntriesLast, expectedApplyOpsEntries);
+}
+
+// Verifies that a large batch that would result in an applyOps entry beyond the 16MB BSON limit
+// generates more than one applyOps entry.
+function validateBatchedDeletesOplogBatchAbove16MB(conn) {
+ const db = conn.getDB("__internalBatchedDeletesTesting");
+ const coll = db.getCollection('Collection0');
+
+ // With _id's of ObjectId type, an applyOps entry reaches the 16MB BSON limit at ~140k entries.
+ // Create a collection >> 140k documents, make the docsPerBatch target high enough to hit the
+ // 16MB BSON limit with applyOps, and disable other batch tunables that could cut a batch
+ // earlier than expected.
+ const docsPerBatch = 500000;
+ const collCount = 200000;
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
+ // Disable size-based batching
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchBytes: 0}));
+ // Disable time-based batching
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
+ // Set artificially high docs per batch target
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));
+
+ coll.drop();
+ assert.commandWorked(coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x}))));
+
+ assert.eq(collCount, coll.find().itcount());
+ assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
+ assert.eq(0, coll.find().itcount());
+
+ // The whole deletion replicates as two applyOps.
+ const oplogEntriesApplyOps = db.getSiblingDB('local')
+ .oplog.rs
+ .aggregate([
+ {
+ $match: {
+ ns: "admin.$cmd",
+ 'o.applyOps.op': 'd',
+ 'o.applyOps.ns': coll.getFullName()
+ }
+ },
+ {$project: {opsPerApplyOps: {$size: '$o.applyOps'}}}
+ ])
+ .toArray();
+ assert.eq(2, oplogEntriesApplyOps.length);
+ assert.eq(
+ collCount,
+ oplogEntriesApplyOps[0]['opsPerApplyOps'] + oplogEntriesApplyOps[1]['opsPerApplyOps']);
+}
+
+function runTestInIsolation(testFunc) {
+ const rst = new ReplSetTest({
+ name: "batched_multi_deletes_test",
+ nodes: 1,
+ });
+ rst.startSet();
+ rst.initiate();
+ rst.awaitNodesAgreeOnPrimary();
+ testFunc(rst.getPrimary());
+ rst.stopSet();
+}
+
+runTestInIsolation(validateBatchedDeletesOplogDocsPerBatch);
+
+// TODO (SERVER-64860): re-enable this test.
+// runTestInIsolation(validateBatchedDeletesOplogBatchAbove16MB);
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9dc2cae6191..71b880f137b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1007,6 +1007,7 @@ env.Library(
'$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/grid',
+ 'batched_write_context',
'catalog/collection_options',
'catalog/database_holder',
'op_observer',
@@ -1030,6 +1031,16 @@ env.Library(
)
env.Library(
+ target="batched_write_context",
+ source=[
+ "batched_write_context.cpp",
+ ],
+ LIBDEPS=[
+ "$BUILD_DIR/mongo/base",
+ ],
+)
+
+env.Library(
target="fcv_op_observer",
source=[
"fcv_op_observer.cpp",
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index a2a0183f65f..677e21e672c 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -208,6 +208,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/batched_write_context.cpp b/src/mongo/db/batched_write_context.cpp
new file mode 100644
index 00000000000..2a9517d9fe3
--- /dev/null
+++ b/src/mongo/db/batched_write_context.cpp
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/batched_write_context.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+const OperationContext::Decoration<BatchedWriteContext> BatchedWriteContext::get =
+ OperationContext::declareDecoration<BatchedWriteContext>();
+
+BatchedWriteContext::BatchedWriteContext() {}
+
+void BatchedWriteContext::addBatchedOperation(OperationContext* opCtx,
+ const repl::ReplOperation& operation) {
+ invariant(_batchWrites);
+
+ // Support is currently limited to delete operations, with no change stream pre-images, no
+ // multi-doc transactions, and no retryable writes.
+ invariant(operation.getOpType() == repl::OpTypeEnum::kDelete);
+ invariant(operation.getChangeStreamPreImageRecordingMode() ==
+ repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOff);
+ invariant(!opCtx->inMultiDocumentTransaction());
+ invariant(!opCtx->getTxnNumber());
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ _batchedOperations.push_back(operation);
+}
+
+std::vector<repl::ReplOperation>& BatchedWriteContext::getBatchedOperations(
+ OperationContext* opCtx) {
+ invariant(_batchWrites);
+ return _batchedOperations;
+}
+
+void BatchedWriteContext::clearBatchedOperations(OperationContext* opCtx) {
+ invariant(_batchWrites);
+ _batchedOperations.clear();
+}
+
+bool BatchedWriteContext::writesAreBatched() const {
+ return _batchWrites;
+}
+void BatchedWriteContext::setWritesAreBatched(bool batched) {
+ _batchWrites = batched;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/batched_write_context.h b/src/mongo/db/batched_write_context.h
new file mode 100644
index 00000000000..7aa5ec678e1
--- /dev/null
+++ b/src/mongo/db/batched_write_context.h
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+
+/**
+ * This class is a decoration on the OperationContext that holds writes which are logically
+ * related to each other, so that they can be grouped into a single applyOps oplog entry. It can
+ * be used to stage writes belonging to the same WriteUnitOfWork or multi-document transaction.
+ * Currently it only supports batching deletes performed in a WriteUnitOfWork.
+ */
+class BatchedWriteContext {
+public:
+ static const OperationContext::Decoration<BatchedWriteContext> get;
+
+ BatchedWriteContext();
+
+ // No copy and no move
+ BatchedWriteContext(const BatchedWriteContext&) = delete;
+ BatchedWriteContext(BatchedWriteContext&&) = delete;
+ BatchedWriteContext& operator=(const BatchedWriteContext&) = delete;
+ BatchedWriteContext& operator=(BatchedWriteContext&&) = delete;
+
+ bool writesAreBatched() const;
+ void setWritesAreBatched(bool batched);
+
+ /**
+ * Adds an operation to the list of operations staged for the current WUOW. It is illegal to
+ * add operations outside of a WUOW.
+ */
+ void addBatchedOperation(OperationContext* opCtx, const repl::ReplOperation& operation);
+
+ // Returns a reference to the stored operations for the current WUOW.
+ std::vector<repl::ReplOperation>& getBatchedOperations(OperationContext* opCtx);
+ void clearBatchedOperations(OperationContext* opCtx);
+
+private:
+ // Whether batching writes is enabled.
+ bool _batchWrites = false;
+
+ /**
+ * Holds oplog data for operations which have been applied in the current batched
+ * write context.
+ */
+ std::vector<repl::ReplOperation> _batchedOperations;
+};
+
+} // namespace mongo
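
For orientation, the decoration is looked up and used like any other OperationContext decoration. A minimal in-tree sketch of staging a delete is shown below; the helper name stageDeleteForBatch is hypothetical, and it assumes the caller already holds the collection lock inside a WriteUnitOfWork opened with groupOplogEntries=true (which is what flips writesAreBatched() to true):

    // Sketch only: stage one delete so it replicates later as part of a single applyOps entry.
    #include "mongo/db/batched_write_context.h"
    #include "mongo/db/repl/oplog_entry.h"

    namespace mongo {
    void stageDeleteForBatch(OperationContext* opCtx,
                             const NamespaceString& nss,
                             const UUID& uuid,
                             const BSONObj& documentKey) {
        auto& bwc = BatchedWriteContext::get(opCtx);  // fetch the decoration
        invariant(bwc.writesAreBatched());            // set by WriteUnitOfWork(opCtx, true)
        auto op = repl::MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey);
        bwc.addBatchedOperation(opCtx, op);  // logged later by onBatchedWriteCommit()
    }
    }  // namespace mongo
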
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index bc0b1059861..7c4fac03e24 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -127,7 +127,7 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
: DeleteStage::DeleteStage(
kStageType.rawData(), expCtx, std::move(params), ws, collection, child),
_batchParams(std::move(batchParams)) {
- uassert(6303800,
+ tassert(6303800,
"batched deletions only support multi-document deletions (multi: true)",
_params->isMulti);
tassert(6303801,
@@ -165,14 +165,13 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
std::terminate();
}
- // TODO (SERVER-63047): use a single write timestamp by grouping oplog entries.
- opCtx()->recoveryUnit()->ignoreAllMultiTimestampConstraints();
-
const auto startOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch();
unsigned int docsDeleted = 0;
std::vector<RecordId> recordsThatNoLongerMatch;
try {
- WriteUnitOfWork wuow(opCtx());
+ // Start a WUOW with 'groupOplogEntries', which groups the delete batch into a single
+ // timestamp and oplog entry.
+ WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */);
for (auto& [rid, snapshotId] : _ridMap) {
if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) {
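
Condensed, the stage's per-batch commit path now reads as below: every document delete of a batch runs under one WriteUnitOfWork constructed with groupOplogEntries=true, and the batch becomes durable, and replicated as one applyOps, only when commit() runs. This is a sketch; deleteOneDocument is a hypothetical placeholder for the stage's per-document delete call, and the real stage's yield, write-conflict, and metrics handling is omitted:

    // Sketch of the batched commit path (in-tree types; 'deleteOneDocument' is a placeholder).
    void commitDeleteBatch(OperationContext* opCtx, const std::vector<RecordId>& batch) {
        WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
        for (const RecordId& rid : batch) {
            deleteOneDocument(opCtx, rid);  // each delete is staged into the BatchedWriteContext
        }
        wuow.commit();  // OpObserver::onBatchedWriteCommit() logs one applyOps for the batch
    }
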
diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl
index 6c1ed0de55b..3794cc1836d 100644
--- a/src/mongo/db/exec/batched_delete_stage.idl
+++ b/src/mongo/db/exec/batched_delete_stage.idl
@@ -46,7 +46,7 @@ server_parameters:
set_at: [startup, runtime]
cpp_vartype: 'AtomicWord<long long>'
cpp_varname: "gBatchedDeletesTargetBatchDocs"
- default: 100
+ default: 10 # TODO (SERVER-64547): re-evaluate this default.
validator:
gte: 0
batchedDeletesTargetBatchTimeMS:
diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h
index 920508eef86..0b638e96e77 100644
--- a/src/mongo/db/fcv_op_observer.h
+++ b/src/mongo/db/fcv_op_observer.h
@@ -179,12 +179,12 @@ public:
void onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) final {}
+
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final{};
-
std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
OperationContext* opCtx,
const std::vector<OplogSlot>& reservedSlots,
@@ -201,8 +201,12 @@ public:
const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final{};
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final{};
+
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index b266555151c..40511b4360c 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -208,6 +208,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 6c725dfd21f..a426783b907 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -413,6 +413,8 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept = 0;
+ virtual void onBatchedWriteCommit(OperationContext* opCtx) = 0;
+
/**
* Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image
* oplog entries for a transaction. "applyOps" entries are not actual "applyOps" entries to be
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index fac3cd11e1e..4acebc278c6 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -37,6 +37,7 @@
#include <limits>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/batched_write_context.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
@@ -831,8 +832,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ const bool inBatchedWrite = batchedWriteContext.writesAreBatched();
+
OpTimeBundle opTime;
- if (inMultiDocumentTransaction) {
+ if (inBatchedWrite) {
+ auto operation =
+ MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
+ batchedWriteContext.addBatchedOperation(opCtx, operation);
+ } else if (inMultiDocumentTransaction) {
const bool inRetryableInternalTransaction =
isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
@@ -1565,9 +1573,11 @@ void packTransactionStatementsForApplyOps(
}
}
-// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON
-// builder object already has an 'applyOps' field appended pointing to the desired array of ops
-// i.e. { "applyOps" : [op1, op2, ...] }
+// Logs one applyOps entry for a prepared transaction, for an unprepared transaction's commit, or
+// for committing a WUOW that is not necessarily tied to a multi-document transaction. It may
+// update the transactions table for multi-document transactions. Assumes that the given BSON
+// builder object already has an 'applyOps' field appended pointing to the desired array of ops,
+// i.e. { "applyOps" : [op1, op2, ...] }
//
// @param txnState the 'state' field of the transaction table entry update. @param startOpTime the
// optime of the 'startOpTime' field of the transaction table entry update. If boost::none, no
@@ -1576,24 +1586,26 @@ void packTransactionStatementsForApplyOps(
// updated after the oplog entry is written.
//
// Returns the optime of the written oplog entry.
-OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
- MutableOplogEntry* oplogEntry,
- boost::optional<DurableTxnStateEnum> txnState,
- boost::optional<repl::OpTime> startOpTime,
- std::vector<StmtId> stmtIdsWritten,
- const bool updateTxnTable) {
+OpTimeBundle logApplyOps(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ boost::optional<DurableTxnStateEnum> txnState,
+ boost::optional<repl::OpTime> startOpTime,
+ std::vector<StmtId> stmtIdsWritten,
+ const bool updateTxnTable) {
if (!stmtIdsWritten.empty()) {
invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()));
}
- const auto txnRetryCounter = *opCtx->getTxnRetryCounter();
+ const auto txnRetryCounter = opCtx->getTxnRetryCounter();
+
+ invariant(bool(txnRetryCounter) == bool(TransactionParticipant::get(opCtx)));
oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
oplogEntry->setNss({"admin", "$cmd"});
oplogEntry->setSessionId(opCtx->getLogicalSessionId());
oplogEntry->setTxnNumber(opCtx->getTxnNumber());
- if (!isDefaultTxnRetryCounter(txnRetryCounter)) {
- oplogEntry->getOperationSessionInfo().setTxnRetryCounter(txnRetryCounter);
+ if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) {
+ oplogEntry->getOperationSessionInfo().setTxnRetryCounter(*txnRetryCounter);
}
try {
@@ -1606,8 +1618,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
sessionTxnRecord.setLastWriteDate(times.wallClockTime);
sessionTxnRecord.setState(txnState);
sessionTxnRecord.setStartOpTime(startOpTime);
- if (!isDefaultTxnRetryCounter(txnRetryCounter)) {
- sessionTxnRecord.setTxnRetryCounter(txnRetryCounter);
+ if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) {
+ sessionTxnRecord.setTxnRetryCounter(*txnRetryCounter);
}
onWriteOpCompleted(opCtx, std::move(stmtIdsWritten), sessionTxnRecord);
}
@@ -1622,7 +1634,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-// Logs transaction oplog entries for preparing a transaction or committing an unprepared
+// Logs applyOps oplog entries for preparing a transaction, committing an unprepared
+// transaction, or committing a WUOW that is not necessarily related to a multi-document
// transaction. This includes the in-progress 'partialTxn' oplog entries followed by the implicit
// prepare or commit entry. If the 'prepare' argument is true, it will log entries for a prepared
// transaction. Otherwise, it logs entries for an unprepared transaction. The total number of oplog
@@ -1645,7 +1658,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
// skipping over some reserved slots.
//
// The number of oplog entries written is returned.
-int logOplogEntriesForTransaction(
+int logOplogEntries(
OperationContext* opCtx,
std::vector<repl::ReplOperation>* stmts,
const std::vector<OplogSlot>& oplogSlots,
@@ -1669,7 +1682,9 @@ int logOplogEntriesForTransaction(
// OplogSlotReserver.
invariant(opCtx->lockState()->isWriteLocked());
- prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
+ if (txnParticipant) {
+ prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
+ }
auto currPrePostImageOplogEntryOplogSlot =
applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin();
@@ -1786,9 +1801,9 @@ int logOplogEntriesForTransaction(
applyOpsBuilder.append("count", static_cast<long long>(stmts->size()));
}
- // For both prepared and unprepared transactions, update the transactions table on
- // the first and last op.
- auto updateTxnTable = firstOp || lastOp;
+ // For both prepared and unprepared transactions (but not for batched writes), update the
+ // transactions table on the first and last op.
+ auto updateTxnTable = txnParticipant && (firstOp || lastOp);
// The first optime of the transaction is always the first oplog slot, except in the
// case of a single prepare oplog entry.
@@ -1802,14 +1817,15 @@ int logOplogEntriesForTransaction(
MutableOplogEntry oplogEntry;
oplogEntry.setOpTime(applyOpsEntry.oplogSlot);
- oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime);
+ if (txnParticipant) {
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime);
+ }
oplogEntry.setWallClockTime(wallClockTime);
oplogEntry.setObject(applyOpsBuilder.done());
auto txnState = isPartialTxn
? DurableTxnStateEnum::kInProgress
: (implicitPrepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted);
- prevWriteOpTime =
- logApplyOpsForTransaction(opCtx,
+ prevWriteOpTime = logApplyOps(opCtx,
&oplogEntry,
txnState,
startOpTime,
@@ -1923,15 +1939,14 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
// Log in-progress entries for the transaction along with the implicit commit.
boost::optional<ImageBundle> imageToWrite;
- int numOplogEntries = logOplogEntriesForTransaction(opCtx,
- statements,
- oplogSlots,
- applyOpsOplogSlotAndOperationAssignment,
- &imageToWrite,
- numberOfPrePostImagesToWrite,
- false /* prepare*/,
- wallClockTime);
-
+ int numOplogEntries = logOplogEntries(opCtx,
+ statements,
+ oplogSlots,
+ applyOpsOplogSlotAndOperationAssignment,
+ &imageToWrite,
+ numberOfPrePostImagesToWrite,
+ false /* prepare*/,
+ wallClockTime);
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -1945,6 +1960,38 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime);
}
+void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
+ if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() !=
+ repl::ReplicationCoordinator::modeReplSet ||
+ !opCtx->writesAreReplicated()) {
+ return;
+ }
+
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ auto& batchedOps = batchedWriteContext.getBatchedOperations(opCtx);
+
+ // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
+ // reserve enough entries for all statements in the batched write.
+ auto oplogSlots = repl::getNextOpTimes(opCtx, batchedOps.size());
+
+ auto noPrePostImage = boost::optional<ImageBundle>(boost::none);
+
+ // Serialize batched statements to BSON and determine their assignment to "applyOps"
+ // entries.
+ const auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, oplogSlots, 0 /*numberOfPrePostImagesToWrite*/, false /*prepare*/, batchedOps);
+ const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+ logOplogEntries(opCtx,
+ &batchedOps,
+ oplogSlots,
+ applyOpsOplogSlotAndOperationAssignment,
+ &noPrePostImage,
+ 0 /* numberOfPrePostImagesToWrite */,
+ false /* prepare */,
+ wallClockTime);
+}
+
void OpObserverImpl::onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
@@ -2024,14 +2071,14 @@ void OpObserverImpl::onTransactionPrepare(
// the last reserved slot, because the transaction participant has already used
// that as the prepare time.
boost::optional<ImageBundle> imageToWrite;
- logOplogEntriesForTransaction(opCtx,
- statements,
- reservedSlots,
- *applyOpsOperationAssignment,
- &imageToWrite,
- numberOfPrePostImagesToWrite,
- true /* prepare */,
- wallClockTime);
+ logOplogEntries(opCtx,
+ statements,
+ reservedSlots,
+ *applyOpsOperationAssignment,
+ &imageToWrite,
+ numberOfPrePostImagesToWrite,
+ true /* prepare */,
+ wallClockTime);
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -2054,12 +2101,12 @@ void OpObserverImpl::onTransactionPrepare(
oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
oplogEntry.setObject(applyOpsBuilder.done());
oplogEntry.setWallClockTime(wallClockTime);
- logApplyOpsForTransaction(opCtx,
- &oplogEntry,
- DurableTxnStateEnum::kPrepared,
- oplogSlot,
- {},
- true /* updateTxnTable */);
+ logApplyOps(opCtx,
+ &oplogEntry,
+ DurableTxnStateEnum::kPrepared,
+ oplogSlot,
+ {},
+ true /* updateTxnTable */);
}
wuow.commit();
});
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 7d40fe84307..7843b06602f 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -187,6 +187,7 @@ public:
void onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) final;
+ void onBatchedWriteCommit(OperationContext* opCtx) final;
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 4b2be83993e..27f497aedea 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/batched_write_context.h"
#include "mongo/db/catalog/import_collection_oplog_entry_gen.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/locker_noop.h"
@@ -41,6 +42,7 @@
#include "mongo/db/keys_collection_client_sharded.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
@@ -2345,6 +2347,153 @@ struct DeleteTestCase {
}
};
+class BatchedWriteOutputsTest : public OpObserverTest {
+protected:
+ const NamespaceString _nss{"test", "coll"};
+ const UUID _uuid = UUID::gen();
+};
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestCannotGroupInserts,
+ "Invariant failure.*getOpType.*repl::OpTypeEnum::kDelete") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ bwc.addBatchedOperation(opCtx,
+ repl::MutableOplogEntry::makeInsertOperation(
+ _nss, _uuid, BSON("_id" << 0), BSON("_id" << 0)));
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportPreImagesInCollection,
+ "Invariant "
+ "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::"
+ "ChangeStreamPreImageRecordingMode::kOff") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ entry.setChangeStreamPreImageRecordingMode(
+ repl::ReplOperation::ChangeStreamPreImageRecordingMode::kPreImagesCollection);
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportPreImagesInOplog,
+ "Invariant "
+ "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::"
+ "ChangeStreamPreImageRecordingMode::kOff") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ entry.setChangeStreamPreImageRecordingMode(
+ repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOplog);
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportMultiDocTxn,
+ "Invariant failure.*!opCtx->inMultiDocumentTransaction()") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ opCtx->setInMultiDocumentTransaction();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportRetryableWrites,
+ "Invariant failure.*!opCtx->getTxnNumber()") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ opCtx->setLogicalSessionId(LogicalSessionId(makeLogicalSessionIdForTest()));
+ opCtx->setTxnNumber(TxnNumber{1});
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+// Verifies that a WriteUnitOfWork with groupOplogEntries=true replicates its writes as a single
+// applyOps. Tests WUOWs batching a range of 1 to 5 deletes (inclusive).
+TEST_F(BatchedWriteOutputsTest, TestApplyOpsGrouping) {
+ const auto nDocsToDelete = 5;
+ const BSONObj docsToDelete[nDocsToDelete] = {
+ BSON("_id" << 0),
+ BSON("_id" << 1),
+ BSON("_id" << 2),
+ BSON("_id" << 3),
+ BSON("_id" << 4),
+ };
+
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+ auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry));
+
+ // Run the test with WUOW's grouping 1 to 5 deletions.
+ for (size_t docsToBeBatched = 1; docsToBeBatched <= nDocsToDelete; docsToBeBatched++) {
+
+ // Start a WUOW with groupOplogEntries=true. Verify that this initializes the
+ // BatchedWriteContext.
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ ASSERT(!bwc.writesAreBatched());
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+ ASSERT(bwc.writesAreBatched());
+
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+
+ for (size_t doc = 0; doc < docsToBeBatched; doc++) {
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting `documentKey` on the delete for sharding purposes.
+ // `OpObserverImpl::onDelete` asserts its existence.
+ documentKeyDecoration(opCtx).emplace(docsToDelete[doc]["_id"].wrap(), boost::none);
+ const OplogDeleteEntryArgs args;
+ opCtx->getServiceContext()->getOpObserver()->onDelete(
+ opCtx, _nss, _uuid, kUninitializedStmtId, args);
+ }
+
+ wuow.commit();
+
+ // Retrieve the oplog entries. We expect 'docsToBeBatched' oplog entries in total: one
+ // applyOps from each previous iteration of this loop plus one from the current batch.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, docsToBeBatched);
+ // Entries are in ascending timestamp order, so fetch the last one from the back of the vector.
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The batch consists of an applyOps, whose array contains all deletes issued within the
+ // WUOW.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), docsToBeBatched);
+
+ for (size_t opIdx = 0; opIdx < docsToBeBatched; opIdx++) {
+ const auto innerEntry = innerEntries[opIdx];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ ASSERT(innerEntry.getNss() == NamespaceString("test.coll"));
+ ASSERT(0 == innerEntry.getObject().woCompare(docsToDelete[opIdx]));
+ }
+ }
+}
+
class OnDeleteOutputsTest : public OpObserverTest {
protected:
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index ab610264ce1..bee9db82207 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -161,6 +161,7 @@ public:
void onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) override {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 5186d6a3f47..e153d46d65e 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -430,6 +430,13 @@ public:
o->onTransactionAbort(opCtx, abortOplogEntryOpTime);
}
+ void onBatchedWriteCommit(OperationContext* opCtx) override {
+ ReservedTimes times{opCtx};
+ for (auto& o : _observers) {
+ o->onBatchedWriteCommit(opCtx);
+ }
+ }
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {
for (auto& o : _observers)
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 8f526c82ade..0c12807f3e2 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -1647,7 +1647,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele
deleteStageParams->canonicalQuery = cq.get();
if (MONGO_unlikely(gInternalBatchUserMultiDeletesForTest.load() &&
- nss.ns() == "__internalBatchedDeletesTesting.Collection0")) {
+ nss.ns() == "__internalBatchedDeletesTesting.Collection0" &&
+ deleteStageParams->isMulti)) {
root =
std::make_unique<BatchedDeleteStage>(cq->getExpCtxRaw(),
std::move(deleteStageParams),
diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h
index a81b66e04cd..63a93f69bb9 100644
--- a/src/mongo/db/repl/primary_only_service_op_observer.h
+++ b/src/mongo/db/repl/primary_only_service_op_observer.h
@@ -210,6 +210,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
index 6ab805ee967..2f977b38c52 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
@@ -207,6 +207,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
index b7b0a88ca96..3a6d9e95922 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
@@ -209,6 +209,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 776ce97af3d..ddb36cfbd4c 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -210,6 +210,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override;
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h
index be4dfc18505..35630ed875a 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.h
+++ b/src/mongo/db/s/resharding/resharding_op_observer.h
@@ -230,6 +230,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 684ef8153bf..82449c72136 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -209,6 +209,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h
index 27b05527cce..f55a526e389 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.h
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h
@@ -206,6 +206,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final;
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 22c96f38cba..b216260dde0 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -370,6 +370,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/base",
+ '$BUILD_DIR/mongo/db/batched_write_context',
"$BUILD_DIR/mongo/db/storage/storage_options",
'$BUILD_DIR/mongo/util/fail_point',
'recovery_unit_base',
diff --git a/src/mongo/db/storage/write_unit_of_work.cpp b/src/mongo/db/storage/write_unit_of_work.cpp
index 157a0b50b2a..29b02e560b9 100644
--- a/src/mongo/db/storage/write_unit_of_work.cpp
+++ b/src/mongo/db/storage/write_unit_of_work.cpp
@@ -33,7 +33,9 @@
#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/db/batched_write_context.h"
#include "mongo/db/catalog/uncommitted_collections.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -43,11 +45,21 @@ namespace mongo {
MONGO_FAIL_POINT_DEFINE(sleepBeforeCommit);
-WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx)
- : _opCtx(opCtx), _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork) {
+WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx, bool groupOplogEntries)
+ : _opCtx(opCtx),
+ _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork),
+ _groupOplogEntries(groupOplogEntries) {
uassert(ErrorCodes::IllegalOperation,
"Cannot execute a write operation in read-only mode",
!storageGlobalParams.readOnly);
+ // Grouping oplog entries doesn't support WUOW nesting (e.g. multi-doc transactions).
+ invariant(_toplevel || !_groupOplogEntries);
+
+ if (_groupOplogEntries) {
+ auto& batchedWriteContext = BatchedWriteContext::get(_opCtx);
+ batchedWriteContext.setWritesAreBatched(true);
+ }
+
_opCtx->lockState()->beginWriteUnitOfWork();
if (_toplevel) {
_opCtx->recoveryUnit()->beginUnitOfWork(_opCtx);
@@ -70,6 +82,12 @@ WriteUnitOfWork::~WriteUnitOfWork() {
}
_opCtx->lockState()->endWriteUnitOfWork();
}
+
+ if (_groupOplogEntries) {
+ auto& batchedWriteContext = BatchedWriteContext::get(_opCtx);
+ batchedWriteContext.clearBatchedOperations(_opCtx);
+ batchedWriteContext.setWritesAreBatched(false);
+ }
}
std::unique_ptr<WriteUnitOfWork> WriteUnitOfWork::createForSnapshotResume(
@@ -107,6 +125,12 @@ void WriteUnitOfWork::commit() {
invariant(!_committed);
invariant(!_released);
invariant(_opCtx->_ruState == RecoveryUnitState::kActiveUnitOfWork);
+
+ if (_groupOplogEntries) {
+ const auto opObserver = _opCtx->getServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onBatchedWriteCommit(_opCtx);
+ }
if (_toplevel) {
if (MONGO_unlikely(sleepBeforeCommit.shouldFail())) {
sleepFor(Milliseconds(100));
diff --git a/src/mongo/db/storage/write_unit_of_work.h b/src/mongo/db/storage/write_unit_of_work.h
index 3d8130c290e..3bb8e461563 100644
--- a/src/mongo/db/storage/write_unit_of_work.h
+++ b/src/mongo/db/storage/write_unit_of_work.h
@@ -59,7 +59,7 @@ public:
kFailedUnitOfWork // in a unit of work that has failed and must be aborted
};
- WriteUnitOfWork(OperationContext* opCtx);
+ WriteUnitOfWork(OperationContext* opCtx, bool groupOplogEntries = false);
~WriteUnitOfWork();
@@ -101,6 +101,7 @@ private:
OperationContext* _opCtx;
bool _toplevel;
+ bool _groupOplogEntries;
bool _committed = false;
bool _prepared = false;
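
Since the new constructor parameter defaults to false, every existing WriteUnitOfWork call site keeps its per-document oplog behaviour; batching is strictly opt-in, and the invariant added in write_unit_of_work.cpp above restricts it to top-level (non-nested) units of work. A purely illustrative contrast, assuming in-tree types and a hypothetical caller:

    void example(OperationContext* opCtx) {
        {
            WriteUnitOfWork wuow(opCtx);  // unchanged: one oplog entry per replicated write
            // ... writes ...
            wuow.commit();
        }
        {
            WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);  // must be top-level
            // ... deletes staged via BatchedWriteContext ...
            wuow.commit();  // replicated as a single applyOps entry
        }
    }
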
diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h
index adef807ed97..a3cfd78c518 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.h
+++ b/src/mongo/db/user_write_block_mode_op_observer.h
@@ -217,6 +217,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h
index 932e99f44a3..b2c87f83fb2 100644
--- a/src/mongo/idl/cluster_server_parameter_op_observer.h
+++ b/src/mongo/idl/cluster_server_parameter_op_observer.h
@@ -195,6 +195,8 @@ public:
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) final {}
+ void onBatchedWriteCommit(OperationContext* opCtx) final {}
+
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,