summaryrefslogtreecommitdiff
path: root/src/mongo/db/fle_crud.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2022-05-04 15:42:29 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-05 16:47:46 +0000
commitc23236c1b63f147f950d921a5411749e637d54ae (patch)
tree2afe6194df4db920ad371c2ed3738ab031b9989f /src/mongo/db/fle_crud.cpp
parentbae75df36389b0c5ca59df2a9a88067e03990561 (diff)
downloadmongo-c23236c1b63f147f950d921a5411749e637d54ae.tar.gz
SERVER-66064 Optimize FLE2 document counting
Diffstat (limited to 'src/mongo/db/fle_crud.cpp')
-rw-r--r--src/mongo/db/fle_crud.cpp70
1 files changed, 24 insertions, 46 deletions
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index b673809ebdf..8c7f178bfc0 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -46,9 +46,11 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/query/fle/server_rewrite.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/transaction_api.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/factory.h"
#include "mongo/s/grid.h"
#include "mongo/s/transaction_router_resource_yielder.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -216,7 +218,7 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
opCtx,
[sharedInsertBlock, reply, ownedDocument](const txn_api::TransactionClient& txnClient,
ExecutorPtr txnExec) {
- FLEQueryInterfaceImpl queryImpl(txnClient);
+ FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [edcNss2, efc2, serverPayload2, stmtId2] = *sharedInsertBlock.get();
@@ -288,7 +290,7 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
opCtx,
[sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient,
ExecutorPtr txnExec) {
- FLEQueryInterfaceImpl queryImpl(txnClient);
+ FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get();
@@ -364,7 +366,7 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
opCtx,
[sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient,
ExecutorPtr txnExec) {
- FLEQueryInterfaceImpl queryImpl(txnClient);
+ FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [updateRequest2, expCtx2] = *sharedupdateBlock.get();
@@ -637,7 +639,7 @@ StatusWith<ReplyType> processFindAndModifyRequest(
opCtx,
[sharedFindAndModifyBlock, reply, processCallback](
const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
- FLEQueryInterfaceImpl queryImpl(txnClient);
+ FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [findAndModifyRequest2, expCtx] = *sharedFindAndModifyBlock.get();
@@ -1169,55 +1171,31 @@ BSONObj FLEQueryInterfaceImpl::getById(const NamespaceString& nss, BSONElement e
}
uint64_t FLEQueryInterfaceImpl::countDocuments(const NamespaceString& nss) {
- // TODO - what about
- // count cmd
- // $collStats
- // approxCount
-
- // Build the following pipeline:
- //
- //{ aggregate : "testColl", pipeline: [{$match:{}}, {$group : {_id: null, n : {$sum:1}
- //}} ], cursor: {}}
-
- BSONObjBuilder builder;
- // $db - TXN API DOES THIS FOR US by building OP_MSG
- builder.append("aggregate", nss.coll());
-
- AggregateCommandRequest request(nss);
-
- std::vector<BSONObj> pipeline;
- pipeline.push_back(BSON("$match" << BSONObj()));
-
- {
- BSONObjBuilder sub;
- {
- BSONObjBuilder sub2(sub.subobjStart("$group"));
- sub2.appendNull("_id");
- {
- BSONObjBuilder sub3(sub.subobjStart("n"));
- sub3.append("$sum", 1);
- }
- }
+ // Since count() does not work in a transaction, call count() by bypassing the transaction api
+ invariant(!haveClient());
+ auto client = _serviceContext->makeClient("SEP-int-fle-crud");
+ AlternativeClientRegion clientRegion(client);
+ auto opCtx = cc().makeOperationContext();
- pipeline.push_back(sub.obj());
- }
+ CountCommandRequest ccr(nss);
+ auto opMsgRequest = ccr.serialize(BSONObj());
- request.setPipeline(pipeline);
+ auto requestMessage = opMsgRequest.serialize();
- auto commandResponse = _txnClient.runCommand(nss.db(), request.toBSON({})).get();
+ auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint();
+ DbResponse dbResponse = serviceEntryPoint->handleRequest(opCtx.get(), requestMessage).get();
- uint64_t docCount = 0;
- auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(commandResponse));
+ auto reply = rpc::makeReply(&dbResponse.response)->getCommandReply().getOwned();
+
+ auto status = getStatusFromWriteCommandReply(reply);
+ uassertStatusOK(status);
- auto firstBatch = cursorResponse.getBatch();
- if (!firstBatch.empty()) {
- auto countObj = firstBatch.front();
- int64_t signedDocCount = countObj.getIntField("n"_sd);
- uassert(6520701, "Unexpected negative document count", signedDocCount >= 0);
- docCount = static_cast<uint64_t>(signedDocCount);
+ int64_t signedDocCount = reply.getIntField("n"_sd);
+ if (signedDocCount < 0) {
+ signedDocCount = 0;
}
- return docCount;
+ return static_cast<uint64_t>(signedDocCount);
}
StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceImpl::insertDocument(