diff options
author | Benety Goh <benety@mongodb.com> | 2022-06-28 07:28:20 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-28 12:03:59 +0000 |
commit | 3e27134024d243b03e0d52c2e8e966f5aeb7f84f (patch) | |
tree | 7df4b6b24a149c21f7771e078c6d28db83e16fa1 /src | |
parent | 4be8a73678005831f48c4e12335b37c97187c044 (diff) | |
download | mongo-3e27134024d243b03e0d52c2e8e966f5aeb7f84f.tar.gz |
SERVER-67281 add OpObserver::onBatchedWriteStart() and onBatchedWriteAbort()
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/auth/auth_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/fcv_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 19 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_registry.h | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/user_write_block_mode_op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/idl/cluster_server_parameter_op_observer.h | 4 |
17 files changed, 89 insertions, 0 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index c145ebc3371..4c5b1692dd2 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -209,8 +209,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h index 257be0c609a..ac8ebaf923f 100644 --- a/src/mongo/db/fcv_op_observer.h +++ b/src/mongo/db/fcv_op_observer.h @@ -205,8 +205,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final{}; + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 498d728d187..2a2a0c60097 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -209,8 +209,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 163d4738b0e..ac8b04fb2f6 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -414,9 +414,28 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept = 0; + /** + * Events for logical grouping of writes to be replicated atomically. + * After onBatchedWriteStart(), the replication subsystem is prepared to + * start collecting operations to replicate in an applyOps oplog entry. + */ + virtual void onBatchedWriteStart(OperationContext* opCtx) = 0; + + /** + * The write operations between onBatchedWriteStart() and onBatchedWriteCommit() + * are gathered in a single applyOps oplog entry, similar to atomic applyOps and + * multi-doc transactions, and written to the oplog. + */ virtual void onBatchedWriteCommit(OperationContext* opCtx) = 0; /** + * Clears the accumulated write operations. No further writes is allowed in this storage + * transaction (WriteUnitOfWork). Calling this function after onBatchedWriteCommit() + * should be fine for cleanup purposes. + */ + virtual void onBatchedWriteAbort(OperationContext* opCtx) = 0; + + /** * Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image * oplog entries for a transaction. "applyOps" entries are not actual "applyOps" entries to be * written to the oplog, but comprise certain parts of those entries - BSON serialized diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 383f5d47e34..773edc1cd63 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -2085,6 +2085,8 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime); } +void OpObserverImpl::onBatchedWriteStart(OperationContext* opCtx) {} + void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet || @@ -2127,6 +2129,8 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { wallClockTime); } +void OpObserverImpl::onBatchedWriteAbort(OperationContext* opCtx) {} + void OpObserverImpl::onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 77676e9548e..cc4b0cb34ba 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -188,7 +188,9 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final; + void onBatchedWriteStart(OperationContext* opCtx) final; void onBatchedWriteCommit(OperationContext* opCtx) final; + void onBatchedWriteAbort(OperationContext* opCtx) final; void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 667164d7564..155a13819bc 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -162,7 +162,9 @@ public: void onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) override {} + void onBatchedWriteStart(OperationContext* opCtx) final {} void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 4b30bc7c72b..56323d628cb 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -432,6 +432,13 @@ public: o->onTransactionAbort(opCtx, abortOplogEntryOpTime); } + void onBatchedWriteStart(OperationContext* opCtx) override { + ReservedTimes times{opCtx}; + for (auto& o : _observers) { + o->onBatchedWriteStart(opCtx); + } + } + void onBatchedWriteCommit(OperationContext* opCtx) override { ReservedTimes times{opCtx}; for (auto& o : _observers) { @@ -439,6 +446,13 @@ public: } } + void onBatchedWriteAbort(OperationContext* opCtx) override { + ReservedTimes times{opCtx}; + for (auto& o : _observers) { + o->onBatchedWriteAbort(opCtx); + } + } + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override { for (auto& o : _observers) diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h index 5f552149f98..15673bf3aa8 100644 --- a/src/mongo/db/repl/primary_only_service_op_observer.h +++ b/src/mongo/db/repl/primary_only_service_op_observer.h @@ -211,8 +211,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h index 43992f5e040..1760c8e551a 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h @@ -208,8 +208,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final; diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h index dd42ff6581f..33b15910776 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h @@ -210,8 +210,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index b5e5d04d7a6..134edc88933 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -211,8 +211,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override; diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h index 30d319a041d..d3343f40276 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.h +++ b/src/mongo/db/s/resharding/resharding_op_observer.h @@ -231,8 +231,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 5a0671254d6..4d615cbf5ea 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -210,8 +210,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h index 55796479ffe..ab80fe558a8 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.h +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h @@ -207,8 +207,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final; diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h index 330779d0cd6..d339c38925f 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.h +++ b/src/mongo/db/user_write_block_mode_op_observer.h @@ -230,8 +230,12 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) final {} diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h index 2ef05729e39..f19dcc91337 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.h +++ b/src/mongo/idl/cluster_server_parameter_op_observer.h @@ -187,8 +187,12 @@ public: std::vector<repl::ReplOperation>* statements, size_t numberOfPrePostImagesToWrite) final {} + void onBatchedWriteStart(OperationContext* opCtx) final {} + void onBatchedWriteCommit(OperationContext* opCtx) final {} + void onBatchedWriteAbort(OperationContext* opCtx) final {} + void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, |