diff options
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 3 |
2 files changed, 39 insertions, 10 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 9c605bedcfb..866765e0dc8 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -113,9 +113,12 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* boost::none); /* collUUID */ } +// We allow retrying on the following oplog fetcher errors: +// 1) InvalidSyncSource - we cannot sync from the chosen sync source, potentially because the sync +// source is too stale or there was a network error when connecting to the sync source. +// 2) ShutdownInProgress - the current sync source is shutting down bool isRetriableOplogFetcherError(Status oplogFetcherStatus) { return oplogFetcherStatus == ErrorCodes::InvalidSyncSource || - oplogFetcherStatus == ErrorCodes::TooStaleToSyncFromSource || oplogFetcherStatus == ErrorCodes::ShutdownInProgress; } @@ -1209,22 +1212,45 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart void TenantMigrationRecipientService::Instance::_startOplogFetcher() { auto opCtx = cc().makeOperationContext(); - stdx::unique_lock lk(_mutex); - - _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>(); - auto startFetchOpTime = *_stateDoc.getStartFetchingDonorOpTime(); + OpTime startFetchOpTime; auto resumingFromOplogBuffer = false; + + { + stdx::lock_guard lk(_mutex); + _dataReplicatorExternalState = + std::make_unique<DataReplicatorExternalStateTenantMigration>(); + startFetchOpTime = *_stateDoc.getStartFetchingDonorOpTime(); + } + if (_sharedData->isResuming()) { - // Release the mutex lock since we acquire a collection mode IS lock when checking the last - // object pushed in the oplog buffer. - lk.unlock(); - // If the oplog buffer already contains fetched documents, we must be resuming a migration. 
+ // If the oplog buffer already contains fetched documents, we must be resuming a + // migration. if (auto topOfOplogBuffer = _donorOplogBuffer->lastObjectPushed(opCtx.get())) { startFetchOpTime = uassertStatusOK(OpTime::parseFromOplogEntry(topOfOplogBuffer.get())); resumingFromOplogBuffer = true; } - lk.lock(); } + + const auto donorMajorityOpTime = _getDonorMajorityOpTime(_oplogFetcherClient); + if (donorMajorityOpTime < startFetchOpTime) { + LOGV2_ERROR(5535800, + "Donor sync source's majority OpTime is behind our startFetchOpTime", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "donorMajorityOpTime"_attr = donorMajorityOpTime, + "startFetchOpTime"_attr = startFetchOpTime); + const auto now = getGlobalServiceContext()->getFastClockSource()->now(); + + stdx::lock_guard lk(_mutex); + _excludeDonorHost(lk, + _oplogFetcherClient->getServerHostAndPort(), + now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS)); + uasserted(ErrorCodes::InvalidSyncSource, + "Donor sync source's majority OpTime is behind our startFetchOpTime, retrying " + "sync source selection"); + } + + stdx::lock_guard lk(_mutex); OplogFetcher::Config oplogFetcherConfig( startFetchOpTime, _oplogFetcherClient->getServerHostAndPort(), diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index d3eb14fb9b0..d07efa4226a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -1741,6 +1741,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuf hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1); + const OpTime updatedOpTime(Timestamp(3, 1), 1); + insertTopOfOplog(&replSet, updatedOpTime); + const auto oplogBuffer = getDonorOplogBuffer(instance.get()); OplogBuffer::Batch batch1; const OpTime resumeOpTime(Timestamp(2, 1), initialOpTime.getTerm()); |