diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2022-05-04 15:42:29 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-05 16:47:46 +0000 |
commit | c23236c1b63f147f950d921a5411749e637d54ae (patch) | |
tree | 2afe6194df4db920ad371c2ed3738ab031b9989f /src/mongo/db/fle_crud.cpp | |
parent | bae75df36389b0c5ca59df2a9a88067e03990561 (diff) | |
download | mongo-c23236c1b63f147f950d921a5411749e637d54ae.tar.gz |
SERVER-66064 Optimize FLE2 document counting
Diffstat (limited to 'src/mongo/db/fle_crud.cpp')
-rw-r--r-- | src/mongo/db/fle_crud.cpp | 70 |
1 files changed, 24 insertions, 46 deletions
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index b673809ebdf..8c7f178bfc0 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -46,9 +46,11 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/query/fle/server_rewrite.h" +#include "mongo/db/service_context.h" #include "mongo/db/transaction_api.h" #include "mongo/idl/idl_parser.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/factory.h" #include "mongo/s/grid.h" #include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/s/write_ops/batch_write_exec.h" @@ -216,7 +218,7 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( opCtx, [sharedInsertBlock, reply, ownedDocument](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - FLEQueryInterfaceImpl queryImpl(txnClient); + FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [edcNss2, efc2, serverPayload2, stmtId2] = *sharedInsertBlock.get(); @@ -288,7 +290,7 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, opCtx, [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - FLEQueryInterfaceImpl queryImpl(txnClient); + FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get(); @@ -364,7 +366,7 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, opCtx, [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - FLEQueryInterfaceImpl queryImpl(txnClient); + FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [updateRequest2, expCtx2] = *sharedupdateBlock.get(); @@ -637,7 +639,7 @@ StatusWith<ReplyType> processFindAndModifyRequest( opCtx, [sharedFindAndModifyBlock, reply, processCallback]( const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - FLEQueryInterfaceImpl queryImpl(txnClient); + FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [findAndModifyRequest2, expCtx] = *sharedFindAndModifyBlock.get(); @@ -1169,55 +1171,31 @@ BSONObj FLEQueryInterfaceImpl::getById(const NamespaceString& nss, BSONElement e } uint64_t FLEQueryInterfaceImpl::countDocuments(const NamespaceString& nss) { - // TODO - what about - // count cmd - // $collStats - // approxCount - - // Build the following pipeline: - // - //{ aggregate : "testColl", pipeline: [{$match:{}}, {$group : {_id: null, n : {$sum:1} - //}} ], cursor: {}} - - BSONObjBuilder builder; - // $db - TXN API DOES THIS FOR US by building OP_MSG - builder.append("aggregate", nss.coll()); - - AggregateCommandRequest request(nss); - - std::vector<BSONObj> pipeline; - pipeline.push_back(BSON("$match" << BSONObj())); - - { - BSONObjBuilder sub; - { - BSONObjBuilder sub2(sub.subobjStart("$group")); - sub2.appendNull("_id"); - { - BSONObjBuilder sub3(sub.subobjStart("n")); - sub3.append("$sum", 1); - } - } + // Since count() does not work in a transaction, call count() by bypassing the transaction api + invariant(!haveClient()); + auto client = _serviceContext->makeClient("SEP-int-fle-crud"); + AlternativeClientRegion clientRegion(client); + auto opCtx = cc().makeOperationContext(); - pipeline.push_back(sub.obj()); - } + CountCommandRequest ccr(nss); + auto opMsgRequest = ccr.serialize(BSONObj()); - request.setPipeline(pipeline); + auto requestMessage = opMsgRequest.serialize(); - auto commandResponse = _txnClient.runCommand(nss.db(), request.toBSON({})).get(); + auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint(); + DbResponse dbResponse = serviceEntryPoint->handleRequest(opCtx.get(), requestMessage).get(); - uint64_t docCount = 0; - auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(commandResponse)); + auto reply = rpc::makeReply(&dbResponse.response)->getCommandReply().getOwned(); + + auto status = getStatusFromWriteCommandReply(reply); + uassertStatusOK(status); - auto firstBatch = cursorResponse.getBatch(); - if (!firstBatch.empty()) { - auto countObj = firstBatch.front(); - int64_t signedDocCount = countObj.getIntField("n"_sd); - uassert(6520701, "Unexpected negative document count", signedDocCount >= 0); - docCount = static_cast<uint64_t>(signedDocCount); + int64_t signedDocCount = reply.getIntField("n"_sd); + if (signedDocCount < 0) { + signedDocCount = 0; } - return docCount; + return static_cast<uint64_t>(signedDocCount); } StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceImpl::insertDocument( |