summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <cheahuychou.mao@mongodb.com>2020-09-10 22:06:30 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-11 17:16:27 +0000
commit8dfebbb8bcc197c19108daebbc46224b0ad846bf (patch)
tree6c6054dbc7a6ffff3a950805207f8a0682f13976
parentb39aecbdf43f463832739d7415d59958619b8cff (diff)
downloadmongo-8dfebbb8bcc197c19108daebbc46224b0ad846bf.tar.gz
SERVER-50872 Make the OpObserver construct TenantMigrationAccessBlocker on observing insertion for state doc with state "data sync"
-rw-r--r--src/mongo/db/op_observer_impl.cpp6
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp41
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h1
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp53
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.h2
5 files changed, 60 insertions, 43 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index c13f444e191..b10b2c240b1 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -505,6 +505,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, it->doc["_id"], it->doc);
}
+ } else if (nss == NamespaceString::kTenantMigrationDonorsNamespace) {
+ for (auto it = first; it != last; it++) {
+ tenant_migration_donor::onWriteToDonorStateDoc(opCtx, it->doc);
+ }
}
}
@@ -574,7 +578,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc);
} else if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace) {
- tenant_migration_donor::onDonorStateDocUpdate(opCtx, args.updateArgs.updatedDoc);
+ tenant_migration_donor::onWriteToDonorStateDoc(opCtx, args.updateArgs.updatedDoc);
}
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index a07833a2caf..cb73f47a053 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -56,6 +56,12 @@ MONGO_FAIL_POINT_DEFINE(skipSendingRecipientSyncDataCommand);
const Seconds kRecipientSyncDataTimeout(30);
+std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlocker(
+ ServiceContext* serviceContext, StringData dbPrefix) {
+ return TenantMigrationAccessBlockerByPrefix::get(serviceContext)
+ .getTenantMigrationAccessBlockerForDbPrefix(dbPrefix);
+}
+
} // namespace
TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext,
@@ -63,13 +69,6 @@ TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext,
: repl::PrimaryOnlyService::TypedInstance<Instance>(), _serviceContext(serviceContext) {
_stateDoc =
TenantMigrationDonorDocument::parse(IDLParserErrorContext("initialStateDoc"), initialState);
-
- _mtab = std::make_shared<TenantMigrationAccessBlocker>(
- _serviceContext,
- tenant_migration_donor::makeTenantMigrationExecutor(_serviceContext),
- _stateDoc.getDatabasePrefix().toString());
- TenantMigrationAccessBlockerByPrefix::get(_serviceContext)
- .add(_stateDoc.getDatabasePrefix(), _mtab);
}
Status TenantMigrationDonorService::Instance::checkIfOptionsConflict(BSONObj options) {
@@ -316,7 +315,10 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
})
.then([this, executor] {
// Enter "blocking" state.
- _mtab->startBlockingWrites();
+ auto mtab =
+ getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix());
+ invariant(mtab);
+ mtab->startBlockingWrites();
const auto opTime = _updateStateDocument(TenantMigrationDonorStateEnum::kBlocking);
return _waitForMajorityWriteConcern(executor, std::move(opTime));
})
@@ -344,15 +346,22 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
.then([this] {
// Enter "commit" state.
_updateStateDocument(TenantMigrationDonorStateEnum::kCommitted);
+ auto mtab =
+ getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix());
+ invariant(mtab);
+ return mtab->onCompletion();
})
- .onError([this](Status status) {
- // Enter "abort" state.
- _abortReason.emplace(status);
- _updateStateDocument(TenantMigrationDonorStateEnum::kAborted);
- })
- .then([this] {
- // Wait for the migration to commit or abort.
- return _mtab->onCompletion();
+ .onError([this, executor](Status status) {
+ auto mtab =
+ getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix());
+ if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) {
+ return SharedSemiFuture<void>(status);
+ } else {
+ // Enter "abort" state.
+ _abortReason.emplace(status);
+ _updateStateDocument(TenantMigrationDonorStateEnum::kAborted);
+ return mtab->onCompletion();
+ }
})
.onCompletion([this](Status status) {
LOGV2(5006601,
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 6d1da3ac6d4..6234a4b4619 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -146,7 +146,6 @@ public:
TenantMigrationDonorDocument _stateDoc;
- std::shared_ptr<TenantMigrationAccessBlocker> _mtab;
boost::optional<Status> _abortReason;
// Promise that is resolved when the donor has majority-committed the migration decision.
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index edae74ab4ff..906655a12f1 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -55,6 +55,21 @@ const char kPoolName[] = "TenantMigrationWorkerThreadPool";
const char kNetName[] = "TenantMigrationWorkerNetwork";
/**
+ * Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the data sync
+ * state.
+ */
+void onTransitionToDataSync(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
+ invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
+ auto mtab = std::make_shared<TenantMigrationAccessBlocker>(
+ opCtx->getServiceContext(),
+ tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()),
+ donorStateDoc.getDatabasePrefix().toString());
+ TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .add(donorStateDoc.getDatabasePrefix(), mtab);
+}
+
+/**
* Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking
* state.
*/
@@ -63,26 +78,17 @@ void onTransitionToBlocking(OperationContext* opCtx,
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
invariant(donorStateDoc.getBlockTimestamp());
- auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
- auto mtab =
- mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix());
+ auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix());
+ invariant(mtab);
if (!opCtx->writesAreReplicated()) {
- // A primary must create the TenantMigrationAccessBlocker and call startBlockingWrites on it
- // before reserving the OpTime for the "start blocking" write, so only secondaries create
- // the TenantMigrationAccessBlocker and call startBlockingWrites on it in the op observer.
- invariant(!mtab);
-
- mtab = std::make_shared<TenantMigrationAccessBlocker>(
- opCtx->getServiceContext(),
- tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()),
- donorStateDoc.getDatabasePrefix().toString());
- mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
+ // A primary must call startBlockingWrites on the TenantMigrationAccessBlocker before
+ // reserving the OpTime for the "start blocking" write, so only secondaries call
+ // startBlockingWrites on the TenantMigrationAccessBlocker in the op observer.
mtab->startBlockingWrites();
}
- invariant(mtab);
-
// Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
// startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog
// hole is filled.
@@ -97,9 +103,8 @@ void onTransitionToCommitted(OperationContext* opCtx,
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kCommitted);
invariant(donorStateDoc.getCommitOrAbortOpTime());
- auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
- auto mtab =
- mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix());
+ auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix());
invariant(mtab);
mtab->commit(donorStateDoc.getCommitOrAbortOpTime().get());
}
@@ -112,9 +117,8 @@ void onTransitionToAborted(OperationContext* opCtx,
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted);
invariant(donorStateDoc.getCommitOrAbortOpTime());
- auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
- auto mtab =
- mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix());
+ auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix());
invariant(mtab);
mtab->abort(donorStateDoc.getCommitOrAbortOpTime().get());
}
@@ -133,7 +137,7 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(
executor::makeNetworkInterface(kNetName, nullptr, nullptr));
}
-void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson) {
+void onWriteToDonorStateDoc(OperationContext* opCtx, const BSONObj& donorStateDocBson) {
auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"),
donorStateDocBson);
if (donorStateDoc.getExpireAt()) {
@@ -142,6 +146,7 @@ void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDoc
} else {
switch (donorStateDoc.getState()) {
case TenantMigrationDonorStateEnum::kDataSync:
+ onTransitionToDataSync(opCtx, donorStateDoc);
break;
case TenantMigrationDonorStateEnum::kBlocking:
onTransitionToBlocking(opCtx, donorStateDoc);
@@ -196,8 +201,8 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat
}
void onWriteToDatabase(OperationContext* opCtx, StringData dbName) {
- auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
- auto mtab = mtabByPrefix.getTenantMigrationAccessBlockerForDbName(dbName);
+ auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .getTenantMigrationAccessBlockerForDbName(dbName);
if (mtab) {
mtab->checkIfCanWriteOrThrow();
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 4d3376fe170..6c47c594621 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -51,7 +51,7 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(ServiceConte
/**
* Updates the donor's in-memory migration state to reflect the given persisted state.
*/
-void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson);
+void onWriteToDonorStateDoc(OperationContext* opCtx, const BSONObj& donorStateDocBson);
/**
* If the operation has read concern "snapshot" or includes afterClusterTime, and the database is