diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2021-03-02 20:47:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-04 17:12:22 +0000 |
commit | 3ddc43645c38364227f602135bdb0eb46167948a (patch) | |
tree | e167b896b3311e26ce122fc0e03bff0a73a1897b | |
parent | a9eb3437044734336d8ca014dffe30fe46e7077b (diff) | |
download | mongo-3ddc43645c38364227f602135bdb0eb46167948a.tar.gz |
SERVER-54841 Use AsyncTry in _fetchAndStoreRecipientClusterTimeKeyDocs
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 173 |
1 files changed, 99 insertions, 74 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 269a6b74569..80e304c9cd7 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -91,6 +91,13 @@ bool shouldStopSendingRecipientCommand(Status status, const CancelationToken& to return status.isOK() || !ErrorCodes::isRetriableError(status) || token.isCanceled(); } +bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status, const CancelationToken& token) { + // TODO (SERVER-54926): Convert HostUnreachable error in + // _fetchAndStoreRecipientClusterTimeKeyDocs to specific error. + return status.isOK() || !ErrorCodes::isRetriableError(status) || + status.code() == ErrorCodes::HostUnreachable || token.isCanceled(); +} + void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken, const CancelationToken& instanceToken) { // If only the instance token was canceled, then we must have gotten donorAbortMigration. @@ -369,87 +376,105 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const CancelationToken& serviceToken, const CancelationToken& instanceToken) { - return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken) - .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor](HostAndPort host) { - const auto nss = NamespaceString::kKeysCollectionNamespace; - - const auto cmdObj = [&] { - FindCommand request(NamespaceStringOrUUID{nss}); - request.setReadConcern( - repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern) - .toBSONInner()); - return request.toBSON(BSONObj()); - }(); - - std::vector<ExternalKeysCollectionDocument> keyDocs; - boost::optional<Status> fetchStatus; - - auto fetcherCallback = [this, self = shared_from_this(), &keyDocs, &fetchStatus]( - const Fetcher::QueryResponseStatus& dataStatus, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - // Throw out any accumulated results on error - if (!dataStatus.isOK()) { - fetchStatus = dataStatus.getStatus(); - keyDocs.clear(); - return; - } - - const auto& data = dataStatus.getValue(); - for (const BSONObj& doc : data.documents) { - keyDocs.push_back(tenant_migration_util::makeExternalClusterTimeKeyDoc( - _stateDoc.getId(), doc.getOwned())); - } - fetchStatus = Status::OK(); - if (!getMoreBob) { - return; - } - getMoreBob->append("getMore", data.cursorId); - getMoreBob->append("collection", data.nss.coll()); - }; - - auto fetcher = std::make_shared<Fetcher>( - _recipientCmdExecutor.get(), - host, - nss.db().toString(), - cmdObj, - fetcherCallback, - kPrimaryOnlyReadPreference.toContainingBSON(), - executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */ - executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */ - RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>( - kMaxRecipientKeyDocsFindAttempts, executor::RemoteCommandRequest::kNoTimeout), - _sslMode); - uassertStatusOK(fetcher->schedule()); + return AsyncTry([this, + self = shared_from_this(), + executor, + recipientTargeterRS, + serviceToken, + instanceToken] { + return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken) + .thenRunOn(**executor) + .then([this, self = shared_from_this(), executor](HostAndPort host) { + const auto nss = NamespaceString::kKeysCollectionNamespace; + + const auto cmdObj = [&] { + FindCommand request(NamespaceStringOrUUID{nss}); + request.setReadConcern( + repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern) + .toBSONInner()); + return request.toBSON(BSONObj()); + }(); + + std::vector<ExternalKeysCollectionDocument> keyDocs; + boost::optional<Status> fetchStatus; + + auto fetcherCallback = + [this, self = shared_from_this(), &keyDocs, &fetchStatus]( + const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + // Throw out any accumulated results on error + if (!dataStatus.isOK()) { + fetchStatus = dataStatus.getStatus(); + keyDocs.clear(); + return; + } + + const auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + keyDocs.push_back( + tenant_migration_util::makeExternalClusterTimeKeyDoc( + _stateDoc.getId(), doc.getOwned())); + } + fetchStatus = Status::OK(); + + if (!getMoreBob) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + + auto fetcher = std::make_shared<Fetcher>( + _recipientCmdExecutor.get(), + host, + nss.db().toString(), + cmdObj, + fetcherCallback, + kPrimaryOnlyReadPreference.toContainingBSON(), + executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */ + executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */ + RemoteCommandRetryScheduler::makeRetryPolicy< + ErrorCategory::RetriableError>( + kMaxRecipientKeyDocsFindAttempts, + executor::RemoteCommandRequest::kNoTimeout), + _sslMode); + uassertStatusOK(fetcher->schedule()); + + { + stdx::lock_guard<Latch> lg(_mutex); + _recipientKeysFetcher = fetcher; + } - { - stdx::lock_guard<Latch> lg(_mutex); - _recipientKeysFetcher = fetcher; - } + fetcher->join(); - fetcher->join(); + { + stdx::lock_guard<Latch> lg(_mutex); + _recipientKeysFetcher.reset(); + } - { - stdx::lock_guard<Latch> lg(_mutex); - _recipientKeysFetcher.reset(); - } + if (!fetchStatus) { + // The callback never got invoked. + uasserted(5340400, "Internal error running cursor callback in command"); + } + uassertStatusOK(fetchStatus.get()); - if (!fetchStatus) { - // The callback never got invoked. - uasserted(5340400, "Internal error running cursor callback in command"); - } - uassertStatusOK(fetchStatus.get()); + return keyDocs; + }) + .then([this, self = shared_from_this(), executor, serviceToken, instanceToken]( + auto keyDocs) { + checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - return keyDocs; + tenant_migration_util::storeExternalClusterTimeKeyDocs(executor, + std::move(keyDocs)); + }); + }) + .until([instanceToken](Status status) { + return shouldStopFetchingRecipientClusterTimeKeyDocs(status, instanceToken); }) - .then([this, self = shared_from_this(), executor, serviceToken, instanceToken]( - auto keyDocs) { - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - - tenant_migration_util::storeExternalClusterTimeKeyDocs(executor, std::move(keyDocs)); - }); + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, CancelationToken::uncancelable()); } ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc( |