diff options
author | Jason Chan <jason.chan@mongodb.com> | 2021-04-20 21:42:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-21 16:54:23 +0000 |
commit | d45413a73b86ccedf2e6748e52c5bb95f9e1b5cb (patch) | |
tree | 44dbc5c30017fbf7c193b285f48fabd2744d9545 | |
parent | b1f65c29872abfe0111559a8be6ff209ecf18ade (diff) | |
download | mongo-d45413a73b86ccedf2e6748e52c5bb95f9e1b5cb.tar.gz |
SERVER-55380 Serialize oplog application of writes to the tenant migration namespace
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 54 |
2 files changed, 66 insertions, 4 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 5e7e7d43e78..6a4d08d8c37 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -616,6 +616,9 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps; CachedCollectionProperties collPropertiesCache; + + // Used to serialize writes to the tenant migrations donor and recipient namespaces. + boost::optional<uint32_t> tenantMigrationsWriterId; for (auto&& op : *ops) { // If the operation's optime is before or the same as the beginApplyingOpTime we don't want // to apply it, so don't include it in writerVectors. @@ -697,6 +700,19 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( continue; } + // Writes to the tenant migration namespaces must be serialized to preserve the order of + // migration and access blocker states. + if (op.getNss() == NamespaceString::kTenantMigrationDonorsNamespace || + op.getNss() == NamespaceString::kTenantMigrationRecipientsNamespace) { + auto writerId = OplogApplierUtils::addToWriterVector( + opCtx, &op, writerVectors, &collPropertiesCache, tenantMigrationsWriterId); + if (!tenantMigrationsWriterId) { + tenantMigrationsWriterId.emplace(writerId); + } else { + invariant(writerId == *tenantMigrationsWriterId); + } + continue; + } OplogApplierUtils::addToWriterVector(opCtx, &op, writerVectors, &collPropertiesCache); } } diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index bf06beebfdb..90c142b60a4 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -375,15 +375,25 @@ public: Status applyOplogBatchPerWorker(OperationContext* opCtx, std::vector<const OplogEntry*>* ops, WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; - std::vector<OplogEntry> operationsApplied; + + std::vector<OplogEntry> getOperationsApplied() { + stdx::lock_guard lk(_mutex); + return _operationsApplied; + } + +private: + std::vector<OplogEntry> _operationsApplied; + // Synchronize reads and writes to 'operationsApplied'. + Mutex _mutex = MONGO_MAKE_LATCH("TrackOpsAppliedApplier::_mutex"); }; Status TrackOpsAppliedApplier::applyOplogBatchPerWorker( OperationContext* opCtx, std::vector<const OplogEntry*>* ops, WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + stdx::lock_guard lk(_mutex); for (auto&& opPtr : *ops) { - operationsApplied.push_back(*opPtr); + _operationsApplied.push_back(*opPtr); } return Status::OK(); } @@ -428,8 +438,9 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, auto lastOpTime = unittest::assertGet(oplogApplier.applyOplogBatch(opCtx, {op})); ASSERT_EQUALS(op.getOpTime(), lastOpTime); - ASSERT_EQUALS(1U, oplogApplier.operationsApplied.size()); - const auto& opApplied = oplogApplier.operationsApplied.front(); + const auto opsApplied = oplogApplier.getOperationsApplied(); + ASSERT_EQUALS(1U, opsApplied.size()); + const auto& opApplied = opsApplied.front(); ASSERT_EQUALS(op.getEntry(), opApplied.getEntry()); // "isForCappedCollection" is not parsed from raw oplog entry document. return opApplied.isForCappedCollection(); @@ -2401,6 +2412,41 @@ TEST_F(OplogApplierImplTest, DoNotLogNonSlowOpApplicationWhenSuccessful) { << applyDuration << "ms"; ASSERT_EQUALS(0, countTextFormatLogLinesContaining(expected.str())); } + +TEST_F(OplogApplierImplTest, SerializeOplogApplicationOfWritesToTenantMigrationNamespaces) { + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + TrackOpsAppliedApplier oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + const auto donorNss = NamespaceString::kTenantMigrationDonorsNamespace; + const auto recipientNss = NamespaceString::kTenantMigrationRecipientsNamespace; + + std::vector<OplogEntry> opsToApply; + opsToApply.push_back(makeOplogEntry(OpTypeEnum::kDelete, donorNss, {}, BSON("_id" << 2))); + opsToApply.push_back(makeInsertDocumentOplogEntry( + {Timestamp(Seconds(3), 0), 1LL}, recipientNss, BSON("_id" << 3))); + opsToApply.push_back(makeOplogEntry(OpTypeEnum::kDelete, recipientNss, {}, BSON("_id" << 3))); + opsToApply.push_back( + makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, donorNss, BSON("_id" << 4))); + + ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), opsToApply)); + const auto applied = oplogApplier.getOperationsApplied(); + ASSERT_EQ(4U, applied.size()); + ASSERT_BSONOBJ_EQ(opsToApply[0].getEntry().toBSON(), applied[0].getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(opsToApply[1].getEntry().toBSON(), applied[1].getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(opsToApply[2].getEntry().toBSON(), applied[2].getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(opsToApply[3].getEntry().toBSON(), applied[3].getEntry().toBSON()); +} + + class OplogApplierImplTxnTableTest : public OplogApplierImplTest { public: void setUp() override { |