summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-06-28 07:28:20 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-28 12:03:59 +0000
commit3e27134024d243b03e0d52c2e8e966f5aeb7f84f (patch)
tree7df4b6b24a149c21f7771e078c6d28db83e16fa1 /src
parent4be8a73678005831f48c4e12335b37c97187c044 (diff)
downloadmongo-3e27134024d243b03e0d52c2e8e966f5aeb7f84f.tar.gz
SERVER-67281 add OpObserver::onBatchedWriteStart() and onBatchedWriteAbort()
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/auth/auth_op_observer.h4
-rw-r--r--src/mongo/db/fcv_op_observer.h4
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h4
-rw-r--r--src/mongo/db/op_observer.h19
-rw-r--r--src/mongo/db/op_observer_impl.cpp4
-rw-r--r--src/mongo/db/op_observer_impl.h2
-rw-r--r--src/mongo/db/op_observer_noop.h2
-rw-r--r--src/mongo/db/op_observer_registry.h14
-rw-r--r--src/mongo/db/repl/primary_only_service_op_observer.h4
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.h4
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.h4
-rw-r--r--src/mongo/db/s/config_server_op_observer.h4
-rw-r--r--src/mongo/db/s/resharding/resharding_op_observer.h4
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h4
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.h4
-rw-r--r--src/mongo/db/user_write_block_mode_op_observer.h4
-rw-r--r--src/mongo/idl/cluster_server_parameter_op_observer.h4
17 files changed, 89 insertions, 0 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index c145ebc3371..4c5b1692dd2 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -209,8 +209,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h
index 257be0c609a..ac8ebaf923f 100644
--- a/src/mongo/db/fcv_op_observer.h
+++ b/src/mongo/db/fcv_op_observer.h
@@ -205,8 +205,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final{};
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 498d728d187..2a2a0c60097 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -209,8 +209,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 163d4738b0e..ac8b04fb2f6 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -414,9 +414,28 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept = 0;
+ /**
+ * Events for logical grouping of writes to be replicated atomically.
+ * After onBatchedWriteStart(), the replication subsystem is prepared to
+ * start collecting operations to replicate in an applyOps oplog entry.
+ */
+ virtual void onBatchedWriteStart(OperationContext* opCtx) = 0;
+
+ /**
+ * The write operations between onBatchedWriteStart() and onBatchedWriteCommit()
+ * are gathered in a single applyOps oplog entry, similar to atomic applyOps and
+ * multi-doc transactions, and written to the oplog.
+ */
virtual void onBatchedWriteCommit(OperationContext* opCtx) = 0;
/**
+ * Clears the accumulated write operations. No further writes is allowed in this storage
+ * transaction (WriteUnitOfWork). Calling this function after onBatchedWriteCommit()
+ * should be fine for cleanup purposes.
+ */
+ virtual void onBatchedWriteAbort(OperationContext* opCtx) = 0;
+
+ /**
* Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image
* oplog entries for a transaction. "applyOps" entries are not actual "applyOps" entries to be
* written to the oplog, but comprise certain parts of those entries - BSON serialized
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 383f5d47e34..773edc1cd63 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -2085,6 +2085,8 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime);
}
+void OpObserverImpl::onBatchedWriteStart(OperationContext* opCtx) {}
+
void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() !=
repl::ReplicationCoordinator::modeReplSet ||
@@ -2127,6 +2129,8 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
wallClockTime);
}
+void OpObserverImpl::onBatchedWriteAbort(OperationContext* opCtx) {}
+
void OpObserverImpl::onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 77676e9548e..cc4b0cb34ba 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -188,7 +188,9 @@ public:
void onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) final;
+ void onBatchedWriteStart(OperationContext* opCtx) final;
void onBatchedWriteCommit(OperationContext* opCtx) final;
+ void onBatchedWriteAbort(OperationContext* opCtx) final;
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 667164d7564..155a13819bc 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -162,7 +162,9 @@ public:
void onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) override {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 4b30bc7c72b..56323d628cb 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -432,6 +432,13 @@ public:
o->onTransactionAbort(opCtx, abortOplogEntryOpTime);
}
+ void onBatchedWriteStart(OperationContext* opCtx) override {
+ ReservedTimes times{opCtx};
+ for (auto& o : _observers) {
+ o->onBatchedWriteStart(opCtx);
+ }
+ }
+
void onBatchedWriteCommit(OperationContext* opCtx) override {
ReservedTimes times{opCtx};
for (auto& o : _observers) {
@@ -439,6 +446,13 @@ public:
}
}
+ void onBatchedWriteAbort(OperationContext* opCtx) override {
+ ReservedTimes times{opCtx};
+ for (auto& o : _observers) {
+ o->onBatchedWriteAbort(opCtx);
+ }
+ }
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {
for (auto& o : _observers)
diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h
index 5f552149f98..15673bf3aa8 100644
--- a/src/mongo/db/repl/primary_only_service_op_observer.h
+++ b/src/mongo/db/repl/primary_only_service_op_observer.h
@@ -211,8 +211,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
index 43992f5e040..1760c8e551a 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
@@ -208,8 +208,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
index dd42ff6581f..33b15910776 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
@@ -210,8 +210,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index b5e5d04d7a6..134edc88933 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -211,8 +211,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override;
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h
index 30d319a041d..d3343f40276 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.h
+++ b/src/mongo/db/s/resharding/resharding_op_observer.h
@@ -231,8 +231,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 5a0671254d6..4d615cbf5ea 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -210,8 +210,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h
index 55796479ffe..ab80fe558a8 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.h
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h
@@ -207,8 +207,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final;
diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h
index 330779d0cd6..d339c38925f 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.h
+++ b/src/mongo/db/user_write_block_mode_op_observer.h
@@ -230,8 +230,12 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) final {}
diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h
index 2ef05729e39..f19dcc91337 100644
--- a/src/mongo/idl/cluster_server_parameter_op_observer.h
+++ b/src/mongo/idl/cluster_server_parameter_op_observer.h
@@ -187,8 +187,12 @@ public:
std::vector<repl::ReplOperation>* statements,
size_t numberOfPrePostImagesToWrite) final {}
+ void onBatchedWriteStart(OperationContext* opCtx) final {}
+
void onBatchedWriteCommit(OperationContext* opCtx) final {}
+ void onBatchedWriteAbort(OperationContext* opCtx) final {}
+
void onPreparedTransactionCommit(
OperationContext* opCtx,
OplogSlot commitOplogEntryOpTime,