diff options
Diffstat (limited to 'src/mongo')
27 files changed, 483 insertions, 58 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9dc2cae6191..71b880f137b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1007,6 +1007,7 @@ env.Library( '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/grid', + 'batched_write_context', 'catalog/collection_options', 'catalog/database_holder', 'op_observer', @@ -1030,6 +1031,16 @@ env.Library( ) env.Library( + target="batched_write_context", + source=[ + "batched_write_context.cpp", + ], + LIBDEPS=[ + "$BUILD_DIR/mongo/base", + ], +) + +env.Library( target="fcv_op_observer", source=[ "fcv_op_observer.cpp", diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index a2a0183f65f..677e21e672c 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -208,6 +208,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/batched_write_context.cpp b/src/mongo/db/batched_write_context.cpp new file mode 100644 index 00000000000..2a9517d9fe3 --- /dev/null +++ b/src/mongo/db/batched_write_context.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/batched_write_context.h" +#include "mongo/db/repl/oplog_entry.h" + +namespace mongo { +const OperationContext::Decoration<BatchedWriteContext> BatchedWriteContext::get = + OperationContext::declareDecoration<BatchedWriteContext>(); + +BatchedWriteContext::BatchedWriteContext() {} + +void BatchedWriteContext::addBatchedOperation(OperationContext* opCtx, + const repl::ReplOperation& operation) { + invariant(_batchWrites); + + // Current support is only limited to delete operations, no change stream pre-images, no + // multi-doc transactions, no retryable writes. + invariant(operation.getOpType() == repl::OpTypeEnum::kDelete); + invariant(operation.getChangeStreamPreImageRecordingMode() == + repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOff); + invariant(!opCtx->inMultiDocumentTransaction()); + invariant(!opCtx->getTxnNumber()); + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + _batchedOperations.push_back(operation); +} + +std::vector<repl::ReplOperation>& BatchedWriteContext::getBatchedOperations( + OperationContext* opCtx) { + invariant(_batchWrites); + return _batchedOperations; +} + +void BatchedWriteContext::clearBatchedOperations(OperationContext* opCtx) { + invariant(_batchWrites); + _batchedOperations.clear(); +} + +bool BatchedWriteContext::writesAreBatched() const { + return _batchWrites; +} +void BatchedWriteContext::setWritesAreBatched(bool batched) { + _batchWrites = batched; +} + +} // namespace mongo diff --git a/src/mongo/db/batched_write_context.h b/src/mongo/db/batched_write_context.h new file mode 100644 index 00000000000..7aa5ec678e1 --- /dev/null +++ b/src/mongo/db/batched_write_context.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog_entry.h" + +namespace mongo { + +/** + * Group multiple writes into a single applyOps entry. + */ + +/** + * This class is a decoration on the OperationContext holding context of writes that are logically + * related with each other. It can be used to stage writes belonging to the same WriteUnitOfWork or + * multi-document transaction. Currently only supports batching deletes in a WriteUnitOfWork. + */ +class BatchedWriteContext { +public: + static const OperationContext::Decoration<BatchedWriteContext> get; + + BatchedWriteContext(); + + // No copy and no move + BatchedWriteContext(const BatchedWriteContext&) = delete; + BatchedWriteContext(BatchedWriteContext&&) = delete; + BatchedWriteContext& operator=(const BatchedWriteContext&) = delete; + BatchedWriteContext& operator=(BatchedWriteContext&&) = delete; + + bool writesAreBatched() const; + void setWritesAreBatched(bool batched); + + /** + * Adds a stored operation to the list of stored operations for the current WUOW. It is illegal + * to add operations outside of a WUOW. + */ + void addBatchedOperation(OperationContext* opCtx, const repl::ReplOperation& operation); + + // Returns a reference to the stored operations for the current WUOW. + std::vector<repl::ReplOperation>& getBatchedOperations(OperationContext* opCtx); + void clearBatchedOperations(OperationContext* opCtx); + +private: + // Whether batching writes is enabled. + bool _batchWrites = false; + + /** + * Holds oplog data for operations which have been applied in the current batched + * write context. + */ + std::vector<repl::ReplOperation> _batchedOperations; +}; + +} // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp index bc0b1059861..7c4fac03e24 100644 --- a/src/mongo/db/exec/batched_delete_stage.cpp +++ b/src/mongo/db/exec/batched_delete_stage.cpp @@ -127,7 +127,7 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, : DeleteStage::DeleteStage( kStageType.rawData(), expCtx, std::move(params), ws, collection, child), _batchParams(std::move(batchParams)) { - uassert(6303800, + tassert(6303800, "batched deletions only support multi-document deletions (multi: true)", _params->isMulti); tassert(6303801, @@ -165,14 +165,13 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { std::terminate(); } - // TODO (SERVER-63047): use a single write timestamp by grouping oplog entries. - opCtx()->recoveryUnit()->ignoreAllMultiTimestampConstraints(); - const auto startOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch(); unsigned int docsDeleted = 0; std::vector<RecordId> recordsThatNoLongerMatch; try { - WriteUnitOfWork wuow(opCtx()); + // Start a WUOW with 'groupOplogEntries' which groups a delete batch into a single timestamp + // and oplog entry + WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */); for (auto& [rid, snapshotId] : _ridMap) { if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) { diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl index 6c1ed0de55b..3794cc1836d 100644 --- a/src/mongo/db/exec/batched_delete_stage.idl +++ b/src/mongo/db/exec/batched_delete_stage.idl @@ -46,7 +46,7 @@ server_parameters: set_at: [startup, runtime] cpp_vartype: 'AtomicWord<long long>' cpp_varname: "gBatchedDeletesTargetBatchDocs" - default: 100 + default: 10 # TODO (SERVER-64547): re-evaluate this default. validator: gte: 0 batchedDeletesTargetBatchTimeMS: diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h index 920508eef86..0b638e96e77 100644 --- a/src/mongo/db/fcv_op_observer.h +++ b/src/mongo/db/fcv_op_observer.h @@ -179,12 +179,12 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final {} + void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final{}; - std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, @@ -201,8 +201,12 @@ public: const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final{}; + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final{}; + + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index b266555151c..40511b4360c 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -208,6 +208,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 6c725dfd21f..a426783b907 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -413,6 +413,8 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept = 0; + virtual void onBatchedWriteCommit(OperationContext* opCtx) = 0; + /** * Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image * oplog entries for a transaction. "applyOps" entries are not actual "applyOps" entries to be diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index fac3cd11e1e..4acebc278c6 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -37,6 +37,7 @@ #include <limits> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/batched_write_context.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" @@ -831,8 +832,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); + auto& batchedWriteContext = BatchedWriteContext::get(opCtx); + const bool inBatchedWrite = batchedWriteContext.writesAreBatched(); + OpTimeBundle opTime; - if (inMultiDocumentTransaction) { + if (inBatchedWrite) { + auto operation = + MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId()); + batchedWriteContext.addBatchedOperation(opCtx, operation); + } else if (inMultiDocumentTransaction) { const bool inRetryableInternalTransaction = isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); @@ -1565,9 +1573,11 @@ void packTransactionStatementsForApplyOps( } } -// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON -// builder object already has an 'applyOps' field appended pointing to the desired array of ops -// i.e. { "applyOps" : [op1, op2, ...] } +// Logs one applyOps entry on a prepared transaction, or an unprepared transaction's commit, or on +// committing a WUOW that is not necessarily tied to a multi-document transaction. It may update the +// transactions table on multi-document transactions. Assumes that the given BSON builder object +// already has an 'applyOps' field appended pointing to the desired array of ops i.e. { "applyOps" +// : [op1, op2, ...] } // // @param txnState the 'state' field of the transaction table entry update. @param startOpTime the // optime of the 'startOpTime' field of the transaction table entry update. If boost::none, no @@ -1576,24 +1586,26 @@ void packTransactionStatementsForApplyOps( // updated after the oplog entry is written. // // Returns the optime of the written oplog entry. -OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, - MutableOplogEntry* oplogEntry, - boost::optional<DurableTxnStateEnum> txnState, - boost::optional<repl::OpTime> startOpTime, - std::vector<StmtId> stmtIdsWritten, - const bool updateTxnTable) { +OpTimeBundle logApplyOps(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + boost::optional<DurableTxnStateEnum> txnState, + boost::optional<repl::OpTime> startOpTime, + std::vector<StmtId> stmtIdsWritten, + const bool updateTxnTable) { if (!stmtIdsWritten.empty()) { invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId())); } - const auto txnRetryCounter = *opCtx->getTxnRetryCounter(); + const auto txnRetryCounter = opCtx->getTxnRetryCounter(); + + invariant(bool(txnRetryCounter) == bool(TransactionParticipant::get(opCtx))); oplogEntry->setOpType(repl::OpTypeEnum::kCommand); oplogEntry->setNss({"admin", "$cmd"}); oplogEntry->setSessionId(opCtx->getLogicalSessionId()); oplogEntry->setTxnNumber(opCtx->getTxnNumber()); - if (!isDefaultTxnRetryCounter(txnRetryCounter)) { - oplogEntry->getOperationSessionInfo().setTxnRetryCounter(txnRetryCounter); + if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) { + oplogEntry->getOperationSessionInfo().setTxnRetryCounter(*txnRetryCounter); } try { @@ -1606,8 +1618,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, sessionTxnRecord.setLastWriteDate(times.wallClockTime); sessionTxnRecord.setState(txnState); sessionTxnRecord.setStartOpTime(startOpTime); - if (!isDefaultTxnRetryCounter(txnRetryCounter)) { - sessionTxnRecord.setTxnRetryCounter(txnRetryCounter); + if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) { + sessionTxnRecord.setTxnRetryCounter(*txnRetryCounter); } onWriteOpCompleted(opCtx, std::move(stmtIdsWritten), sessionTxnRecord); } @@ -1622,7 +1634,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, MONGO_UNREACHABLE; } -// Logs transaction oplog entries for preparing a transaction or committing an unprepared +// Logs applyOps oplog entries for preparing a transaction, committing an unprepared +// transaction, or committing a WUOW that is not necessarily related to a multi-document // transaction. This includes the in-progress 'partialTxn' oplog entries followed by the implicit // prepare or commit entry. If the 'prepare' argument is true, it will log entries for a prepared // transaction. Otherwise, it logs entries for an unprepared transaction. The total number of oplog @@ -1645,7 +1658,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // skipping over some reserved slots. // // The number of oplog entries written is returned. -int logOplogEntriesForTransaction( +int logOplogEntries( OperationContext* opCtx, std::vector<repl::ReplOperation>* stmts, const std::vector<OplogSlot>& oplogSlots, @@ -1669,7 +1682,9 @@ int logOplogEntriesForTransaction( // OplogSlotReserver. invariant(opCtx->lockState()->isWriteLocked()); - prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); + if (txnParticipant) { + prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); + } auto currPrePostImageOplogEntryOplogSlot = applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin(); @@ -1786,9 +1801,9 @@ int logOplogEntriesForTransaction( applyOpsBuilder.append("count", static_cast<long long>(stmts->size())); } - // For both prepared and unprepared transactions, update the transactions table on - // the first and last op. - auto updateTxnTable = firstOp || lastOp; + // For both prepared and unprepared transactions (but not for batched writes) update the + // transactions table on the first and last op. + auto updateTxnTable = txnParticipant && (firstOp || lastOp); // The first optime of the transaction is always the first oplog slot, except in the // case of a single prepare oplog entry. @@ -1802,14 +1817,15 @@ int logOplogEntriesForTransaction( MutableOplogEntry oplogEntry; oplogEntry.setOpTime(applyOpsEntry.oplogSlot); - oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + if (txnParticipant) { + oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + } oplogEntry.setWallClockTime(wallClockTime); oplogEntry.setObject(applyOpsBuilder.done()); auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress : (implicitPrepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted); - prevWriteOpTime = - logApplyOpsForTransaction(opCtx, + prevWriteOpTime = logApplyOps(opCtx, &oplogEntry, txnState, startOpTime, @@ -1923,15 +1939,14 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, // Log in-progress entries for the transaction along with the implicit commit. boost::optional<ImageBundle> imageToWrite; - int numOplogEntries = logOplogEntriesForTransaction(opCtx, - statements, - oplogSlots, - applyOpsOplogSlotAndOperationAssignment, - &imageToWrite, - numberOfPrePostImagesToWrite, - false /* prepare*/, - wallClockTime); - + int numOplogEntries = logOplogEntries(opCtx, + statements, + oplogSlots, + applyOpsOplogSlotAndOperationAssignment, + &imageToWrite, + numberOfPrePostImagesToWrite, + false /* prepare*/, + wallClockTime); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -1945,6 +1960,38 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime); } +void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { + if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != + repl::ReplicationCoordinator::modeReplSet || + !opCtx->writesAreReplicated()) { + return; + } + + auto& batchedWriteContext = BatchedWriteContext::get(opCtx); + auto& batchedOps = batchedWriteContext.getBatchedOperations(opCtx); + + // Reserve all the optimes in advance, so we only need to get the optime mutex once. We + // reserve enough entries for all statements in the transaction. + auto oplogSlots = repl::getNextOpTimes(opCtx, batchedOps.size()); + + auto noPrePostImage = boost::optional<ImageBundle>(boost::none); + + // Serialize batched statements to BSON and determine their assignment to "applyOps" + // entries. + const auto applyOpsOplogSlotAndOperationAssignment = + getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + opCtx, oplogSlots, 0 /*numberOfPrePostImagesToWrite*/, false /*prepare*/, batchedOps); + const auto wallClockTime = getWallClockTimeForOpLog(opCtx); + logOplogEntries(opCtx, + &batchedOps, + oplogSlots, + applyOpsOplogSlotAndOperationAssignment, + &noPrePostImage, + 0 /* numberOfPrePostImagesToWrite */, + false, + wallClockTime); +} + void OpObserverImpl::onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, @@ -2024,14 +2071,14 @@ void OpObserverImpl::onTransactionPrepare( // the last reserved slot, because the transaction participant has already used // that as the prepare time. boost::optional<ImageBundle> imageToWrite; - logOplogEntriesForTransaction(opCtx, - statements, - reservedSlots, - *applyOpsOperationAssignment, - &imageToWrite, - numberOfPrePostImagesToWrite, - true /* prepare */, - wallClockTime); + logOplogEntries(opCtx, + statements, + reservedSlots, + *applyOpsOperationAssignment, + &imageToWrite, + numberOfPrePostImagesToWrite, + true /* prepare */, + wallClockTime); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -2054,12 +2101,12 @@ void OpObserverImpl::onTransactionPrepare( oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); oplogEntry.setObject(applyOpsBuilder.done()); oplogEntry.setWallClockTime(wallClockTime); - logApplyOpsForTransaction(opCtx, - &oplogEntry, - DurableTxnStateEnum::kPrepared, - oplogSlot, - {}, - true /* updateTxnTable */); + logApplyOps(opCtx, + &oplogEntry, + DurableTxnStateEnum::kPrepared, + oplogSlot, + {}, + true /* updateTxnTable */); } wuow.commit(); }); diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 7d40fe84307..7843b06602f 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -187,6 +187,7 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final; + void onBatchedWriteCommit(OperationContext* opCtx) final; void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 4b2be83993e..27f497aedea 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" +#include "mongo/db/batched_write_context.h" #include "mongo/db/catalog/import_collection_oplog_entry_gen.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/locker_noop.h" @@ -41,6 +42,7 @@ #include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" @@ -2345,6 +2347,153 @@ struct DeleteTestCase { } }; +class BatchedWriteOutputsTest : public OpObserverTest { +protected: + const NamespaceString _nss{"test", "coll"}; + const UUID _uuid = UUID::gen(); +}; + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestCannotGroupInserts, + "Invariant failure.*getOpType.*repl::OpTypeEnum::kDelete") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + bwc.addBatchedOperation(opCtx, + repl::MutableOplogEntry::makeInsertOperation( + _nss, _uuid, BSON("_id" << 0), BSON("_id" << 0))); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportPreImagesInCollection, + "Invariant " + "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::" + "ChangeStreamPreImageRecordingMode::kOff") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + entry.setChangeStreamPreImageRecordingMode( + repl::ReplOperation::ChangeStreamPreImageRecordingMode::kPreImagesCollection); + bwc.addBatchedOperation(opCtx, entry); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportPreImagesInOplog, + "Invariant " + "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::" + "ChangeStreamPreImageRecordingMode::kOff") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + entry.setChangeStreamPreImageRecordingMode( + repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOplog); + bwc.addBatchedOperation(opCtx, entry); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportMultiDocTxn, + "Invariant failure.*!opCtx->inMultiDocumentTransaction()") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + opCtx->setInMultiDocumentTransaction(); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + bwc.addBatchedOperation(opCtx, entry); +} + +DEATH_TEST_REGEX_F(BatchedWriteOutputsTest, + TestDoesNotSupportRetryableWrites, + "Invariant failure.*!opCtx->getTxnNumber()") { + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + opCtx->setLogicalSessionId(LogicalSessionId(makeLogicalSessionIdForTest())); + opCtx->setTxnNumber(TxnNumber{1}); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + + auto& bwc = BatchedWriteContext::get(opCtx); + auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0)); + bwc.addBatchedOperation(opCtx, entry); +} + +// Verifies that a WriteUnitOfWork with groupOplogEntries=true replicates its writes as a single +// applyOps. Tests WUOWs batching a range of 1 to 5 deletes (inclusive). +TEST_F(BatchedWriteOutputsTest, TestApplyOpsGrouping) { + const auto nDocsToDelete = 5; + const BSONObj docsToDelete[nDocsToDelete] = { + BSON("_id" << 0), + BSON("_id" << 1), + BSON("_id" << 2), + BSON("_id" << 3), + BSON("_id" << 4), + }; + + // Setup. + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + reset(opCtx, NamespaceString::kRsOplogNamespace); + auto opObserverRegistry = std::make_unique<OpObserverRegistry>(); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); + opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry)); + + // Run the test with WUOW's grouping 1 to 5 deletions. + for (size_t docsToBeBatched = 1; docsToBeBatched <= nDocsToDelete; docsToBeBatched++) { + + // Start a WUOW with groupOplogEntries=true. Verify that initialises the + // BatchedWriteContext. + auto& bwc = BatchedWriteContext::get(opCtx); + ASSERT(!bwc.writesAreBatched()); + WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */); + ASSERT(bwc.writesAreBatched()); + + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + + for (size_t doc = 0; doc < docsToBeBatched; doc++) { + // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect + // of setting of `documentKey` on the delete for sharding purposes. + // `OpObserverImpl::onDelete` asserts its existence. + documentKeyDecoration(opCtx).emplace(docsToDelete[doc]["_id"].wrap(), boost::none); + const OplogDeleteEntryArgs args; + opCtx->getServiceContext()->getOpObserver()->onDelete( + opCtx, _nss, _uuid, kUninitializedStmtId, args); + } + + wuow.commit(); + + // Retrieve the oplog entries. We expect 'docsToBeBatched' oplog entries because of previous + // iteration of this loop that exercised previous batch sizes. + std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, docsToBeBatched); + // Entries in ascending timestamp order, so fetch the last one at the back of the vector. + auto lastOplogEntry = oplogs.back(); + auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back())); + + // The batch consists of an applyOps, whose array contains all deletes issued within the + // WUOW. + ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps); + std::vector<repl::OplogEntry> innerEntries; + repl::ApplyOps::extractOperationsTo( + lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries); + ASSERT_EQ(innerEntries.size(), docsToBeBatched); + + for (size_t opIdx = 0; opIdx < docsToBeBatched; opIdx++) { + const auto innerEntry = innerEntries[opIdx]; + ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand); + ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete); + ASSERT(innerEntry.getNss() == NamespaceString("test.coll")); + ASSERT(0 == innerEntry.getObject().woCompare(docsToDelete[opIdx])); + } + } +} + class OnDeleteOutputsTest : public OpObserverTest { protected: diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index ab610264ce1..bee9db82207 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -161,6 +161,7 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 5186d6a3f47..e153d46d65e 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -430,6 +430,13 @@ public: o->onTransactionAbort(opCtx, abortOplogEntryOpTime); } + void onBatchedWriteCommit(OperationContext* opCtx) override { + ReservedTimes times{opCtx}; + for (auto& o : _observers) { + o->onBatchedWriteCommit(opCtx); + } + } + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override { for (auto& o : _observers) diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 8f526c82ade..0c12807f3e2 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1647,7 +1647,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele deleteStageParams->canonicalQuery = cq.get(); if (MONGO_unlikely(gInternalBatchUserMultiDeletesForTest.load() && - nss.ns() == "__internalBatchedDeletesTesting.Collection0")) { + nss.ns() == "__internalBatchedDeletesTesting.Collection0" && + deleteStageParams->isMulti)) { root = std::make_unique<BatchedDeleteStage>(cq->getExpCtxRaw(), std::move(deleteStageParams), diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h index a81b66e04cd..63a93f69bb9 100644 --- a/src/mongo/db/repl/primary_only_service_op_observer.h +++ b/src/mongo/db/repl/primary_only_service_op_observer.h @@ -210,6 +210,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h index 6ab805ee967..2f977b38c52 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h @@ -207,6 +207,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final; diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h index b7b0a88ca96..3a6d9e95922 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h @@ -209,6 +209,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 776ce97af3d..ddb36cfbd4c 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -210,6 +210,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override; diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h index be4dfc18505..35630ed875a 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.h +++ b/src/mongo/db/s/resharding/resharding_op_observer.h @@ -230,6 +230,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 684ef8153bf..82449c72136 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -209,6 +209,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h index 27b05527cce..f55a526e389 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.h +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h @@ -206,6 +206,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final; diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 22c96f38cba..b216260dde0 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -370,6 +370,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/base", + '$BUILD_DIR/mongo/db/batched_write_context', "$BUILD_DIR/mongo/db/storage/storage_options", '$BUILD_DIR/mongo/util/fail_point', 'recovery_unit_base', diff --git a/src/mongo/db/storage/write_unit_of_work.cpp b/src/mongo/db/storage/write_unit_of_work.cpp index 157a0b50b2a..29b02e560b9 100644 --- a/src/mongo/db/storage/write_unit_of_work.cpp +++ b/src/mongo/db/storage/write_unit_of_work.cpp @@ -33,7 +33,9 @@ #include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/db/batched_write_context.h" #include "mongo/db/catalog/uncommitted_collections.h" +#include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" @@ -43,11 +45,21 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(sleepBeforeCommit); -WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx) - : _opCtx(opCtx), _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork) { +WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx, bool groupOplogEntries) + : _opCtx(opCtx), + _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork), + _groupOplogEntries(groupOplogEntries) { uassert(ErrorCodes::IllegalOperation, "Cannot execute a write operation in read-only mode", !storageGlobalParams.readOnly); + // Grouping oplog entries doesn't support WUOW nesting (e.g. multi-doc transactions). + invariant(_toplevel || !_groupOplogEntries); + + if (_groupOplogEntries) { + auto& batchedWriteContext = BatchedWriteContext::get(_opCtx); + batchedWriteContext.setWritesAreBatched(true); + } + _opCtx->lockState()->beginWriteUnitOfWork(); if (_toplevel) { _opCtx->recoveryUnit()->beginUnitOfWork(_opCtx); @@ -70,6 +82,12 @@ WriteUnitOfWork::~WriteUnitOfWork() { } _opCtx->lockState()->endWriteUnitOfWork(); } + + if (_groupOplogEntries) { + auto& batchedWriteContext = BatchedWriteContext::get(_opCtx); + batchedWriteContext.clearBatchedOperations(_opCtx); + batchedWriteContext.setWritesAreBatched(false); + } } std::unique_ptr<WriteUnitOfWork> WriteUnitOfWork::createForSnapshotResume( @@ -107,6 +125,12 @@ void WriteUnitOfWork::commit() { invariant(!_committed); invariant(!_released); invariant(_opCtx->_ruState == RecoveryUnitState::kActiveUnitOfWork); + + if (_groupOplogEntries) { + const auto opObserver = _opCtx->getServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onBatchedWriteCommit(_opCtx); + } if (_toplevel) { if (MONGO_unlikely(sleepBeforeCommit.shouldFail())) { sleepFor(Milliseconds(100)); diff --git a/src/mongo/db/storage/write_unit_of_work.h b/src/mongo/db/storage/write_unit_of_work.h index 3d8130c290e..3bb8e461563 100644 --- a/src/mongo/db/storage/write_unit_of_work.h +++ b/src/mongo/db/storage/write_unit_of_work.h @@ -59,7 +59,7 @@ public: kFailedUnitOfWork // in a unit of work that has failed and must be aborted }; - WriteUnitOfWork(OperationContext* opCtx); + WriteUnitOfWork(OperationContext* opCtx, bool groupOplogEntries = false); ~WriteUnitOfWork(); @@ -101,6 +101,7 @@ private: OperationContext* _opCtx; bool _toplevel; + bool _groupOplogEntries; bool _committed = false; bool _prepared = false; diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h index adef807ed97..a3cfd78c518 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.h +++ b/src/mongo/db/user_write_block_mode_op_observer.h @@ -217,6 +217,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h index 932e99f44a3..b2c87f83fb2 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.h +++ b/src/mongo/idl/cluster_server_parameter_op_observer.h @@ -195,6 +195,8 @@ public: std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, |