summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Zhang <jason.zhang@mongodb.com>2021-03-02 20:47:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-04 17:12:22 +0000
commit3ddc43645c38364227f602135bdb0eb46167948a (patch)
treee167b896b3311e26ce122fc0e03bff0a73a1897b
parenta9eb3437044734336d8ca014dffe30fe46e7077b (diff)
downloadmongo-3ddc43645c38364227f602135bdb0eb46167948a.tar.gz
SERVER-54841 Use AsyncTry in _fetchAndStoreRecipientClusterTimeKeyDocs
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp173
1 files changed, 99 insertions, 74 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 269a6b74569..80e304c9cd7 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -91,6 +91,13 @@ bool shouldStopSendingRecipientCommand(Status status, const CancelationToken& to
return status.isOK() || !ErrorCodes::isRetriableError(status) || token.isCanceled();
}
+bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status, const CancelationToken& token) {
+ // TODO (SERVER-54926): Convert HostUnreachable error in
+ // _fetchAndStoreRecipientClusterTimeKeyDocs to specific error.
+ return status.isOK() || !ErrorCodes::isRetriableError(status) ||
+ status.code() == ErrorCodes::HostUnreachable || token.isCanceled();
+}
+
void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken,
const CancelationToken& instanceToken) {
// If only the instance token was canceled, then we must have gotten donorAbortMigration.
@@ -369,87 +376,105 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const CancelationToken& serviceToken,
const CancelationToken& instanceToken) {
- return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken)
- .thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor](HostAndPort host) {
- const auto nss = NamespaceString::kKeysCollectionNamespace;
-
- const auto cmdObj = [&] {
- FindCommand request(NamespaceStringOrUUID{nss});
- request.setReadConcern(
- repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern)
- .toBSONInner());
- return request.toBSON(BSONObj());
- }();
-
- std::vector<ExternalKeysCollectionDocument> keyDocs;
- boost::optional<Status> fetchStatus;
-
- auto fetcherCallback = [this, self = shared_from_this(), &keyDocs, &fetchStatus](
- const Fetcher::QueryResponseStatus& dataStatus,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- // Throw out any accumulated results on error
- if (!dataStatus.isOK()) {
- fetchStatus = dataStatus.getStatus();
- keyDocs.clear();
- return;
- }
-
- const auto& data = dataStatus.getValue();
- for (const BSONObj& doc : data.documents) {
- keyDocs.push_back(tenant_migration_util::makeExternalClusterTimeKeyDoc(
- _stateDoc.getId(), doc.getOwned()));
- }
- fetchStatus = Status::OK();
- if (!getMoreBob) {
- return;
- }
- getMoreBob->append("getMore", data.cursorId);
- getMoreBob->append("collection", data.nss.coll());
- };
-
- auto fetcher = std::make_shared<Fetcher>(
- _recipientCmdExecutor.get(),
- host,
- nss.db().toString(),
- cmdObj,
- fetcherCallback,
- kPrimaryOnlyReadPreference.toContainingBSON(),
- executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
- executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
- RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
- kMaxRecipientKeyDocsFindAttempts, executor::RemoteCommandRequest::kNoTimeout),
- _sslMode);
- uassertStatusOK(fetcher->schedule());
+ return AsyncTry([this,
+ self = shared_from_this(),
+ executor,
+ recipientTargeterRS,
+ serviceToken,
+ instanceToken] {
+ return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken)
+ .thenRunOn(**executor)
+ .then([this, self = shared_from_this(), executor](HostAndPort host) {
+ const auto nss = NamespaceString::kKeysCollectionNamespace;
+
+ const auto cmdObj = [&] {
+ FindCommand request(NamespaceStringOrUUID{nss});
+ request.setReadConcern(
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern)
+ .toBSONInner());
+ return request.toBSON(BSONObj());
+ }();
+
+ std::vector<ExternalKeysCollectionDocument> keyDocs;
+ boost::optional<Status> fetchStatus;
+
+ auto fetcherCallback =
+ [this, self = shared_from_this(), &keyDocs, &fetchStatus](
+ const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ fetchStatus = dataStatus.getStatus();
+ keyDocs.clear();
+ return;
+ }
+
+ const auto& data = dataStatus.getValue();
+ for (const BSONObj& doc : data.documents) {
+ keyDocs.push_back(
+ tenant_migration_util::makeExternalClusterTimeKeyDoc(
+ _stateDoc.getId(), doc.getOwned()));
+ }
+ fetchStatus = Status::OK();
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ auto fetcher = std::make_shared<Fetcher>(
+ _recipientCmdExecutor.get(),
+ host,
+ nss.db().toString(),
+ cmdObj,
+ fetcherCallback,
+ kPrimaryOnlyReadPreference.toContainingBSON(),
+ executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
+ executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
+ RemoteCommandRetryScheduler::makeRetryPolicy<
+ ErrorCategory::RetriableError>(
+ kMaxRecipientKeyDocsFindAttempts,
+ executor::RemoteCommandRequest::kNoTimeout),
+ _sslMode);
+ uassertStatusOK(fetcher->schedule());
+
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _recipientKeysFetcher = fetcher;
+ }
- {
- stdx::lock_guard<Latch> lg(_mutex);
- _recipientKeysFetcher = fetcher;
- }
+ fetcher->join();
- fetcher->join();
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _recipientKeysFetcher.reset();
+ }
- {
- stdx::lock_guard<Latch> lg(_mutex);
- _recipientKeysFetcher.reset();
- }
+ if (!fetchStatus) {
+ // The callback never got invoked.
+ uasserted(5340400, "Internal error running cursor callback in command");
+ }
+ uassertStatusOK(fetchStatus.get());
- if (!fetchStatus) {
- // The callback never got invoked.
- uasserted(5340400, "Internal error running cursor callback in command");
- }
- uassertStatusOK(fetchStatus.get());
+ return keyDocs;
+ })
+ .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
+ auto keyDocs) {
+ checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
- return keyDocs;
+ tenant_migration_util::storeExternalClusterTimeKeyDocs(executor,
+ std::move(keyDocs));
+ });
+ })
+ .until([instanceToken](Status status) {
+ return shouldStopFetchingRecipientClusterTimeKeyDocs(status, instanceToken);
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- auto keyDocs) {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- tenant_migration_util::storeExternalClusterTimeKeyDocs(executor, std::move(keyDocs));
- });
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, CancelationToken::uncancelable());
}
ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc(