diff options
Diffstat (limited to 'src/mongo')
21 files changed, 1000 insertions, 25 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 5920146860e..2dc13af3bd8 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -99,7 +99,11 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) final {} + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final{}; void onCreateCollection(OperationContext* opCtx, Collection* coll, diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 30642076e7e..f06efef564f 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -99,7 +99,11 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) final {} + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final{}; void onCreateCollection(OperationContext* opCtx, Collection* coll, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index aefdf1fc04e..51753376d88 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -152,20 +152,33 @@ public: /** * Logs a no-op with "msgObj" in the o field into oplog. * - * This function should only be used internally. "nss", "uuid" and the o2 field should never be - * exposed to users (for instance through the appendOplogNote command). + * This function should only be used internally. 
"nss", "uuid", "o2", and the opTimes should + * never be exposed to users (for instance through the appendOplogNote command). */ - virtual void onInternalOpMessage(OperationContext* opCtx, - const NamespaceString& nss, - const boost::optional<UUID> uuid, - const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) = 0; + virtual void onInternalOpMessage( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID> uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) = 0; /** * Logs a no-op with "msgObj" in the o field into oplog. */ void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) { - onInternalOpMessage(opCtx, {}, boost::none, msgObj, boost::none); + onInternalOpMessage(opCtx, + {}, + boost::none, + msgObj, + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); } virtual void onCreateCollection(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 193fa3885da..11c6402566e 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -356,6 +356,10 @@ void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx, {}, boost::none, BSON("msg" << std::string(str::stream() << "Creating indexes. 
Coll: " << nss)), + boost::none, + boost::none, + boost::none, + boost::none, boost::none); } @@ -646,17 +650,28 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } } -void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, - const NamespaceString& nss, - const boost::optional<UUID> uuid, - const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) { +void OpObserverImpl::onInternalOpMessage( + OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID> uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) { MutableOplogEntry oplogEntry; oplogEntry.setOpType(repl::OpTypeEnum::kNoop); oplogEntry.setNss(nss); oplogEntry.setUuid(uuid); oplogEntry.setObject(msgObj); oplogEntry.setObject2(o2MsgObj); + oplogEntry.setPreImageOpTime(preImageOpTime); + oplogEntry.setPostImageOpTime(postImageOpTime); + oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction); + if (slot) { + oplogEntry.setOpTime(*slot); + } logOperation(opCtx, &oplogEntry); } diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 52c50da7b91..266cdb603a6 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -106,7 +106,11 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) final; + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final; void onCreateCollection(OperationContext* opCtx, Collection* coll, const NamespaceString& collectionName, diff --git 
a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index f3636fdc43f..2d4cb62da31 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -86,7 +86,11 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) override {} + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) override {} void onCreateCollection(OperationContext* opCtx, Collection* coll, const NamespaceString& collectionName, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 9ce4ec255a0..6988b0560bb 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -153,10 +153,22 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) override { + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) override { ReservedTimes times{opCtx}; for (auto& o : _observers) - o->onInternalOpMessage(opCtx, nss, uuid, msgObj, o2MsgObj); + o->onInternalOpMessage(opCtx, + nss, + uuid, + msgObj, + o2MsgObj, + preImageOpTime, + postImageOpTime, + prevWriteOpTimeInTransaction, + slot); } void onCreateCollection(OperationContext* const opCtx, diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e5efba6af97..77fdf036e9c 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -35,6 +35,7 @@ env.Library( 'apply_ops.cpp', 'oplog.cpp', 'oplog_entry_or_grouped_inserts.cpp', + 
'tenant_migration_decoration.cpp', 'transaction_oplog_application.cpp', env.Idlc('apply_ops.idl')[0], ], @@ -1381,6 +1382,7 @@ env.CppUnitTest( 'sync_source_resolver_test.cpp', 'task_runner_test.cpp', 'task_runner_test_fixture.cpp', + 'tenant_oplog_applier_test.cpp', 'tenant_oplog_batcher_test.cpp', 'vote_requester_test.cpp', 'wait_for_majority_service_test.cpp', @@ -1642,10 +1644,12 @@ env.Library( env.Library( target='tenant_oplog_processing', source=[ - 'tenant_oplog_batcher.cpp' + 'tenant_oplog_batcher.cpp', + 'tenant_oplog_applier.cpp' ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', 'abstract_async_component', 'oplog', 'oplog_application_interface', diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 61ce9f4da5d..4398102e372 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -79,6 +79,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/repl/tenant_migration_donor_util.h" #include "mongo/db/repl/timestamp_block.h" #include "mongo/db/repl/transaction_oplog_application.h" @@ -284,6 +285,12 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { !oplogEntry->getStatementId()); return {}; } + // If this oplog entry is from a tenant migration, include the tenant migration + // UUID. + const auto& recipientInfo = tenantMigrationRecipientInfo(opCtx); + if (recipientInfo) { + oplogEntry->setFromTenantMigration(recipientInfo->uuid); + } // Use OplogAccessMode::kLogOp to avoid recursive locking. 
AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kLogOp); @@ -329,6 +336,12 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end) { invariant(begin != end); oplogEntryTemplate->setOpType(repl::OpTypeEnum::kInsert); + // If this oplog entry is from a tenant migration, include the tenant migration + // UUID. + const auto& recipientInfo = tenantMigrationRecipientInfo(opCtx); + if (recipientInfo) { + oplogEntryTemplate->setFromTenantMigration(recipientInfo->uuid); + } auto nss = oplogEntryTemplate->getNss(); auto replCoord = ReplicationCoordinator::get(opCtx); diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 036162e02d3..d29e6b6acc8 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -206,6 +206,7 @@ public: using MutableOplogEntry::kDestinedRecipientFieldName; using MutableOplogEntry::kDurableReplOperationFieldName; using MutableOplogEntry::kFromMigrateFieldName; + using MutableOplogEntry::kFromTenantMigrationFieldName; using MutableOplogEntry::kHashFieldName; using MutableOplogEntry::kNssFieldName; using MutableOplogEntry::kObject2FieldName; @@ -231,6 +232,7 @@ public: using MutableOplogEntry::getDestinedRecipient; using MutableOplogEntry::getDurableReplOperation; using MutableOplogEntry::getFromMigrate; + using MutableOplogEntry::getFromTenantMigration; using MutableOplogEntry::getHash; using MutableOplogEntry::getNss; using MutableOplogEntry::getObject; diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 1bd4e2c2a6a..5147eb9dfe0 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -119,6 +119,11 @@ structs: type: bool optional: true description: "An operation caused by a chunk migration" + fromTenantMigration: + type: uuid + optional: true + description: "Contains the UUID of a tenant migration for an operation caused by + one." 
_id: type: objectid optional: true diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index b19713167bf..3451e54d592 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include <algorithm> +#include <boost/optional/optional_io.hpp> #include <functional> #include <map> #include <utility> @@ -44,6 +45,7 @@ #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/platform/mutex.h" #include "mongo/unittest/barrier.h" @@ -356,6 +358,45 @@ TEST_F(OplogTest, ConcurrentLogOpRevertLastOplogEntry) { _checkOplogEntry(oplogEntries[0], *(opTimeNssMap.cbegin())); } +TEST_F(OplogTest, MigrationIdAddedToOplog) { + auto opCtx = cc().makeOperationContext(); + auto migrationUuid = UUID::gen(); + tenantMigrationRecipientInfo(opCtx.get()) = + boost::make_optional<TenantMigrationRecipientInfo>(migrationUuid); + + const NamespaceString nss("test.coll"); + auto msgObj = BSON("msg" + << "hello, world!"); + + // Write to the oplog. + OpTime opTime; + { + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setObject(msgObj); + oplogEntry.setWallClockTime(Date_t::now()); + AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X); + WriteUnitOfWork wunit(opCtx.get()); + opTime = logOp(opCtx.get(), &oplogEntry); + ASSERT_FALSE(opTime.isNull()); + wunit.commit(); + } + + OplogEntry oplogEntry = _getSingleOplogEntry(opCtx.get()); + + // Ensure that msg fields were properly added to the oplog entry. 
+ ASSERT_EQUALS(opTime, oplogEntry.getOpTime()) + << "OpTime returned from logOp() did not match that in the oplog entry written to the " + "oplog: " + << oplogEntry.toBSON(); + ASSERT(OpTypeEnum::kNoop == oplogEntry.getOpType()) + << "Expected 'n' op type but found '" << OpType_serializer(oplogEntry.getOpType()) + << "' instead: " << oplogEntry.toBSON(); + ASSERT_BSONOBJ_EQ(msgObj, oplogEntry.getObject()); + ASSERT_EQ(migrationUuid, oplogEntry.getFromTenantMigration()); +} + } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_decoration.cpp b/src/mongo/db/repl/tenant_migration_decoration.cpp new file mode 100644 index 00000000000..598c70156a9 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_decoration.cpp @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/tenant_migration_decoration.h" + +namespace mongo { +namespace repl { +const OperationContext::Decoration<boost::optional<TenantMigrationRecipientInfo>> + tenantMigrationRecipientInfo = + OperationContext::declareDecoration<boost::optional<TenantMigrationRecipientInfo>>(); +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_decoration.h b/src/mongo/db/repl/tenant_migration_decoration.h new file mode 100644 index 00000000000..25087638280 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_decoration.h @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/db/operation_context.h" +#include "mongo/util/uuid.h" + +namespace mongo { +namespace repl { +struct TenantMigrationRecipientInfo { + TenantMigrationRecipientInfo(const UUID& in_uuid) : uuid(in_uuid) {} + UUID uuid; +}; +extern const OperationContext::Decoration<boost::optional<TenantMigrationRecipientInfo>> + tenantMigrationRecipientInfo; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp new file mode 100644 index 00000000000..e3aae536a18 --- /dev/null +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -0,0 +1,319 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/tenant_oplog_applier.h" + +#include <algorithm> + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/op_observer.h" +#include "mongo/db/repl/apply_ops.h" +#include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/repl/tenant_oplog_batcher.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace repl { + +// These batch sizes are pretty arbitrary. +// TODO(SERVER-50024): come up with some reasonable numbers, and make them a settable parameter. +constexpr size_t kTenantApplierBatchSizeBytes = 16 * 1024 * 1024; + +// TODO(SERVER-50024): kTenantApplierBatchSizeOps is currently chosen as the default value of +// internalInsertMaxBatchSize. This is probably reasonable but should be a settable parameter. 
+constexpr size_t kTenantApplierBatchSizeOps = 500; +const size_t kMinOplogEntriesPerThread = 16; + +// TODO(SERVER-50024): This is also arbitrary and should be a settable parameter. +constexpr size_t kTenantApplierThreadCount = 5; + +TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, + const std::string& tenantId, + OpTime applyFromOpTime, + RandomAccessOplogBuffer* oplogBuffer, + std::shared_ptr<executor::TaskExecutor> executor) + : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId), + _migrationUuid(migrationUuid), + _tenantId(tenantId), + _applyFromOpTime(applyFromOpTime), + _oplogBuffer(oplogBuffer), + _executor(std::move(executor)), + _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps) {} + +void TenantOplogApplier::_makeWriterPool_inlock(int threadCount) { + ThreadPool::Options options; + options.threadNamePrefix = "TenantOplogWriter-" + _tenantId + "-"; + options.poolName = "TenantOplogWriterThreadPool-" + _tenantId; + options.maxThreads = options.minThreads = static_cast<size_t>(threadCount); + options.onCreateThread = [migrationUuid = this->_migrationUuid](const std::string&) { + Client::initThread(getThreadName()); + AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); + }; + _writerPool = std::make_unique<ThreadPool>(options); + _writerPool->startup(); +} + +TenantOplogApplier::~TenantOplogApplier() {} + +SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationForOpTime( + OpTime donorOpTime) { + stdx::lock_guard lk(_mutex); + // If we're not running, return a future with the status we shut down with. + if (!_isActive_inlock()) { + return SemiFuture<OpTimePair>::makeReady(_finalStatus); + } + // If this optime has already passed, just return a ready future.
+ if (_lastBatchCompletedOpTimes.donorOpTime >= donorOpTime) { + return SemiFuture<OpTimePair>::makeReady(_lastBatchCompletedOpTimes); + } + + // This will pull a new future off the existing promise for this time if it exists, otherwise + // it constructs a new promise and pulls a future off of it. + auto [iter, isNew] = _opTimeNotificationList.try_emplace(donorOpTime); + return iter->second.getFuture().semi(); +} + +Status TenantOplogApplier::_doStartup_inlock() noexcept { + _makeWriterPool_inlock(kTenantApplierThreadCount); + _oplogBatcher = std::make_unique<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor); + auto status = _oplogBatcher->startup(); + if (!status.isOK()) + return status; + auto fut = _oplogBatcher->getNextBatch(_limits); + std::move(fut) + .thenRunOn(_executor) + .then([&](TenantOplogBatch batch) { _applyLoop(std::move(batch)); }) + .onError([&](Status status) { _handleError(status); }) + .getAsync([](auto status) {}); + return Status::OK(); +} + +void TenantOplogApplier::_doShutdown_inlock() noexcept { + // Shutting down the oplog batcher will make the _applyLoop stop with an error future, thus + // shutting down the applier. + _oplogBatcher->shutdown(); +} + +void TenantOplogApplier::_applyLoop(TenantOplogBatch batch) { + // Getting the future for the next batch here means the batcher can retrieve the next batch + // while the applier is processing the current one. 
+ auto nextBatchFuture = _oplogBatcher->getNextBatch(_limits); + try { + _applyOplogBatch(batch); + } catch (const DBException& e) { + _handleError(e.toStatus()); + return; + } + { + stdx::lock_guard lk(_mutex); + if (_isShuttingDown_inlock()) { + _finishShutdown(lk, {ErrorCodes::CallbackCanceled, "Tenant Oplog Applier shut down"}); + return; + } + } + std::move(nextBatchFuture) + .thenRunOn(_executor) + .then([&](TenantOplogBatch batch) { _applyLoop(std::move(batch)); }) + .onError([&](Status status) { _handleError(status); }) + .getAsync([](auto status) {}); +} + +void TenantOplogApplier::_handleError(Status status) { + LOGV2_DEBUG(4886005, + 1, + "TenantOplogApplier::_handleError", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "status"_attr = status); + shutdown(); + stdx::lock_guard lk(_mutex); + // If we reach _handleError, it means the applyLoop is not running. + _finishShutdown(lk, status); +} + +void TenantOplogApplier::_finishShutdown(WithLock, Status status) { + // Any unfulfilled notifications are errored out. 
+ for (auto& listEntry : _opTimeNotificationList) { + listEntry.second.setError(status); + } + _opTimeNotificationList.clear(); + _writerPool->shutdown(); + _writerPool->join(); + _finalStatus = status; + _transitionToComplete_inlock(); +} + +void TenantOplogApplier::_applyOplogBatch(const TenantOplogBatch& batch) { + LOGV2_DEBUG(4886004, + 1, + "Tenant Oplog Applier starting to apply batch", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "firstDonorOptime"_attr = batch.ops.front().entry.getOpTime(), + "lastDonorOptime"_attr = batch.ops.back().entry.getOpTime()); + auto lastBatchCompletedOpTimes = _writeNoOpEntries(batch); + stdx::lock_guard lk(_mutex); + _lastBatchCompletedOpTimes = lastBatchCompletedOpTimes; + LOGV2_DEBUG(4886002, + 1, + "Tenant Oplog Applier finished applying batch", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "lastDonorOptime"_attr = lastBatchCompletedOpTimes.donorOpTime, + "lastRecipientOptime"_attr = lastBatchCompletedOpTimes.recipientOpTime); + + // Notify all the waiters on optimes before and including _lastBatchCompletedOpTimes. + auto firstUnexpiredIter = + _opTimeNotificationList.upper_bound(_lastBatchCompletedOpTimes.donorOpTime); + for (auto iter = _opTimeNotificationList.begin(); iter != firstUnexpiredIter; iter++) { + iter->second.emplaceValue(_lastBatchCompletedOpTimes); + } + _opTimeNotificationList.erase(_opTimeNotificationList.begin(), firstUnexpiredIter); +} + +TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( + const TenantOplogBatch& batch) { + auto opCtx = cc().makeOperationContext(); + auto* opObserver = cc().getServiceContext()->getOpObserver(); + + WriteUnitOfWork wuow(opCtx.get()); + // Reserve oplog slots for all entries. This allows us to write them in parallel. 
+ auto oplogSlots = repl::getNextOpTimes(opCtx.get(), batch.ops.size()); + auto slotIter = oplogSlots.begin(); + for (const auto& op : batch.ops) { + _setRecipientOpTime(op.entry.getOpTime(), *slotIter++); + } + const size_t numOplogThreads = _writerPool->getStats().numThreads; + const size_t numOpsPerThread = + std::max(kMinOplogEntriesPerThread, (batch.ops.size() / numOplogThreads)); + slotIter = oplogSlots.begin(); + auto opsIter = batch.ops.begin(); + LOGV2_DEBUG(4886003, + 1, + "Tenant Oplog Applier scheduling no-ops ", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "firstDonorOptime"_attr = batch.ops.front().entry.getOpTime(), + "lastDonorOptime"_attr = batch.ops.back().entry.getOpTime(), + "numOplogThreads"_attr = numOplogThreads, + "numOpsPerThread"_attr = numOpsPerThread); + size_t numOpsRemaining = batch.ops.size(); + for (size_t thread = 0; thread < numOplogThreads && opsIter != batch.ops.end(); thread++) { + auto numOps = std::min(numOpsPerThread, numOpsRemaining); + if (thread == numOplogThreads - 1) { + numOps = numOpsRemaining; + } + _writerPool->schedule([=](Status status) { + invariant(status); + _writeNoOpsForRange(opObserver, opsIter, opsIter + numOps, slotIter); + }); + slotIter += numOps; + opsIter += numOps; + numOpsRemaining -= numOps; + } + invariant(opsIter == batch.ops.end()); + _writerPool->waitForIdle(); + return {batch.ops.back().entry.getOpTime(), oplogSlots.back()}; +} + + +// These two routines can't be the ultimate solution. It's not necessarily practical to keep a list +// of every op we've written, and it doesn't work for failover. But as far as I can tell, it's +// possible to refer to oplog entries arbitrarily far back. We probably don't want to search the +// oplog each time because it requires a collection scan to do so. +// TODO(SERVER-50263): Come up with the right way to do this.
+OpTime TenantOplogApplier::_getRecipientOpTime(const OpTime& donorOpTime) { + stdx::lock_guard lk(_mutex); + auto times = std::upper_bound( + _opTimeMapping.begin(), _opTimeMapping.end(), OpTimePair(donorOpTime, OpTime())); + // upper_bound returns end() when no mapping sorts after the probe value, so check for that + // before dereferencing the iterator. + uassert(4886000, + str::stream() << "Recipient optime not found for donor optime " + << donorOpTime.toString(), + times != _opTimeMapping.end() && times->donorOpTime == donorOpTime); + return times->recipientOpTime; +} + +void TenantOplogApplier::_setRecipientOpTime(const OpTime& donorOpTime, + const OpTime& recipientOpTime) { + stdx::lock_guard lk(_mutex); + // The _opTimeMapping is an array strictly ordered by donorOpTime; this uassert assures the + // order remains intact. + uassert(4886001, + str::stream() << "Donor optimes inserted out of order " + << _opTimeMapping.back().donorOpTime.toString() + << " >= " << donorOpTime.toString(), + _opTimeMapping.empty() || _opTimeMapping.back().donorOpTime < donorOpTime); + _opTimeMapping.emplace_back(donorOpTime, recipientOpTime); +} + +boost::optional<OpTime> TenantOplogApplier::_maybeGetRecipientOpTime( + const boost::optional<OpTime> donorOpTime) { + if (!donorOpTime || donorOpTime->isNull()) + return donorOpTime; + return _getRecipientOpTime(*donorOpTime); +} + +void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, + std::vector<TenantOplogEntry>::const_iterator begin, + std::vector<TenantOplogEntry>::const_iterator end, + std::vector<OplogSlot>::iterator firstSlot) { + auto opCtx = cc().makeOperationContext(); + tenantMigrationRecipientInfo(opCtx.get()) = + boost::make_optional<TenantMigrationRecipientInfo>(_migrationUuid); + AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite); + writeConflictRetry( + opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] { + WriteUnitOfWork wuow(opCtx.get()); + auto slot = firstSlot; + for (auto iter = begin; iter != end; iter++, slot++) { + opObserver->onInternalOpMessage( + opCtx.get(), + iter->entry.getNss(), + iter->entry.getUuid(), +
iter->entry.toBSON(), + BSONObj(), + // We link the no-ops together by recipient op time the same way the actual ops + // were linked together by donor op time. This is to allow retryable writes + // and changestreams to find the ops they need. + _maybeGetRecipientOpTime(iter->entry.getPreImageOpTime()), + _maybeGetRecipientOpTime(iter->entry.getPostImageOpTime()), + _maybeGetRecipientOpTime(iter->entry.getPrevWriteOpTimeInTransaction()), + *slot); + } + }); +} +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h new file mode 100644 index 00000000000..ca415d2e18a --- /dev/null +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/db/repl/abstract_async_component.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_buffer.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/tenant_oplog_batcher.h" +#include "mongo/util/future.h" + +namespace mongo { +class ThreadPool; + +namespace repl { + +/** + * This class reads oplog entries from a tenant migration, then applies those entries to the + * (real) oplog, then writes out no-op entries corresponding to the original oplog entries + * from the oplog buffer. Applier will not apply, but will write no-op entries for, + * entries before the applyFromOpTime. + * + */ +class TenantOplogApplier : public AbstractAsyncComponent { +public: + struct OpTimePair { + OpTimePair() = default; + OpTimePair(OpTime in_donorOpTime, OpTime in_recipientOpTime) + : donorOpTime(in_donorOpTime), recipientOpTime(in_recipientOpTime) {} + bool operator<(const OpTimePair& other) const { + if (donorOpTime == other.donorOpTime) + return recipientOpTime < other.recipientOpTime; + return donorOpTime < other.donorOpTime; + } + OpTime donorOpTime; + OpTime recipientOpTime; + }; + + TenantOplogApplier(const UUID& migrationUuid, + const std::string& tenantId, + OpTime applyFromOpTime, + RandomAccessOplogBuffer* oplogBuffer, + std::shared_ptr<executor::TaskExecutor> executor); + + virtual ~TenantOplogApplier(); + + /** + * Return a future which will be notified when that optime has been reached. Future will + * contain donor and recipient optime of last oplog entry in batch where donor optime is greater + * than passed-in time. 
+ */ + SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime); + + /** + * Returns the last donor and recipient optimes of the last batch applied. + */ + OpTimePair getLastBatchCompletedOpTimes(); + + void applyOplogBatch_forTest(); + + void setBatchLimits_forTest(TenantOplogBatcher::BatchLimits limits) { + _limits = limits; + } + +private: + Status _doStartup_inlock() noexcept final; + void _doShutdown_inlock() noexcept final; + void _finishShutdown(WithLock lk, Status status); + + void _applyLoop(TenantOplogBatch batch); + void _handleError(Status status); + + void _applyOplogBatch(const TenantOplogBatch& batch); + OpTimePair _writeNoOpEntries(const TenantOplogBatch& batch); + void _writeNoOpsForRange(OpObserver* opObserver, + std::vector<TenantOplogEntry>::const_iterator begin, + std::vector<TenantOplogEntry>::const_iterator end, + std::vector<OplogSlot>::iterator firstSlot); + void _makeWriterPool_inlock(int threadCount); + OpTime _getRecipientOpTime(const OpTime& donorOpTime); + // This is a convenience call for getRecipientOpTime which handles boost::none and nulls. + boost::optional<OpTime> _maybeGetRecipientOpTime(const boost::optional<OpTime>); + // _setRecipientOpTime must be called in optime order. + void _setRecipientOpTime(const OpTime& donorOpTime, const OpTime& recipientOpTime); + + Mutex* _getMutex() noexcept final { + return &_mutex; + } + + Mutex _mutex = MONGO_MAKE_LATCH("TenantOplogApplier::_mutex"); + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access according to class's own rules. + // (M) Reads and writes guarded by _mutex + // (X) Access only allowed from the main flow of control called from run() or constructor. + + // Handles consuming oplog entries from the OplogBuffer for oplog application. 
+ std::unique_ptr<TenantOplogBatcher> _oplogBatcher; // (R) + const UUID _migrationUuid; // (R) + const std::string _tenantId; // (R) + const OpTime _applyFromOpTime; // (R) + RandomAccessOplogBuffer* _oplogBuffer; // (R) + std::shared_ptr<executor::TaskExecutor> _executor; // (R) + std::unique_ptr<ThreadPool> _writerPool; // (S) + OpTimePair _lastBatchCompletedOpTimes; // (M) + std::vector<OpTimePair> _opTimeMapping; // (M) + TenantOplogBatcher::BatchLimits _limits; // (R) + std::map<OpTime, SharedPromise<OpTimePair>> _opTimeNotificationList; // (M) + Status _finalStatus = Status::OK(); // (M) +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp new file mode 100644 index 00000000000..e25ff026a18 --- /dev/null +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -0,0 +1,280 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <algorithm> +#include <boost/optional/optional_io.hpp> +#include <vector> + +#include "mongo/db/op_observer_noop.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/repl/oplog_batcher_test_fixture.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/repl/tenant_oplog_applier.h" +#include "mongo/db/repl/tenant_oplog_batcher.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/log_test.h" + +namespace mongo { + +using executor::TaskExecutor; +using executor::ThreadPoolExecutorTest; + +namespace repl { + +class TenantOplogApplierTestOpObserver : public OpObserverNoop { +public: + void onInternalOpMessage(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID> uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final { + MutableOplogEntry oplogEntry; + 
oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(msgObj); + oplogEntry.setObject2(o2MsgObj); + oplogEntry.setPreImageOpTime(preImageOpTime); + oplogEntry.setPostImageOpTime(postImageOpTime); + oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction); + if (slot) { + oplogEntry.setOpTime(*slot); + } else { + oplogEntry.setOpTime(getNextOpTime(opCtx)); + } + const auto& recipientInfo = tenantMigrationRecipientInfo(opCtx); + if (recipientInfo) { + oplogEntry.setFromTenantMigration(recipientInfo->uuid); + } + stdx::lock_guard lk(_mutex); + _entries.push_back(oplogEntry); + } + + // Returns a vector of the oplog entries recorded, in optime order. + std::vector<MutableOplogEntry> getEntries() { + std::vector<MutableOplogEntry> entries; + { + stdx::lock_guard lk(_mutex); + entries = _entries; + } + std::sort(entries.begin(), + entries.end(), + [](const MutableOplogEntry& a, const MutableOplogEntry& b) { + return a.getOpTime() < b.getOpTime(); + }); + return entries; + } + +private: + mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantOplogApplierTestOpObserver::_mutex"); + std::vector<MutableOplogEntry> _entries; +}; +constexpr auto dbName = "tenant_test"_sd; + +class TenantOplogApplierTest : public ServiceContextMongoDTest { +public: + void setUp() override { + ServiceContextMongoDTest::setUp(); + + // Set up an OpObserver to track the documents OplogApplierImpl inserts. 
+ auto service = getServiceContext(); + auto opObserver = std::make_unique<TenantOplogApplierTestOpObserver>(); + _opObserver = opObserver.get(); + auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); + opObserverRegistry->addObserver(std::move(opObserver)); + + auto network = std::make_unique<executor::NetworkInterfaceMock>(); + _net = network.get(); + executor::ThreadPoolMock::Options thread_pool_options; + thread_pool_options.onCreateThread = [] { Client::initThread("TenantOplogApplier"); }; + _executor = makeSharedThreadPoolTestExecutor(std::move(network), thread_pool_options); + _executor->startup(); + _oplogBuffer.startup(nullptr); + + // Set up a replication coordinator and storage interface, needed for opObservers. + repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>()); + repl::ReplicationCoordinator::set( + service, std::make_unique<repl::ReplicationCoordinatorMock>(service)); + + // Set up oplog collection. If the WT storage engine is used, the oplog collection is + // expected to exist when fetching the next opTime (LocalOplogInfo::getNextOpTimes) to use + // for a write. 
+ auto opCtx = cc().makeOperationContext(); + repl::setOplogCollectionName(service); + repl::createOplog(opCtx.get()); + } + + void assertNoOpMatches(const OplogEntry& op, const MutableOplogEntry& noOp) { + ASSERT_BSONOBJ_EQ(op.toBSON(), noOp.getObject()); + ASSERT_EQ(op.getNss(), noOp.getNss()); + ASSERT_EQ(op.getUuid(), noOp.getUuid()); + ASSERT_EQ(_migrationUuid, noOp.getFromTenantMigration()); + } + + void pushOps(const std::vector<OplogEntry>& ops) { + std::vector<BSONObj> bsonOps; + for (const auto& op : ops) { + bsonOps.push_back(op.toBSON()); + } + _oplogBuffer.push(nullptr, bsonOps.begin(), bsonOps.end()); + } + +protected: + OplogBufferMock _oplogBuffer; + executor::NetworkInterfaceMock* _net; + std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; + std::string _tenantId = "tenant"; + UUID _migrationUuid = UUID::gen(); + TenantOplogApplierTestOpObserver* _opObserver; // Owned by service context opObserverRegistry + +private: + unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{ + logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)}; +}; + +TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) { + std::vector<OplogEntry> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + pushOps(srcOps); + + TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor); + ASSERT_OK(applier.startup()); + // Even if we wait for the first op in a batch, it is the last op we should be notified on. 
+ auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get(); + ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime); + auto entries = _opObserver->getEntries(); + ASSERT_EQ(2, entries.size()); + assertNoOpMatches(srcOps[0], entries[0]); + assertNoOpMatches(srcOps[1], entries[1]); + applier.shutdown(); + applier.join(); +} + +TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { + std::vector<OplogEntry> srcOps; + // This should be big enough to use several threads to do the writing + for (int i = 0; i < 64; i++) { + srcOps.push_back(makeInsertOplogEntry(i + 1, NamespaceString(dbName, "foo"))); + } + pushOps(srcOps); + + TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor); + ASSERT_OK(applier.startup()); + // Even if we wait for the first op in a batch, it is the last op we should be notified on. + auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get(); + ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime); + auto entries = _opObserver->getEntries(); + ASSERT_EQ(srcOps.size(), entries.size()); + for (size_t i = 0; i < srcOps.size(); i++) { + assertNoOpMatches(srcOps[i], entries[i]); + } + applier.shutdown(); + applier.join(); +} + +TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { + std::vector<OplogEntry> srcOps; + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"))); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz"))); + srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bif"))); + + TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor); + TenantOplogBatcher::BatchLimits smallLimits(100 * 1024 /* bytes */, 2 /*ops*/); + applier.setBatchLimits_forTest(smallLimits); + ASSERT_OK(applier.startup()); + auto firstBatchFuture = 
applier.getNotificationForOpTime(srcOps[0].getOpTime()); + auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime()); + pushOps(srcOps); + // We should see the last batch optime for each batch in our notifications. + ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime); + ASSERT_EQ(srcOps[3].getOpTime(), secondBatchFuture.get().donorOpTime); + auto entries = _opObserver->getEntries(); + ASSERT_EQ(4, entries.size()); + assertNoOpMatches(srcOps[0], entries[0]); + assertNoOpMatches(srcOps[1], entries[1]); + assertNoOpMatches(srcOps[2], entries[2]); + assertNoOpMatches(srcOps[3], entries[3]); + applier.shutdown(); + applier.join(); +} + +TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { + std::vector<OplogEntry> innerOps1; + innerOps1.push_back(makeInsertOplogEntry(11, NamespaceString(dbName, "bar"))); + innerOps1.push_back(makeInsertOplogEntry(12, NamespaceString(dbName, "bar"))); + std::vector<OplogEntry> innerOps2; + innerOps2.push_back(makeInsertOplogEntry(21, NamespaceString(dbName, "bar"))); + innerOps2.push_back(makeInsertOplogEntry(22, NamespaceString(dbName, "bar"))); + std::vector<OplogEntry> innerOps3; + innerOps3.push_back(makeInsertOplogEntry(31, NamespaceString(dbName, "bar"))); + innerOps3.push_back(makeInsertOplogEntry(32, NamespaceString(dbName, "bar"))); + + // Makes entries with ts from range [2, 5). + std::vector<OplogEntry> srcOps = makeMultiEntryTransactionOplogEntries( + 2, dbName, /* prepared */ false, {innerOps1, innerOps2, innerOps3}); + pushOps(srcOps); + + TenantOplogApplier applier(_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor); + ASSERT_OK(applier.startup()); + // The first two ops should come in the first batch. + auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime()); + ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime); + // The last op is in a batch by itself. 
+ auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime()); + ASSERT_EQ(srcOps[2].getOpTime(), secondBatchFuture.get().donorOpTime); + auto entries = _opObserver->getEntries(); + ASSERT_EQ(srcOps.size(), entries.size()); + for (size_t i = 0; i < srcOps.size(); i++) { + assertNoOpMatches(srcOps[i], entries[i]); + } + // Make sure the no-ops got linked properly. + ASSERT_EQ(OpTime(), entries[0].getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(entries[0].getOpTime(), entries[1].getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(entries[1].getOpTime(), entries[2].getPrevWriteOpTimeInTransaction()); + applier.shutdown(); + applier.join(); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 3d9095124dd..d40eaadac68 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -101,7 +101,11 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) override {} + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final{}; void onCreateCollection(OperationContext* opCtx, Collection* coll, diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 69474685733..fd6c4b296a4 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -640,8 +640,15 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk( writeConflictRetry( _opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { WriteUnitOfWork uow(_opCtx); - serviceContext->getOpObserver()->onInternalOpMessage( - _opCtx, getNss(), 
*_collectionUUID, BSON("msg" << dbgMessage), o2Message); + serviceContext->getOpObserver()->onInternalOpMessage(_opCtx, + getNss(), + *_collectionUUID, + BSON("msg" << dbgMessage), + o2Message, + boost::none, + boost::none, + boost::none, + boost::none); uow.commit(); }); } diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 5d82b1d5875..fca032c80a3 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -163,7 +163,15 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o WriteUnitOfWork wuow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( - opCtx, _ns, {}, {}, message); + opCtx, + _ns, + {}, + {}, + message, + boost::none, + boost::none, + boost::none, + boost::none); wuow.commit(); }); } diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 0a40af218d1..8a3beea80a9 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -99,7 +99,11 @@ public: const NamespaceString& nss, const boost::optional<UUID> uuid, const BSONObj& msgObj, - const boost::optional<BSONObj> o2MsgObj) override {} + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final{}; void onCreateCollection(OperationContext* opCtx, Collection* coll, |