summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Chan <jason.chan@mongodb.com>2021-04-20 21:42:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-21 16:54:23 +0000
commitd45413a73b86ccedf2e6748e52c5bb95f9e1b5cb (patch)
tree44dbc5c30017fbf7c193b285f48fabd2744d9545
parentb1f65c29872abfe0111559a8be6ff209ecf18ade (diff)
downloadmongo-d45413a73b86ccedf2e6748e52c5bb95f9e1b5cb.tar.gz
SERVER-55380 Serialize oplog application of writes to the tenant migration namespace
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp16
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp54
2 files changed, 66 insertions, 4 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 5e7e7d43e78..6a4d08d8c37 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -616,6 +616,9 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps;
CachedCollectionProperties collPropertiesCache;
+
+ // Used to serialize writes to the tenant migrations donor and recipient namespaces.
+ boost::optional<uint32_t> tenantMigrationsWriterId;
for (auto&& op : *ops) {
// If the operation's optime is before or the same as the beginApplyingOpTime we don't want
// to apply it, so don't include it in writerVectors.
@@ -697,6 +700,19 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
continue;
}
+ // Writes to the tenant migration namespaces must be serialized to preserve the order of
+ // migration and access blocker states.
+ if (op.getNss() == NamespaceString::kTenantMigrationDonorsNamespace ||
+ op.getNss() == NamespaceString::kTenantMigrationRecipientsNamespace) {
+ auto writerId = OplogApplierUtils::addToWriterVector(
+ opCtx, &op, writerVectors, &collPropertiesCache, tenantMigrationsWriterId);
+ if (!tenantMigrationsWriterId) {
+ tenantMigrationsWriterId.emplace(writerId);
+ } else {
+ invariant(writerId == *tenantMigrationsWriterId);
+ }
+ continue;
+ }
OplogApplierUtils::addToWriterVector(opCtx, &op, writerVectors, &collPropertiesCache);
}
}
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index bf06beebfdb..90c142b60a4 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -375,15 +375,25 @@ public:
Status applyOplogBatchPerWorker(OperationContext* opCtx,
std::vector<const OplogEntry*>* ops,
WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
- std::vector<OplogEntry> operationsApplied;
+
+ std::vector<OplogEntry> getOperationsApplied() {
+ stdx::lock_guard lk(_mutex);
+ return _operationsApplied;
+ }
+
+private:
+ std::vector<OplogEntry> _operationsApplied;
+ // Synchronize reads and writes to 'operationsApplied'.
+ Mutex _mutex = MONGO_MAKE_LATCH("TrackOpsAppliedApplier::_mutex");
};
Status TrackOpsAppliedApplier::applyOplogBatchPerWorker(
OperationContext* opCtx,
std::vector<const OplogEntry*>* ops,
WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ stdx::lock_guard lk(_mutex);
for (auto&& opPtr : *ops) {
- operationsApplied.push_back(*opPtr);
+ _operationsApplied.push_back(*opPtr);
}
return Status::OK();
}
@@ -428,8 +438,9 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
auto lastOpTime = unittest::assertGet(oplogApplier.applyOplogBatch(opCtx, {op}));
ASSERT_EQUALS(op.getOpTime(), lastOpTime);
- ASSERT_EQUALS(1U, oplogApplier.operationsApplied.size());
- const auto& opApplied = oplogApplier.operationsApplied.front();
+ const auto opsApplied = oplogApplier.getOperationsApplied();
+ ASSERT_EQUALS(1U, opsApplied.size());
+ const auto& opApplied = opsApplied.front();
ASSERT_EQUALS(op.getEntry(), opApplied.getEntry());
// "isForCappedCollection" is not parsed from raw oplog entry document.
return opApplied.isForCappedCollection();
@@ -2401,6 +2412,41 @@ TEST_F(OplogApplierImplTest, DoNotLogNonSlowOpApplicationWhenSuccessful) {
<< applyDuration << "ms";
ASSERT_EQUALS(0, countTextFormatLogLinesContaining(expected.str()));
}
+
+TEST_F(OplogApplierImplTest, SerializeOplogApplicationOfWritesToTenantMigrationNamespaces) {
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ TrackOpsAppliedApplier oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ const auto donorNss = NamespaceString::kTenantMigrationDonorsNamespace;
+ const auto recipientNss = NamespaceString::kTenantMigrationRecipientsNamespace;
+
+ std::vector<OplogEntry> opsToApply;
+ opsToApply.push_back(makeOplogEntry(OpTypeEnum::kDelete, donorNss, {}, BSON("_id" << 2)));
+ opsToApply.push_back(makeInsertDocumentOplogEntry(
+ {Timestamp(Seconds(3), 0), 1LL}, recipientNss, BSON("_id" << 3)));
+ opsToApply.push_back(makeOplogEntry(OpTypeEnum::kDelete, recipientNss, {}, BSON("_id" << 3)));
+ opsToApply.push_back(
+ makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, donorNss, BSON("_id" << 4)));
+
+ ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), opsToApply));
+ const auto applied = oplogApplier.getOperationsApplied();
+ ASSERT_EQ(4U, applied.size());
+ ASSERT_BSONOBJ_EQ(opsToApply[0].getEntry().toBSON(), applied[0].getEntry().toBSON());
+ ASSERT_BSONOBJ_EQ(opsToApply[1].getEntry().toBSON(), applied[1].getEntry().toBSON());
+ ASSERT_BSONOBJ_EQ(opsToApply[2].getEntry().toBSON(), applied[2].getEntry().toBSON());
+ ASSERT_BSONOBJ_EQ(opsToApply[3].getEntry().toBSON(), applied[3].getEntry().toBSON());
+}
+
+
class OplogApplierImplTxnTableTest : public OplogApplierImplTest {
public:
void setUp() override {