summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Zhang <jason.zhang@mongodb.com>2022-04-14 16:12:08 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-20 18:05:45 +0000
commitec279adb4b46a9c1a1792d36a37bcf54392e3f3f (patch)
tree6ff0b7f992abd522d363ab2ac514b608423c2e37
parentdb1f183c4e7339c4a13d7e8345bb187d6d0348fa (diff)
downloadmongo-ec279adb4b46a9c1a1792d36a37bcf54392e3f3f.tar.gz
SERVER-64793 Make exhaustiveFind asynchronous
(cherry picked from commit 06625aa0a8fc826d9d5d80e8604b0456ffcd7d4a)
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/transaction_api.cpp65
2 files changed, 45 insertions, 22 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 0283537e251..b5d308c44d3 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -486,6 +486,8 @@ error_codes:
- {code: 372, name: CloseConnectionForShutdownCommand, categories: [CloseConnectionError,InternalOnly]}
+ - {code: 373, name: InternalTransactionsExhaustiveFindHasMore}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp
index 7888e701122..efc74708ddc 100644
--- a/src/mongo/db/transaction_api.cpp
+++ b/src/mongo/db/transaction_api.cpp
@@ -58,6 +58,8 @@
#include "mongo/rpc/reply_interface.h"
#include "mongo/stdx/future.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/util/cancellation.h"
+#include "mongo/util/future_util.h"
// TODO SERVER-65395: Remove failpoint when fle2 tests can reliably support internal transaction
// retry limit.
@@ -319,31 +321,50 @@ SemiFuture<BatchedCommandResponse> SEPTransactionClient::runCRUDOp(
SemiFuture<std::vector<BSONObj>> SEPTransactionClient::exhaustiveFind(
const FindCommandRequest& cmd) const {
- // TODO SERVER-64793: Make exhaustiveFind asynchronous
return runCommand(cmd.getDbName(), cmd.toBSON({}))
.thenRunOn(_executor)
.then([this, batchSize = cmd.getBatchSize()](BSONObj reply) {
- std::vector<BSONObj> response;
- auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(reply));
- while (true) {
- auto releasedBatch = cursorResponse.releaseBatch();
- response.insert(response.end(), releasedBatch.begin(), releasedBatch.end());
-
- // We keep issuing getMores until the cursorId signifies that there are no more
- // documents to fetch.
- if (!cursorResponse.getCursorId()) {
- break;
- }
-
- GetMoreCommandRequest getMoreRequest(cursorResponse.getCursorId(),
- cursorResponse.getNSS().coll().toString());
- getMoreRequest.setBatchSize(batchSize);
-
- // We block until we get the response back from runCommand().
- cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(
- runCommand(cursorResponse.getNSS().db(), getMoreRequest.toBSON({})).get()));
- }
- return response;
+ auto cursorResponse = std::make_shared<CursorResponse>(
+ uassertStatusOK(CursorResponse::parseFromBSON(reply)));
+ auto response = std::make_shared<std::vector<BSONObj>>();
+ return AsyncTry([this,
+ batchSize = batchSize,
+ cursorResponse = std::move(cursorResponse),
+ response]() mutable {
+ auto releasedBatch = cursorResponse->releaseBatch();
+ response->insert(
+ response->end(), releasedBatch.begin(), releasedBatch.end());
+
+ // If we've fetched all the documents, we can return the response vector
+ // wrapped in an OK status.
+ if (!cursorResponse->getCursorId()) {
+ return SemiFuture<void>(Status::OK());
+ }
+
+ GetMoreCommandRequest getMoreRequest(
+ cursorResponse->getCursorId(),
+ cursorResponse->getNSS().coll().toString());
+ getMoreRequest.setBatchSize(batchSize);
+
+ return runCommand(cursorResponse->getNSS().db(), getMoreRequest.toBSON({}))
+ .thenRunOn(_executor)
+ .then([response, cursorResponse](BSONObj reply) {
+ // We keep the state of cursorResponse to be able to check the
+ // cursorId in the next iteration.
+ *cursorResponse =
+ uassertStatusOK(CursorResponse::parseFromBSON(reply));
+ uasserted(ErrorCodes::InternalTransactionsExhaustiveFindHasMore,
+ "More documents to fetch");
+ })
+ .semi();
+ })
+ .until([&](Status result) {
+ // We stop execution if there is either no more documents to fetch or there was
+ // an error upon fetching more documents.
+ return result != ErrorCodes::InternalTransactionsExhaustiveFindHasMore;
+ })
+ .on(_executor, CancellationToken::uncancelable())
+ .then([response = std::move(response)] { return std::move(*response); });
})
.semi();
}