summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-07-16 09:40:51 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-16 14:09:05 +0000
commit52a596efa4256d42d895ee56ebfa4c7328f83164 (patch)
treeaaef35f9ff277561fd84eee40abd075059e5608a
parent610d9be50d6efa94a1012ca0af3afe6b67b56c19 (diff)
downloadmongo-52a596efa4256d42d895ee56ebfa4c7328f83164.tar.gz
SERVER-67508 add OplogWriterTransactionProxy
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/mongod_main.cpp5
-rw-r--r--src/mongo/db/op_observer/SConscript10
-rw-r--r--src/mongo/db/op_observer/oplog_writer_transaction_proxy.cpp65
-rw-r--r--src/mongo/db/op_observer/oplog_writer_transaction_proxy.h72
5 files changed, 151 insertions, 2 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 61bae96889c..0d434266dbb 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2440,6 +2440,7 @@ env.Library(
'op_observer/fcv_op_observer',
'op_observer/op_observer',
'op_observer/oplog_writer_impl',
+ 'op_observer/oplog_writer_transaction_proxy',
'op_observer/user_write_block_mode_op_observer',
'periodic_runner_job_abort_expired_transactions',
'pipeline/process_interface/mongod_process_interface_factory',
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index a9101d83dcb..5b6f0b88d1c 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -107,6 +107,7 @@
#include "mongo/db/op_observer/op_observer_impl.h"
#include "mongo/db/op_observer/op_observer_registry.h"
#include "mongo/db/op_observer/oplog_writer_impl.h"
+#include "mongo/db/op_observer/oplog_writer_transaction_proxy.h"
#include "mongo/db/op_observer/user_write_block_mode_op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/periodic_runner_job_abort_expired_transactions.h"
@@ -1109,8 +1110,8 @@ void setUpObservers(ServiceContext* serviceContext) {
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
DurableHistoryRegistry::get(serviceContext)
->registerPin(std::make_unique<ReshardingHistoryHook>());
- opObserverRegistry->addObserver(
- std::make_unique<OpObserverShardingImpl>(std::make_unique<OplogWriterImpl>()));
+ opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>(
+ std::make_unique<OplogWriterTransactionProxy>(std::make_unique<OplogWriterImpl>())));
opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>());
opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
opObserverRegistry->addObserver(std::make_unique<repl::TenantMigrationDonorOpObserver>());
diff --git a/src/mongo/db/op_observer/SConscript b/src/mongo/db/op_observer/SConscript
index 9d081cdba7b..8cce8f0d18d 100644
--- a/src/mongo/db/op_observer/SConscript
+++ b/src/mongo/db/op_observer/SConscript
@@ -42,6 +42,16 @@ env.Library(
)
env.Library(
+ target='oplog_writer_transaction_proxy',
+ source=[
+ 'oplog_writer_transaction_proxy.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.Library(
target="op_observer_impl",
source=[
"op_observer_impl.cpp",
diff --git a/src/mongo/db/op_observer/oplog_writer_transaction_proxy.cpp b/src/mongo/db/op_observer/oplog_writer_transaction_proxy.cpp
new file mode 100644
index 00000000000..35021e42f07
--- /dev/null
+++ b/src/mongo/db/op_observer/oplog_writer_transaction_proxy.cpp
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/op_observer/oplog_writer_transaction_proxy.h"
+
+namespace mongo {
+
+OplogWriterTransactionProxy::OplogWriterTransactionProxy(
+ std::unique_ptr<OplogWriter> targetOplogWriter)
+ : _targetOplogWriter(std::move(targetOplogWriter)) {}
+
+void OplogWriterTransactionProxy::appendOplogEntryChainInfo(OperationContext* opCtx,
+ repl::MutableOplogEntry* oplogEntry,
+ repl::OplogLink* oplogLink,
+ const std::vector<StmtId>& stmtIds) {
+ return _targetOplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, oplogLink, stmtIds);
+}
+
+std::vector<repl::OpTime> OplogWriterTransactionProxy::logInsertOps(
+ OperationContext* opCtx,
+ repl::MutableOplogEntry* oplogEntryTemplate,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn) {
+ return _targetOplogWriter->logInsertOps(
+ opCtx, oplogEntryTemplate, begin, end, getDestinedRecipientFn);
+}
+
+repl::OpTime OplogWriterTransactionProxy::logOp(OperationContext* opCtx,
+ repl::MutableOplogEntry* oplogEntry) {
+ return _targetOplogWriter->logOp(opCtx, oplogEntry);
+}
+
+std::vector<OplogSlot> OplogWriterTransactionProxy::getNextOpTimes(OperationContext* opCtx,
+ std::size_t count) {
+ return _targetOplogWriter->getNextOpTimes(opCtx, count);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/op_observer/oplog_writer_transaction_proxy.h b/src/mongo/db/op_observer/oplog_writer_transaction_proxy.h
new file mode 100644
index 00000000000..3f96e2d3679
--- /dev/null
+++ b/src/mongo/db/op_observer/oplog_writer_transaction_proxy.h
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/db/op_observer/oplog_writer.h"
+
+namespace mongo {
+
+/**
+ * Accumulates replicated operations for multi-document transactions and batched WUOW writes.
+ * When the operations are ready to be replicated, we compose the final chain of applyOps oplog
+ * entries and write it to the actual oplog referenced in '_targetOplogWriter'.
+ */
+class OplogWriterTransactionProxy : public OplogWriter {
+ OplogWriterTransactionProxy(const OplogWriterTransactionProxy&) = delete;
+ OplogWriterTransactionProxy& operator=(const OplogWriterTransactionProxy&) = delete;
+
+public:
+ OplogWriterTransactionProxy(std::unique_ptr<OplogWriter> targetOplogWriter);
+ virtual ~OplogWriterTransactionProxy() = default;
+
+ void appendOplogEntryChainInfo(OperationContext* opCtx,
+ repl::MutableOplogEntry* oplogEntry,
+ repl::OplogLink* oplogLink,
+ const std::vector<StmtId>& stmtIds) override;
+
+ std::vector<repl::OpTime> logInsertOps(
+ OperationContext* opCtx,
+ repl::MutableOplogEntry* oplogEntryTemplate,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn)
+ override;
+
+ repl::OpTime logOp(OperationContext* opCtx, repl::MutableOplogEntry* oplogEntry) override;
+
+ std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) override;
+
+private:
+ std::unique_ptr<OplogWriter> _targetOplogWriter;
+};
+
+} // namespace mongo