From 669b63d34bdf887e083081630084b41edeef40d2 Mon Sep 17 00:00:00 2001 From: Mark Benvenuto Date: Mon, 9 May 2022 12:10:40 -0400 Subject: SERVER-66317 Make owned bson copies for txn api threads (cherry picked from commit cc0335394bee5441350a36fd9351d5746a6000d8) --- src/mongo/db/commands/find_and_modify.cpp | 5 +- src/mongo/db/fle_crud.cpp | 101 ++++++++++++++++++++---------- src/mongo/db/fle_crud.h | 22 ++++++- src/mongo/db/fle_crud_mongod.cpp | 7 ++- 4 files changed, 94 insertions(+), 41 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index fb8f0b13468..00203a4c485 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -550,14 +550,15 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, const BSONObj& cmdObj = this->request().toBSON(BSONObj() /* commandPassthroughFields */); validate(this->request()); - auto request = [&]() { + auto requestAndMsg = [&]() { if (this->request().getEncryptionInformation().has_value() && !this->request().getEncryptionInformation()->getCrudProcessed().get_value_or(false)) { return processFLEFindAndModifyExplainMongod(opCtx, this->request()); } else { - return this->request(); + return std::pair{this->request(), OpMsgRequest()}; } }(); + auto request = requestAndMsg.first; const NamespaceString& nsString = request.getNamespace(); uassertStatusOK(userAllowedWriteNS(opCtx, nsString)); diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index 12d7a3be6ca..3cdaa83e8a8 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -268,29 +268,36 @@ std::pair processInsert( write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, const write_ops::DeleteCommandRequest& deleteRequest, GetTxnCallback getTxns) { + { + auto deletes = deleteRequest.getDeletes(); + uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1); - auto deletes = deleteRequest.getDeletes(); - uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1); + auto deleteOpEntry = deletes[0]; - auto deleteOpEntry = deletes[0]; - - uassert( - 6371303, "FLE only supports single document deletes", deleteOpEntry.getMulti() == false); + uassert(6371303, + "FLE only supports single document deletes", + deleteOpEntry.getMulti() == false); + } std::shared_ptr trun = getTxns(opCtx); auto reply = std::make_shared(); - auto expCtx = makeExpCtx(opCtx, deleteRequest, deleteOpEntry); + auto ownedRequest = deleteRequest.serialize({}); + auto ownedDeleteRequest = + write_ops::DeleteCommandRequest::parse(IDLParserErrorContext("delete"), ownedRequest); + auto ownedDeleteOpEntry = ownedDeleteRequest.getDeletes()[0]; + + auto expCtx = makeExpCtx(opCtx, ownedDeleteRequest, ownedDeleteOpEntry); // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - auto deleteBlock = std::make_tuple(deleteRequest, expCtx); + auto deleteBlock = std::make_tuple(ownedDeleteRequest, expCtx); auto sharedDeleteBlock = std::make_shared(deleteBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { + [sharedDeleteBlock, ownedRequest, reply](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get(); @@ -339,19 +346,23 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, const write_ops::UpdateCommandRequest& updateRequest, GetTxnCallback getTxns) { - auto updates = updateRequest.getUpdates(); - uassert(6371502, "Only single document updates are permitted", updates.size() == 1); + { + auto updates = updateRequest.getUpdates(); + uassert(6371502, "Only single document updates are permitted", updates.size() == 1); - auto updateOpEntry = updates[0]; + auto updateOpEntry = updates[0]; - uassert( - 6371503, "FLE only supports single document updates", updateOpEntry.getMulti() == false); + uassert(6371503, + "FLE only supports single document updates", + updateOpEntry.getMulti() == false); - // pipeline - is agg specific, delta is oplog, transform is internal (timeseries) - uassert(6371517, - "FLE only supports modifier and replacement style updates", - updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier || - updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kReplacement); + // pipeline - is agg specific, delta is oplog, transform is internal (timeseries) + uassert(6371517, + "FLE only supports modifier and replacement style updates", + updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier || + updateOpEntry.getU().type() == + write_ops::UpdateModification::Type::kReplacement); + } std::shared_ptr trun = getTxns(opCtx); @@ -359,14 +370,19 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, // shared_ptrs auto reply = std::make_shared(); - auto expCtx = makeExpCtx(opCtx, updateRequest, updateOpEntry); - auto updateBlock = std::make_tuple(updateRequest, expCtx); + auto ownedRequest = updateRequest.serialize({}); + auto ownedUpdateRequest = + write_ops::UpdateCommandRequest::parse(IDLParserErrorContext("update"), ownedRequest); + auto ownedUpdateOpEntry = ownedUpdateRequest.getUpdates()[0]; + + auto expCtx = makeExpCtx(opCtx, ownedUpdateRequest, ownedUpdateOpEntry); + auto updateBlock = std::make_tuple(ownedUpdateRequest, expCtx); auto sharedupdateBlock = std::make_shared(updateBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { + [sharedupdateBlock, reply, ownedRequest](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [updateRequest2, expCtx2] = *sharedupdateBlock.get(); @@ -595,7 +611,7 @@ std::shared_ptr constructDefaultReply() } // namespace template -StatusWith processFindAndModifyRequest( +StatusWith> processFindAndModifyRequest( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, GetTxnCallback getTxns, @@ -627,18 +643,22 @@ StatusWith processFindAndModifyRequest( std::shared_ptr trun = getTxns(opCtx); - auto expCtx = makeExpCtx(opCtx, findAndModifyRequest, findAndModifyRequest); - // The function that handles the transaction may outlive this function so we need to use // shared_ptrs std::shared_ptr reply = constructDefaultReply(); - auto findAndModifyBlock = std::make_tuple(findAndModifyRequest, expCtx); + + auto ownedRequest = findAndModifyRequest.serialize({}); + auto ownedFindAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserErrorContext("findAndModify"), ownedRequest); + + auto expCtx = makeExpCtx(opCtx, ownedFindAndModifyRequest, ownedFindAndModifyRequest); + auto findAndModifyBlock = std::make_tuple(ownedFindAndModifyRequest, expCtx); auto sharedFindAndModifyBlock = std::make_shared(findAndModifyBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedFindAndModifyBlock, reply, processCallback]( + [sharedFindAndModifyBlock, ownedRequest, reply, processCallback]( const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); @@ -665,9 +685,23 @@ StatusWith processFindAndModifyRequest( return swResult.getValue().getEffectiveStatus(); } - return *reply; + return std::pair{*reply, ownedRequest}; } +template StatusWith> +processFindAndModifyRequest( + OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, + GetTxnCallback getTxns, + ProcessFindAndModifyCallback processCallback); + +template StatusWith> +processFindAndModifyRequest( + OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, + GetTxnCallback getTxns, + ProcessFindAndModifyCallback processCallback); + FLEQueryInterface::~FLEQueryInterface() {} StatusWith processInsert( @@ -1120,15 +1154,16 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx, auto swReply = processFindAndModifyRequest( opCtx, request, &getTransactionWithRetriesForMongoS); - auto reply = uassertStatusOK(swReply); + auto reply = uassertStatusOK(swReply).first; reply.serialize(&result); return FLEBatchResult::kProcessed; } -write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos( - OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request) { +std::pair +processFLEFindAndModifyExplainMongos(OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& request) { tassert(6513400, "Missing encryptionInformation for findAndModify", request.getEncryptionInformation().has_value()); diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h index 0d5531fdb7f..738e85b8996 100644 --- a/src/mongo/db/fle_crud.h +++ b/src/mongo/db/fle_crud.h @@ -45,6 +45,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/transaction_api.h" +#include "mongo/rpc/op_msg.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -151,7 +152,8 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx, const BSONObj& cmdObj, BSONObjBuilder& result); -write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos( +std::pair +processFLEFindAndModifyExplainMongos( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest); /** @@ -160,7 +162,8 @@ write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos( write_ops::FindAndModifyCommandReply processFLEFindAndModify( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest); -write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongod( +std::pair +processFLEFindAndModifyExplainMongod( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest); /** @@ -462,12 +465,25 @@ using ProcessFindAndModifyCallback = const write_ops::FindAndModifyCommandRequest& findAndModifyRequest)>; template -StatusWith processFindAndModifyRequest( +StatusWith> processFindAndModifyRequest( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, GetTxnCallback getTxns, ProcessFindAndModifyCallback processCallback = processFindAndModify); +extern template StatusWith> +processFindAndModifyRequest( + OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, + GetTxnCallback getTxns, + ProcessFindAndModifyCallback processCallback); + +extern template StatusWith> +processFindAndModifyRequest( + OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, + GetTxnCallback getTxns, + ProcessFindAndModifyCallback processCallback); write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, const write_ops::UpdateCommandRequest& updateRequest, diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp index 1221a8a572a..d198a35f398 100644 --- a/src/mongo/db/fle_crud_mongod.cpp +++ b/src/mongo/db/fle_crud_mongod.cpp @@ -227,7 +227,7 @@ write_ops::FindAndModifyCommandReply processFLEFindAndModify( auto reply = processFindAndModifyRequest( opCtx, findAndModifyRequest, &getTransactionWithRetriesForMongoD); - return uassertStatusOK(reply); + return uassertStatusOK(reply).first; } write_ops::UpdateCommandReply processFLEUpdate( @@ -282,8 +282,9 @@ BSONObj processFLEWriteExplainD(OperationContext* opCtx, return fle::rewriteQuery(opCtx, expCtx, nss, info, query, &getTransactionWithRetriesForMongoD); } -write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongod( - OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request) { +std::pair +processFLEFindAndModifyExplainMongod(OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& request) { tassert(6513401, "Missing encryptionInformation for findAndModify", request.getEncryptionInformation().has_value()); -- cgit v1.2.1