summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/auth/auth_op_observer.h6
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h6
-rw-r--r--src/mongo/db/op_observer.h29
-rw-r--r--src/mongo/db/op_observer_impl.cpp25
-rw-r--r--src/mongo/db/op_observer_impl.h6
-rw-r--r--src/mongo/db/op_observer_noop.h6
-rw-r--r--src/mongo/db/op_observer_registry.h16
-rw-r--r--src/mongo/db/repl/SConscript6
-rw-r--r--src/mongo/db/repl/oplog.cpp13
-rw-r--r--src/mongo/db/repl/oplog_entry.h2
-rw-r--r--src/mongo/db/repl/oplog_entry.idl5
-rw-r--r--src/mongo/db/repl/oplog_test.cpp41
-rw-r--r--src/mongo/db/repl/tenant_migration_decoration.cpp40
-rw-r--r--src/mongo/db/repl/tenant_migration_decoration.h47
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp319
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.h145
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp280
-rw-r--r--src/mongo/db/s/config_server_op_observer.h6
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp11
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp10
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h6
21 files changed, 1000 insertions, 25 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 5920146860e..2dc13af3bd8 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -99,7 +99,11 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) final {}
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) final{};
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 30642076e7e..f06efef564f 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -99,7 +99,11 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) final {}
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) final{};
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index aefdf1fc04e..51753376d88 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -152,20 +152,33 @@ public:
/**
* Logs a no-op with "msgObj" in the o field into oplog.
*
- * This function should only be used internally. "nss", "uuid" and the o2 field should never be
- * exposed to users (for instance through the appendOplogNote command).
+ * This function should only be used internally. "nss", "uuid", "o2", and the opTimes should
+ * never be exposed to users (for instance through the appendOplogNote command).
*/
- virtual void onInternalOpMessage(OperationContext* opCtx,
- const NamespaceString& nss,
- const boost::optional<UUID> uuid,
- const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) = 0;
+ virtual void onInternalOpMessage(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<UUID> uuid,
+ const BSONObj& msgObj,
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) = 0;
/**
* Logs a no-op with "msgObj" in the o field into oplog.
*/
void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) {
- onInternalOpMessage(opCtx, {}, boost::none, msgObj, boost::none);
+ onInternalOpMessage(opCtx,
+ {},
+ boost::none,
+ msgObj,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
}
virtual void onCreateCollection(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 193fa3885da..11c6402566e 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -356,6 +356,10 @@ void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx,
{},
boost::none,
BSON("msg" << std::string(str::stream() << "Creating indexes. Coll: " << nss)),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
boost::none);
}
@@ -646,17 +650,28 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
}
}
-void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
- const NamespaceString& nss,
- const boost::optional<UUID> uuid,
- const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) {
+void OpObserverImpl::onInternalOpMessage(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<UUID> uuid,
+ const BSONObj& msgObj,
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) {
MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setNss(nss);
oplogEntry.setUuid(uuid);
oplogEntry.setObject(msgObj);
oplogEntry.setObject2(o2MsgObj);
+ oplogEntry.setPreImageOpTime(preImageOpTime);
+ oplogEntry.setPostImageOpTime(postImageOpTime);
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction);
+ if (slot) {
+ oplogEntry.setOpTime(*slot);
+ }
logOperation(opCtx, &oplogEntry);
}
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 52c50da7b91..266cdb603a6 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -106,7 +106,11 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) final;
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) final;
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
const NamespaceString& collectionName,
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index f3636fdc43f..2d4cb62da31 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -86,7 +86,11 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) override {}
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) override {}
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
const NamespaceString& collectionName,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 9ce4ec255a0..6988b0560bb 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -153,10 +153,22 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) override {
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) override {
ReservedTimes times{opCtx};
for (auto& o : _observers)
- o->onInternalOpMessage(opCtx, nss, uuid, msgObj, o2MsgObj);
+ o->onInternalOpMessage(opCtx,
+ nss,
+ uuid,
+ msgObj,
+ o2MsgObj,
+ preImageOpTime,
+ postImageOpTime,
+ prevWriteOpTimeInTransaction,
+ slot);
}
void onCreateCollection(OperationContext* const opCtx,
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e5efba6af97..77fdf036e9c 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -35,6 +35,7 @@ env.Library(
'apply_ops.cpp',
'oplog.cpp',
'oplog_entry_or_grouped_inserts.cpp',
+ 'tenant_migration_decoration.cpp',
'transaction_oplog_application.cpp',
env.Idlc('apply_ops.idl')[0],
],
@@ -1381,6 +1382,7 @@ env.CppUnitTest(
'sync_source_resolver_test.cpp',
'task_runner_test.cpp',
'task_runner_test_fixture.cpp',
+ 'tenant_oplog_applier_test.cpp',
'tenant_oplog_batcher_test.cpp',
'vote_requester_test.cpp',
'wait_for_majority_service_test.cpp',
@@ -1642,10 +1644,12 @@ env.Library(
env.Library(
target='tenant_oplog_processing',
source=[
- 'tenant_oplog_batcher.cpp'
+ 'tenant_oplog_batcher.cpp',
+ 'tenant_oplog_applier.cpp'
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'abstract_async_component',
'oplog',
'oplog_application_interface',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 61ce9f4da5d..4398102e372 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -79,6 +79,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
#include "mongo/db/repl/timestamp_block.h"
#include "mongo/db/repl/transaction_oplog_application.h"
@@ -284,6 +285,12 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
!oplogEntry->getStatementId());
return {};
}
+ // If this oplog entry is from a tenant migration, include the tenant migration
+ // UUID.
+ const auto& recipientInfo = tenantMigrationRecipientInfo(opCtx);
+ if (recipientInfo) {
+ oplogEntry->setFromTenantMigration(recipientInfo->uuid);
+ }
// Use OplogAccessMode::kLogOp to avoid recursive locking.
AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kLogOp);
@@ -329,6 +336,12 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator end) {
invariant(begin != end);
oplogEntryTemplate->setOpType(repl::OpTypeEnum::kInsert);
+ // If this oplog entry is from a tenant migration, include the tenant migration
+ // UUID.
+ const auto& recipientInfo = tenantMigrationRecipientInfo(opCtx);
+ if (recipientInfo) {
+ oplogEntryTemplate->setFromTenantMigration(recipientInfo->uuid);
+ }
auto nss = oplogEntryTemplate->getNss();
auto replCoord = ReplicationCoordinator::get(opCtx);
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 036162e02d3..d29e6b6acc8 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -206,6 +206,7 @@ public:
using MutableOplogEntry::kDestinedRecipientFieldName;
using MutableOplogEntry::kDurableReplOperationFieldName;
using MutableOplogEntry::kFromMigrateFieldName;
+ using MutableOplogEntry::kFromTenantMigrationFieldName;
using MutableOplogEntry::kHashFieldName;
using MutableOplogEntry::kNssFieldName;
using MutableOplogEntry::kObject2FieldName;
@@ -231,6 +232,7 @@ public:
using MutableOplogEntry::getDestinedRecipient;
using MutableOplogEntry::getDurableReplOperation;
using MutableOplogEntry::getFromMigrate;
+ using MutableOplogEntry::getFromTenantMigration;
using MutableOplogEntry::getHash;
using MutableOplogEntry::getNss;
using MutableOplogEntry::getObject;
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 1bd4e2c2a6a..5147eb9dfe0 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -119,6 +119,11 @@ structs:
type: bool
optional: true
description: "An operation caused by a chunk migration"
+ fromTenantMigration:
+ type: uuid
+ optional: true
+ description: "Contains the UUID of a tenant migration for an operation caused by
+ one."
_id:
type: objectid
optional: true
diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp
index b19713167bf..3451e54d592 100644
--- a/src/mongo/db/repl/oplog_test.cpp
+++ b/src/mongo/db/repl/oplog_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include <algorithm>
+#include <boost/optional/optional_io.hpp>
#include <functional>
#include <map>
#include <utility>
@@ -44,6 +45,7 @@
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/platform/mutex.h"
#include "mongo/unittest/barrier.h"
@@ -356,6 +358,45 @@ TEST_F(OplogTest, ConcurrentLogOpRevertLastOplogEntry) {
_checkOplogEntry(oplogEntries[0], *(opTimeNssMap.cbegin()));
}
+TEST_F(OplogTest, MigrationIdAddedToOplog) {
+ auto opCtx = cc().makeOperationContext();
+ auto migrationUuid = UUID::gen();
+ tenantMigrationRecipientInfo(opCtx.get()) =
+ boost::make_optional<TenantMigrationRecipientInfo>(migrationUuid);
+
+ const NamespaceString nss("test.coll");
+ auto msgObj = BSON("msg"
+ << "hello, world!");
+
+ // Write to the oplog.
+ OpTime opTime;
+ {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setObject(msgObj);
+ oplogEntry.setWallClockTime(Date_t::now());
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opTime = logOp(opCtx.get(), &oplogEntry);
+ ASSERT_FALSE(opTime.isNull());
+ wunit.commit();
+ }
+
+ OplogEntry oplogEntry = _getSingleOplogEntry(opCtx.get());
+
+ // Ensure that msg fields were properly added to the oplog entry.
+ ASSERT_EQUALS(opTime, oplogEntry.getOpTime())
+ << "OpTime returned from logOp() did not match that in the oplog entry written to the "
+ "oplog: "
+ << oplogEntry.toBSON();
+ ASSERT(OpTypeEnum::kNoop == oplogEntry.getOpType())
+ << "Expected 'n' op type but found '" << OpType_serializer(oplogEntry.getOpType())
+ << "' instead: " << oplogEntry.toBSON();
+ ASSERT_BSONOBJ_EQ(msgObj, oplogEntry.getObject());
+ ASSERT_EQ(migrationUuid, oplogEntry.getFromTenantMigration());
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_decoration.cpp b/src/mongo/db/repl/tenant_migration_decoration.cpp
new file mode 100644
index 00000000000..598c70156a9
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_decoration.cpp
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/tenant_migration_decoration.h"
+
+namespace mongo {
+namespace repl {
+const OperationContext::Decoration<boost::optional<TenantMigrationRecipientInfo>>
+ tenantMigrationRecipientInfo =
+ OperationContext::declareDecoration<boost::optional<TenantMigrationRecipientInfo>>();
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_decoration.h b/src/mongo/db/repl/tenant_migration_decoration.h
new file mode 100644
index 00000000000..25087638280
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_decoration.h
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/util/uuid.h"
+
+namespace mongo {
+namespace repl {
+struct TenantMigrationRecipientInfo {
+ TenantMigrationRecipientInfo(const UUID& in_uuid) : uuid(in_uuid) {}
+ UUID uuid;
+};
+extern const OperationContext::Decoration<boost::optional<TenantMigrationRecipientInfo>>
+ tenantMigrationRecipientInfo;
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
new file mode 100644
index 00000000000..e3aae536a18
--- /dev/null
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -0,0 +1,319 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/tenant_oplog_applier.h"
+
+#include <algorithm>
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/op_observer.h"
+#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_oplog_batcher.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace repl {
+
+// These batch sizes are pretty arbitrary.
+// TODO(SERVER-50024): come up with some reasonable numbers, and make them a settable parameter.
+constexpr size_t kTenantApplierBatchSizeBytes = 16 * 1024 * 1024;
+
+// TODO(SERVER-50024): kTenantApplierBatchSizeOps is currently chosen as the default value of
+// internalInsertMaxBatchSize. This is probably reasonable but should be a settable parameter.
+constexpr size_t kTenantApplierBatchSizeOps = 500;
+const size_t kMinOplogEntriesPerThread = 16;
+
+// TODO(SERVER-50024):: This is also arbitary and should be a settable parameter.
+constexpr size_t kTenantApplierThreadCount = 5;
+
+TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
+ const std::string& tenantId,
+ OpTime applyFromOpTime,
+ RandomAccessOplogBuffer* oplogBuffer,
+ std::shared_ptr<executor::TaskExecutor> executor)
+ : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId),
+ _migrationUuid(migrationUuid),
+ _tenantId(tenantId),
+ _applyFromOpTime(applyFromOpTime),
+ _oplogBuffer(oplogBuffer),
+ _executor(std::move(executor)),
+ _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps) {}
+
+void TenantOplogApplier::_makeWriterPool_inlock(int threadCount) {
+ ThreadPool::Options options;
+ options.threadNamePrefix = "TenantOplogWriter-" + _tenantId + "-";
+ options.poolName = "TenantOplogWriterThreadPool-" + _tenantId;
+ options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
+ options.onCreateThread = [migrationUuid = this->_migrationUuid](const std::string&) {
+ Client::initThread(getThreadName());
+ AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
+ };
+ _writerPool = std::make_unique<ThreadPool>(options);
+ _writerPool->startup();
+}
+
+TenantOplogApplier::~TenantOplogApplier() {}
+
+SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationForOpTime(
+ OpTime donorOpTime) {
+ stdx::lock_guard lk(_mutex);
+ // If we're not running, return a future with the status we shut down with.
+ if (!_isActive_inlock()) {
+ return SemiFuture<OpTimePair>::makeReady(_finalStatus);
+ }
+ // If this optime has already passed, just return a ready future.
+ if (_lastBatchCompletedOpTimes.donorOpTime >= donorOpTime) {
+ return SemiFuture<OpTimePair>::makeReady(_lastBatchCompletedOpTimes);
+ }
+
+ // This will pull a new future off the existing promise for this time if it exists, otherwise
+ // it constructs a new promise and pulls a future off of it.
+ auto [iter, isNew] = _opTimeNotificationList.try_emplace(donorOpTime);
+ return iter->second.getFuture().semi();
+}
+
+Status TenantOplogApplier::_doStartup_inlock() noexcept {
+ _makeWriterPool_inlock(kTenantApplierThreadCount);
+ _oplogBatcher = std::make_unique<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor);
+ auto status = _oplogBatcher->startup();
+ if (!status.isOK())
+ return status;
+ auto fut = _oplogBatcher->getNextBatch(_limits);
+ std::move(fut)
+ .thenRunOn(_executor)
+ .then([&](TenantOplogBatch batch) { _applyLoop(std::move(batch)); })
+ .onError([&](Status status) { _handleError(status); })
+ .getAsync([](auto status) {});
+ return Status::OK();
+}
+
+void TenantOplogApplier::_doShutdown_inlock() noexcept {
+ // Shutting down the oplog batcher will make the _applyLoop stop with an error future, thus
+ // shutting down the applier.
+ _oplogBatcher->shutdown();
+}
+
+void TenantOplogApplier::_applyLoop(TenantOplogBatch batch) {
+ // Getting the future for the next batch here means the batcher can retrieve the next batch
+ // while the applier is processing the current one.
+ auto nextBatchFuture = _oplogBatcher->getNextBatch(_limits);
+ try {
+ _applyOplogBatch(batch);
+ } catch (const DBException& e) {
+ _handleError(e.toStatus());
+ return;
+ }
+ {
+ stdx::lock_guard lk(_mutex);
+ if (_isShuttingDown_inlock()) {
+ _finishShutdown(lk, {ErrorCodes::CallbackCanceled, "Tenant Oplog Applier shut down"});
+ return;
+ }
+ }
+ std::move(nextBatchFuture)
+ .thenRunOn(_executor)
+ .then([&](TenantOplogBatch batch) { _applyLoop(std::move(batch)); })
+ .onError([&](Status status) { _handleError(status); })
+ .getAsync([](auto status) {});
+}
+
+void TenantOplogApplier::_handleError(Status status) {
+ LOGV2_DEBUG(4886005,
+ 1,
+ "TenantOplogApplier::_handleError",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "status"_attr = status);
+ shutdown();
+ stdx::lock_guard lk(_mutex);
+ // If we reach _handleError, it means the applyLoop is not running.
+ _finishShutdown(lk, status);
+}
+
+void TenantOplogApplier::_finishShutdown(WithLock, Status status) {
+ // Any unfulfilled notifications are errored out.
+ for (auto& listEntry : _opTimeNotificationList) {
+ listEntry.second.setError(status);
+ }
+ _opTimeNotificationList.clear();
+ _writerPool->shutdown();
+ _writerPool->join();
+ _finalStatus = status;
+ _transitionToComplete_inlock();
+}
+
+void TenantOplogApplier::_applyOplogBatch(const TenantOplogBatch& batch) {
+ LOGV2_DEBUG(4886004,
+ 1,
+ "Tenant Oplog Applier starting to apply batch",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "firstDonorOptime"_attr = batch.ops.front().entry.getOpTime(),
+ "lastDonorOptime"_attr = batch.ops.back().entry.getOpTime());
+ auto lastBatchCompletedOpTimes = _writeNoOpEntries(batch);
+ stdx::lock_guard lk(_mutex);
+ _lastBatchCompletedOpTimes = lastBatchCompletedOpTimes;
+ LOGV2_DEBUG(4886002,
+ 1,
+ "Tenant Oplog Applier finished applying batch",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "lastDonorOptime"_attr = lastBatchCompletedOpTimes.donorOpTime,
+ "lastRecipientOptime"_attr = lastBatchCompletedOpTimes.recipientOpTime);
+
+ // Notify all the waiters on optimes before and including _lastBatchCompletedOpTimes.
+ auto firstUnexpiredIter =
+ _opTimeNotificationList.upper_bound(_lastBatchCompletedOpTimes.donorOpTime);
+ for (auto iter = _opTimeNotificationList.begin(); iter != firstUnexpiredIter; iter++) {
+ iter->second.emplaceValue(_lastBatchCompletedOpTimes);
+ }
+ _opTimeNotificationList.erase(_opTimeNotificationList.begin(), firstUnexpiredIter);
+}
+
+TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
+ const TenantOplogBatch& batch) {
+ auto opCtx = cc().makeOperationContext();
+ auto* opObserver = cc().getServiceContext()->getOpObserver();
+
+ WriteUnitOfWork wuow(opCtx.get());
+ // Reserve oplog slots for all entries. This allows us to write them in parallel.
+ auto oplogSlots = repl::getNextOpTimes(opCtx.get(), batch.ops.size());
+ auto slotIter = oplogSlots.begin();
+ for (const auto& op : batch.ops) {
+ _setRecipientOpTime(op.entry.getOpTime(), *slotIter++);
+ }
+ const size_t numOplogThreads = _writerPool->getStats().numThreads;
+ const size_t numOpsPerThread =
+ std::max(kMinOplogEntriesPerThread, (batch.ops.size() / numOplogThreads));
+ slotIter = oplogSlots.begin();
+ auto opsIter = batch.ops.begin();
+ LOGV2_DEBUG(4886003,
+ 1,
+ "Tenant Oplog Applier scheduling no-ops ",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "firstDonorOptime"_attr = batch.ops.front().entry.getOpTime(),
+ "lastDonorOptime"_attr = batch.ops.back().entry.getOpTime(),
+ "numOplogThreads"_attr = numOplogThreads,
+ "numOpsPerThread"_attr = numOpsPerThread);
+ size_t numOpsRemaining = batch.ops.size();
+ for (size_t thread = 0; thread < numOplogThreads && opsIter != batch.ops.end(); thread++) {
+ auto numOps = std::min(numOpsPerThread, numOpsRemaining);
+ if (thread == numOplogThreads - 1) {
+ numOps = numOpsRemaining;
+ }
+ _writerPool->schedule([=](Status status) {
+ invariant(status);
+ _writeNoOpsForRange(opObserver, opsIter, opsIter + numOps, slotIter);
+ });
+ slotIter += numOps;
+ opsIter += numOps;
+ numOpsRemaining -= numOps;
+ }
+ invariant(opsIter == batch.ops.end());
+ _writerPool->waitForIdle();
+ return {batch.ops.back().entry.getOpTime(), oplogSlots.back()};
+}
+
+
+// These two routines can't be the ultimate solution. It's not necessarily practical to keep a list
+// of every op we've written, and it doesn't work for failover. But as far as I can tell, it's
+// possible to refer to oplog entries arbitarily far back. We probably don't want to search the
+// oplog each time because it requires a collection scan to do so.
+// TODO(SERVER-50263): Come up with the right way to do this.
+OpTime TenantOplogApplier::_getRecipientOpTime(const OpTime& donorOpTime) {
+ stdx::lock_guard lk(_mutex);
+ auto times = std::upper_bound(
+ _opTimeMapping.begin(), _opTimeMapping.end(), OpTimePair(donorOpTime, OpTime()));
+ uassert(4886000,
+ str::stream() << "Recipient optime not found for donor optime "
+ << donorOpTime.toString(),
+ times->donorOpTime == donorOpTime);
+ return times->recipientOpTime;
+}
+
+void TenantOplogApplier::_setRecipientOpTime(const OpTime& donorOpTime,
+ const OpTime& recipientOpTime) {
+ stdx::lock_guard lk(_mutex);
+ // The _opTimeMapping is an array strictly ordered by donorOpTime; this uassert assures the
+ // order remains intact.
+ uassert(4886001,
+ str::stream() << "Donor optimes inserted out of order "
+ << _opTimeMapping.back().donorOpTime.toString()
+ << " >= " << donorOpTime.toString(),
+ _opTimeMapping.empty() || _opTimeMapping.back().donorOpTime < donorOpTime);
+ _opTimeMapping.emplace_back(donorOpTime, recipientOpTime);
+}
+
+boost::optional<OpTime> TenantOplogApplier::_maybeGetRecipientOpTime(
+ const boost::optional<OpTime> donorOpTime) {
+ if (!donorOpTime || donorOpTime->isNull())
+ return donorOpTime;
+ return _getRecipientOpTime(*donorOpTime);
+}
+
+void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
+ std::vector<TenantOplogEntry>::const_iterator begin,
+ std::vector<TenantOplogEntry>::const_iterator end,
+ std::vector<OplogSlot>::iterator firstSlot) {
+ auto opCtx = cc().makeOperationContext();
+ tenantMigrationRecipientInfo(opCtx.get()) =
+ boost::make_optional<TenantMigrationRecipientInfo>(_migrationUuid);
+ AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
+ writeConflictRetry(
+ opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ WriteUnitOfWork wuow(opCtx.get());
+ auto slot = firstSlot;
+ for (auto iter = begin; iter != end; iter++, slot++) {
+ opObserver->onInternalOpMessage(
+ opCtx.get(),
+ iter->entry.getNss(),
+ iter->entry.getUuid(),
+ iter->entry.toBSON(),
+ BSONObj(),
+ // We link the no-ops together by recipient op time the same way the actual ops
+ // were linked together by donor op time. This is to allow retryable writes
+ // and changestreams to find the ops they need.
+ _maybeGetRecipientOpTime(iter->entry.getPreImageOpTime()),
+ _maybeGetRecipientOpTime(iter->entry.getPostImageOpTime()),
+ _maybeGetRecipientOpTime(iter->entry.getPrevWriteOpTimeInTransaction()),
+ *slot);
+ }
+ });
+}
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
new file mode 100644
index 00000000000..ca415d2e18a
--- /dev/null
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "mongo/db/repl/abstract_async_component.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_buffer.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/tenant_oplog_batcher.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+class ThreadPool;
+
+namespace repl {
+
+/**
+ * This class reads oplog entries from a tenant migration, then applies those entries to the
+ * (real) oplog, then writes out no-op entries corresponding to the original oplog entries
+ * from the oplog buffer. Applier will not apply, but will write no-op entries for,
+ * entries before the applyFromOpTime.
+ *
+ */
+class TenantOplogApplier : public AbstractAsyncComponent {
+public:
+ struct OpTimePair {
+ OpTimePair() = default;
+ OpTimePair(OpTime in_donorOpTime, OpTime in_recipientOpTime)
+ : donorOpTime(in_donorOpTime), recipientOpTime(in_recipientOpTime) {}
+ bool operator<(const OpTimePair& other) const {
+ if (donorOpTime == other.donorOpTime)
+ return recipientOpTime < other.recipientOpTime;
+ return donorOpTime < other.donorOpTime;
+ }
+ OpTime donorOpTime;
+ OpTime recipientOpTime;
+ };
+
+ TenantOplogApplier(const UUID& migrationUuid,
+ const std::string& tenantId,
+ OpTime applyFromOpTime,
+ RandomAccessOplogBuffer* oplogBuffer,
+ std::shared_ptr<executor::TaskExecutor> executor);
+
+ virtual ~TenantOplogApplier();
+
+ /**
+ * Return a future which will be notified when that optime has been reached. Future will
+ * contain donor and recipient optime of last oplog entry in batch where donor optime is greater
+ * than passed-in time.
+ */
+ SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime);
+
+ /**
+ * Returns the last donor and recipient optimes of the last batch applied.
+ */
+ OpTimePair getLastBatchCompletedOpTimes();
+
+ void applyOplogBatch_forTest();
+
+ void setBatchLimits_forTest(TenantOplogBatcher::BatchLimits limits) {
+ _limits = limits;
+ }
+
+private:
+ Status _doStartup_inlock() noexcept final;
+ void _doShutdown_inlock() noexcept final;
+ void _finishShutdown(WithLock lk, Status status);
+
+ void _applyLoop(TenantOplogBatch batch);
+ void _handleError(Status status);
+
+ void _applyOplogBatch(const TenantOplogBatch& batch);
+ OpTimePair _writeNoOpEntries(const TenantOplogBatch& batch);
+ void _writeNoOpsForRange(OpObserver* opObserver,
+ std::vector<TenantOplogEntry>::const_iterator begin,
+ std::vector<TenantOplogEntry>::const_iterator end,
+ std::vector<OplogSlot>::iterator firstSlot);
+ void _makeWriterPool_inlock(int threadCount);
+ OpTime _getRecipientOpTime(const OpTime& donorOpTime);
+ // This is a convenience call for getRecipientOpTime which handles boost::none and nulls.
+ boost::optional<OpTime> _maybeGetRecipientOpTime(const boost::optional<OpTime>);
+ // _setRecipientOpTime must be called in optime order.
+ void _setRecipientOpTime(const OpTime& donorOpTime, const OpTime& recipientOpTime);
+
+ Mutex* _getMutex() noexcept final {
+ return &_mutex;
+ }
+
+ Mutex _mutex = MONGO_MAKE_LATCH("TenantOplogApplier::_mutex");
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (R) Read-only in concurrent operation; no synchronization required.
+ // (S) Self-synchronizing; access according to class's own rules.
+ // (M) Reads and writes guarded by _mutex
+ // (X) Access only allowed from the main flow of control called from run() or constructor.
+
+ // Handles consuming oplog entries from the OplogBuffer for oplog application.
+ std::unique_ptr<TenantOplogBatcher> _oplogBatcher; // (R)
+ const UUID _migrationUuid; // (R)
+ const std::string _tenantId; // (R)
+ const OpTime _applyFromOpTime; // (R)
+ RandomAccessOplogBuffer* _oplogBuffer; // (R)
+ std::shared_ptr<executor::TaskExecutor> _executor; // (R)
+ std::unique_ptr<ThreadPool> _writerPool; // (S)
+ OpTimePair _lastBatchCompletedOpTimes; // (M)
+ std::vector<OpTimePair> _opTimeMapping; // (M)
+ TenantOplogBatcher::BatchLimits _limits; // (R)
+ std::map<OpTime, SharedPromise<OpTimePair>> _opTimeNotificationList; // (M)
+ Status _finalStatus = Status::OK(); // (M)
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
new file mode 100644
index 00000000000..e25ff026a18
--- /dev/null
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -0,0 +1,280 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
+#include <boost/optional/optional_io.hpp>
+#include <vector>
+
+#include "mongo/db/op_observer_noop.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/repl/oplog_batcher_test_fixture.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_oplog_applier.h"
+#include "mongo/db/repl/tenant_oplog_batcher.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/logv2/log.h"
+#include "mongo/unittest/log_test.h"
+
+namespace mongo {
+
+using executor::TaskExecutor;
+using executor::ThreadPoolExecutorTest;
+
+namespace repl {
+
+class TenantOplogApplierTestOpObserver : public OpObserverNoop {
+public:
+ void onInternalOpMessage(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<UUID> uuid,
+ const BSONObj& msgObj,
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) final {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(msgObj);
+ oplogEntry.setObject2(o2MsgObj);
+ oplogEntry.setPreImageOpTime(preImageOpTime);
+ oplogEntry.setPostImageOpTime(postImageOpTime);
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction);
+ if (slot) {
+ oplogEntry.setOpTime(*slot);
+ } else {
+ oplogEntry.setOpTime(getNextOpTime(opCtx));
+ }
+ const auto& recipientInfo = tenantMigrationRecipientInfo(opCtx);
+ if (recipientInfo) {
+ oplogEntry.setFromTenantMigration(recipientInfo->uuid);
+ }
+ stdx::lock_guard lk(_mutex);
+ _entries.push_back(oplogEntry);
+ }
+
+ // Returns a vector of the oplog entries recorded, in optime order.
+ std::vector<MutableOplogEntry> getEntries() {
+ std::vector<MutableOplogEntry> entries;
+ {
+ stdx::lock_guard lk(_mutex);
+ entries = _entries;
+ }
+ std::sort(entries.begin(),
+ entries.end(),
+ [](const MutableOplogEntry& a, const MutableOplogEntry& b) {
+ return a.getOpTime() < b.getOpTime();
+ });
+ return entries;
+ }
+
+private:
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantOplogApplierTestOpObserver::_mutex");
+ std::vector<MutableOplogEntry> _entries;
+};
+constexpr auto dbName = "tenant_test"_sd;
+
+class TenantOplogApplierTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+
+ // Set up an OpObserver to track the documents OplogApplierImpl inserts.
+ auto service = getServiceContext();
+ auto opObserver = std::make_unique<TenantOplogApplierTestOpObserver>();
+ _opObserver = opObserver.get();
+ auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+ opObserverRegistry->addObserver(std::move(opObserver));
+
+ auto network = std::make_unique<executor::NetworkInterfaceMock>();
+ _net = network.get();
+ executor::ThreadPoolMock::Options thread_pool_options;
+ thread_pool_options.onCreateThread = [] { Client::initThread("TenantOplogApplier"); };
+ _executor = makeSharedThreadPoolTestExecutor(std::move(network), thread_pool_options);
+ _executor->startup();
+ _oplogBuffer.startup(nullptr);
+
+ // Set up a replication coordinator and storage interface, needed for opObservers.
+ repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>());
+ repl::ReplicationCoordinator::set(
+ service, std::make_unique<repl::ReplicationCoordinatorMock>(service));
+
+ // Set up oplog collection. If the WT storage engine is used, the oplog collection is
+ // expected to exist when fetching the next opTime (LocalOplogInfo::getNextOpTimes) to use
+ // for a write.
+ auto opCtx = cc().makeOperationContext();
+ repl::setOplogCollectionName(service);
+ repl::createOplog(opCtx.get());
+ }
+
+ void assertNoOpMatches(const OplogEntry& op, const MutableOplogEntry& noOp) {
+ ASSERT_BSONOBJ_EQ(op.toBSON(), noOp.getObject());
+ ASSERT_EQ(op.getNss(), noOp.getNss());
+ ASSERT_EQ(op.getUuid(), noOp.getUuid());
+ ASSERT_EQ(_migrationUuid, noOp.getFromTenantMigration());
+ }
+
+ void pushOps(const std::vector<OplogEntry>& ops) {
+ std::vector<BSONObj> bsonOps;
+ for (const auto& op : ops) {
+ bsonOps.push_back(op.toBSON());
+ }
+ _oplogBuffer.push(nullptr, bsonOps.begin(), bsonOps.end());
+ }
+
+protected:
+ OplogBufferMock _oplogBuffer;
+ executor::NetworkInterfaceMock* _net;
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
+ std::string _tenantId = "tenant";
+ UUID _migrationUuid = UUID::gen();
+ TenantOplogApplierTestOpObserver* _opObserver; // Owned by service context opObserverRegistry
+
+private:
+ unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
+ logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+};
+
+TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ pushOps(srcOps);
+
+ TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor);
+ ASSERT_OK(applier.startup());
+ // Even if we wait for the first op in a batch, it is the last op we should be notified on.
+ auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get();
+ ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime);
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(2, entries.size());
+ assertNoOpMatches(srcOps[0], entries[0]);
+ assertNoOpMatches(srcOps[1], entries[1]);
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
+ std::vector<OplogEntry> srcOps;
+ // This should be big enough to use several threads to do the writing
+ for (int i = 0; i < 64; i++) {
+ srcOps.push_back(makeInsertOplogEntry(i + 1, NamespaceString(dbName, "foo")));
+ }
+ pushOps(srcOps);
+
+ TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor);
+ ASSERT_OK(applier.startup());
+ // Even if we wait for the first op in a batch, it is the last op we should be notified on.
+ auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get();
+ ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime);
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(srcOps.size(), entries.size());
+ for (size_t i = 0; i < srcOps.size(); i++) {
+ assertNoOpMatches(srcOps[i], entries[i]);
+ }
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")));
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz")));
+ srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bif")));
+
+ TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor);
+ TenantOplogBatcher::BatchLimits smallLimits(100 * 1024 /* bytes */, 2 /*ops*/);
+ applier.setBatchLimits_forTest(smallLimits);
+ ASSERT_OK(applier.startup());
+ auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime());
+ pushOps(srcOps);
+ // We should see the last batch optime for each batch in our notifications.
+ ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime);
+ ASSERT_EQ(srcOps[3].getOpTime(), secondBatchFuture.get().donorOpTime);
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(4, entries.size());
+ assertNoOpMatches(srcOps[0], entries[0]);
+ assertNoOpMatches(srcOps[1], entries[1]);
+ assertNoOpMatches(srcOps[2], entries[2]);
+ assertNoOpMatches(srcOps[3], entries[3]);
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
+ std::vector<OplogEntry> innerOps1;
+ innerOps1.push_back(makeInsertOplogEntry(11, NamespaceString(dbName, "bar")));
+ innerOps1.push_back(makeInsertOplogEntry(12, NamespaceString(dbName, "bar")));
+ std::vector<OplogEntry> innerOps2;
+ innerOps2.push_back(makeInsertOplogEntry(21, NamespaceString(dbName, "bar")));
+ innerOps2.push_back(makeInsertOplogEntry(22, NamespaceString(dbName, "bar")));
+ std::vector<OplogEntry> innerOps3;
+ innerOps3.push_back(makeInsertOplogEntry(31, NamespaceString(dbName, "bar")));
+ innerOps3.push_back(makeInsertOplogEntry(32, NamespaceString(dbName, "bar")));
+
+ // Makes entries with ts from range [2, 5).
+ std::vector<OplogEntry> srcOps = makeMultiEntryTransactionOplogEntries(
+ 2, dbName, /* prepared */ false, {innerOps1, innerOps2, innerOps3});
+ pushOps(srcOps);
+
+ TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor);
+ ASSERT_OK(applier.startup());
+ // The first two ops should come in the first batch.
+ auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime);
+ // The last op is in a batch by itself.
+ auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime());
+ ASSERT_EQ(srcOps[2].getOpTime(), secondBatchFuture.get().donorOpTime);
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(srcOps.size(), entries.size());
+ for (size_t i = 0; i < srcOps.size(); i++) {
+ assertNoOpMatches(srcOps[i], entries[i]);
+ }
+ // Make sure the no-ops got linked properly.
+ ASSERT_EQ(OpTime(), entries[0].getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(entries[0].getOpTime(), entries[1].getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(entries[1].getOpTime(), entries[2].getPrevWriteOpTimeInTransaction());
+ applier.shutdown();
+ applier.join();
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 3d9095124dd..d40eaadac68 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -101,7 +101,11 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) override {}
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) final{};
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 69474685733..fd6c4b296a4 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -640,8 +640,15 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
writeConflictRetry(
_opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
WriteUnitOfWork uow(_opCtx);
- serviceContext->getOpObserver()->onInternalOpMessage(
- _opCtx, getNss(), *_collectionUUID, BSON("msg" << dbgMessage), o2Message);
+ serviceContext->getOpObserver()->onInternalOpMessage(_opCtx,
+ getNss(),
+ *_collectionUUID,
+ BSON("msg" << dbgMessage),
+ o2Message,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
uow.commit();
});
}
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 5d82b1d5875..fca032c80a3 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -163,7 +163,15 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
WriteUnitOfWork wuow(opCtx);
opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
- opCtx, _ns, {}, {}, message);
+ opCtx,
+ _ns,
+ {},
+ {},
+ message,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
wuow.commit();
});
}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 0a40af218d1..8a3beea80a9 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -99,7 +99,11 @@ public:
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
- const boost::optional<BSONObj> o2MsgObj) override {}
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) final{};
void onCreateCollection(OperationContext* opCtx,
Collection* coll,