diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2022-04-14 16:12:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-20 18:05:45 +0000 |
commit | ec279adb4b46a9c1a1792d36a37bcf54392e3f3f (patch) | |
tree | 6ff0b7f992abd522d363ab2ac514b608423c2e37 | |
parent | db1f183c4e7339c4a13d7e8345bb187d6d0348fa (diff) | |
download | mongo-ec279adb4b46a9c1a1792d36a37bcf54392e3f3f.tar.gz |
SERVER-64793 Make exhaustiveFind asynchronous
(cherry picked from commit 06625aa0a8fc826d9d5d80e8604b0456ffcd7d4a)
-rw-r--r-- | src/mongo/base/error_codes.yml | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.cpp | 65 |
2 files changed, 45 insertions, 22 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 0283537e251..b5d308c44d3 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -486,6 +486,8 @@ error_codes: - {code: 372, name: CloseConnectionForShutdownCommand, categories: [CloseConnectionError,InternalOnly]} + - {code: 373, name: InternalTransactionsExhaustiveFindHasMore} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp index 7888e701122..efc74708ddc 100644 --- a/src/mongo/db/transaction_api.cpp +++ b/src/mongo/db/transaction_api.cpp @@ -58,6 +58,8 @@ #include "mongo/rpc/reply_interface.h" #include "mongo/stdx/future.h" #include "mongo/transport/service_entry_point.h" +#include "mongo/util/cancellation.h" +#include "mongo/util/future_util.h" // TODO SERVER-65395: Remove failpoint when fle2 tests can reliably support internal transaction // retry limit. @@ -319,31 +321,50 @@ SemiFuture<BatchedCommandResponse> SEPTransactionClient::runCRUDOp( SemiFuture<std::vector<BSONObj>> SEPTransactionClient::exhaustiveFind( const FindCommandRequest& cmd) const { - // TODO SERVER-64793: Make exhaustiveFind asynchronous return runCommand(cmd.getDbName(), cmd.toBSON({})) .thenRunOn(_executor) .then([this, batchSize = cmd.getBatchSize()](BSONObj reply) { - std::vector<BSONObj> response; - auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(reply)); - while (true) { - auto releasedBatch = cursorResponse.releaseBatch(); - response.insert(response.end(), releasedBatch.begin(), releasedBatch.end()); - - // We keep issuing getMores until the cursorId signifies that there are no more - // documents to fetch. - if (!cursorResponse.getCursorId()) { - break; - } - - GetMoreCommandRequest getMoreRequest(cursorResponse.getCursorId(), - cursorResponse.getNSS().coll().toString()); - getMoreRequest.setBatchSize(batchSize); - - // We block until we get the response back from runCommand(). - cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON( - runCommand(cursorResponse.getNSS().db(), getMoreRequest.toBSON({})).get())); - } - return response; + auto cursorResponse = std::make_shared<CursorResponse>( + uassertStatusOK(CursorResponse::parseFromBSON(reply))); + auto response = std::make_shared<std::vector<BSONObj>>(); + return AsyncTry([this, + batchSize = batchSize, + cursorResponse = std::move(cursorResponse), + response]() mutable { + auto releasedBatch = cursorResponse->releaseBatch(); + response->insert( + response->end(), releasedBatch.begin(), releasedBatch.end()); + + // If we've fetched all the documents, we can return the response vector + // wrapped in an OK status. + if (!cursorResponse->getCursorId()) { + return SemiFuture<void>(Status::OK()); + } + + GetMoreCommandRequest getMoreRequest( + cursorResponse->getCursorId(), + cursorResponse->getNSS().coll().toString()); + getMoreRequest.setBatchSize(batchSize); + + return runCommand(cursorResponse->getNSS().db(), getMoreRequest.toBSON({})) + .thenRunOn(_executor) + .then([response, cursorResponse](BSONObj reply) { + // We keep the state of cursorResponse to be able to check the + // cursorId in the next iteration. + *cursorResponse = + uassertStatusOK(CursorResponse::parseFromBSON(reply)); + uasserted(ErrorCodes::InternalTransactionsExhaustiveFindHasMore, + "More documents to fetch"); + }) + .semi(); + }) + .until([&](Status result) { + // We stop execution if there is either no more documents to fetch or there was + // an error upon fetching more documents. + return result != ErrorCodes::InternalTransactionsExhaustiveFindHasMore; + }) + .on(_executor, CancellationToken::uncancelable()) + .then([response = std::move(response)] { return std::move(*response); }); }) .semi(); } |