diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2022-05-09 12:10:40 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-10 21:26:02 +0000 |
commit | cc0335394bee5441350a36fd9351d5746a6000d8 (patch) | |
tree | 102494d25f5d39fe8d43fa05318966113c780df6 /src/mongo/db/fle_crud.cpp | |
parent | 15aa9218aaed0b522b6c672cac6324c2a15458f8 (diff) | |
download | mongo-cc0335394bee5441350a36fd9351d5746a6000d8.tar.gz |
SERVER-66317 Make owned bson copies for txn api threads
Diffstat (limited to 'src/mongo/db/fle_crud.cpp')
-rw-r--r-- | src/mongo/db/fle_crud.cpp | 91 |
1 files changed, 56 insertions, 35 deletions
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index dd950743ba2..e8f15a1e8b5 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -270,29 +270,36 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, const write_ops::DeleteCommandRequest& deleteRequest, GetTxnCallback getTxns) { + { + auto deletes = deleteRequest.getDeletes(); + uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1); - auto deletes = deleteRequest.getDeletes(); - uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1); + auto deleteOpEntry = deletes[0]; - auto deleteOpEntry = deletes[0]; - - uassert( - 6371303, "FLE only supports single document deletes", deleteOpEntry.getMulti() == false); + uassert(6371303, + "FLE only supports single document deletes", + deleteOpEntry.getMulti() == false); + } std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx); auto reply = std::make_shared<write_ops::DeleteCommandReply>(); - auto expCtx = makeExpCtx(opCtx, deleteRequest, deleteOpEntry); + auto ownedRequest = deleteRequest.serialize({}); + auto ownedDeleteRequest = + write_ops::DeleteCommandRequest::parse(IDLParserErrorContext("delete"), ownedRequest); + auto ownedDeleteOpEntry = ownedDeleteRequest.getDeletes()[0]; + + auto expCtx = makeExpCtx(opCtx, ownedDeleteRequest, ownedDeleteOpEntry); // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - auto deleteBlock = std::make_tuple(deleteRequest, expCtx); + auto deleteBlock = std::make_tuple(ownedDeleteRequest, expCtx); auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { + [sharedDeleteBlock, ownedRequest, reply](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get(); @@ -341,19 +348,23 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, const write_ops::UpdateCommandRequest& updateRequest, GetTxnCallback getTxns) { - auto updates = updateRequest.getUpdates(); - uassert(6371502, "Only single document updates are permitted", updates.size() == 1); + { + auto updates = updateRequest.getUpdates(); + uassert(6371502, "Only single document updates are permitted", updates.size() == 1); - auto updateOpEntry = updates[0]; + auto updateOpEntry = updates[0]; - uassert( - 6371503, "FLE only supports single document updates", updateOpEntry.getMulti() == false); + uassert(6371503, + "FLE only supports single document updates", + updateOpEntry.getMulti() == false); - // pipeline - is agg specific, delta is oplog, transform is internal (timeseries) - uassert(6371517, - "FLE only supports modifier and replacement style updates", - updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier || - updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kReplacement); + // pipeline - is agg specific, delta is oplog, transform is internal (timeseries) + uassert(6371517, + "FLE only supports modifier and replacement style updates", + updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier || + updateOpEntry.getU().type() == + write_ops::UpdateModification::Type::kReplacement); + } std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx); @@ -361,14 +372,19 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, // shared_ptrs auto reply = std::make_shared<write_ops::UpdateCommandReply>(); - auto expCtx = makeExpCtx(opCtx, updateRequest, updateOpEntry); - auto updateBlock = std::make_tuple(updateRequest, expCtx); + auto ownedRequest = updateRequest.serialize({}); + auto ownedUpdateRequest = + write_ops::UpdateCommandRequest::parse(IDLParserErrorContext("update"), ownedRequest); + auto ownedUpdateOpEntry = ownedUpdateRequest.getUpdates()[0]; + + auto expCtx = makeExpCtx(opCtx, ownedUpdateRequest, ownedUpdateOpEntry); + auto updateBlock = std::make_tuple(ownedUpdateRequest, expCtx); auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { + [sharedupdateBlock, reply, ownedRequest](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); auto [updateRequest2, expCtx2] = *sharedupdateBlock.get(); @@ -597,7 +613,7 @@ std::shared_ptr<write_ops::FindAndModifyCommandRequest> constructDefaultReply() } // namespace template <typename ReplyType> -StatusWith<ReplyType> processFindAndModifyRequest( +StatusWith<std::pair<ReplyType, OpMsgRequest>> processFindAndModifyRequest( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, GetTxnCallback getTxns, @@ -629,18 +645,22 @@ StatusWith<ReplyType> processFindAndModifyRequest( std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx); - auto expCtx = makeExpCtx(opCtx, findAndModifyRequest, findAndModifyRequest); - // The function that handles the transaction may outlive this function so we need to use // shared_ptrs std::shared_ptr<ReplyType> reply = constructDefaultReply<ReplyType>(); - auto findAndModifyBlock = std::make_tuple(findAndModifyRequest, expCtx); + + auto ownedRequest = findAndModifyRequest.serialize({}); + auto ownedFindAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserErrorContext("findAndModify"), ownedRequest); + + auto expCtx = makeExpCtx(opCtx, ownedFindAndModifyRequest, ownedFindAndModifyRequest); + auto findAndModifyBlock = std::make_tuple(ownedFindAndModifyRequest, expCtx); auto sharedFindAndModifyBlock = std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedFindAndModifyBlock, reply, processCallback]( + [sharedFindAndModifyBlock, ownedRequest, reply, processCallback]( const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext()); @@ -667,17 +687,17 @@ StatusWith<ReplyType> processFindAndModifyRequest( return swResult.getValue().getEffectiveStatus(); } - return *reply; + return std::pair<ReplyType, OpMsgRequest>{*reply, ownedRequest}; } -template StatusWith<write_ops::FindAndModifyCommandReply> +template StatusWith<std::pair<write_ops::FindAndModifyCommandReply, OpMsgRequest>> processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, GetTxnCallback getTxns, ProcessFindAndModifyCallback<write_ops::FindAndModifyCommandReply> processCallback); -template StatusWith<write_ops::FindAndModifyCommandRequest> +template StatusWith<std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>> processFindAndModifyRequest<write_ops::FindAndModifyCommandRequest>( OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest, @@ -1136,15 +1156,16 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx, auto swReply = processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>( opCtx, request, &getTransactionWithRetriesForMongoS); - auto reply = uassertStatusOK(swReply); + auto reply = uassertStatusOK(swReply).first; reply.serialize(&result); return FLEBatchResult::kProcessed; } -write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos( - OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request) { +std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest> +processFLEFindAndModifyExplainMongos(OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& request) { tassert(6513400, "Missing encryptionInformation for findAndModify", request.getEncryptionInformation().has_value()); |