summaryrefslogtreecommitdiff
path: root/src/mongo/db/fle_crud.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2022-05-09 12:10:40 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-10 21:26:02 +0000
commitcc0335394bee5441350a36fd9351d5746a6000d8 (patch)
tree102494d25f5d39fe8d43fa05318966113c780df6 /src/mongo/db/fle_crud.cpp
parent15aa9218aaed0b522b6c672cac6324c2a15458f8 (diff)
downloadmongo-cc0335394bee5441350a36fd9351d5746a6000d8.tar.gz
SERVER-66317 Make owned bson copies for txn api threads
Diffstat (limited to 'src/mongo/db/fle_crud.cpp')
-rw-r--r--src/mongo/db/fle_crud.cpp91
1 files changed, 56 insertions, 35 deletions
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index dd950743ba2..e8f15a1e8b5 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -270,29 +270,36 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
const write_ops::DeleteCommandRequest& deleteRequest,
GetTxnCallback getTxns) {
+ {
+ auto deletes = deleteRequest.getDeletes();
+ uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1);
- auto deletes = deleteRequest.getDeletes();
- uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1);
+ auto deleteOpEntry = deletes[0];
- auto deleteOpEntry = deletes[0];
-
- uassert(
- 6371303, "FLE only supports single document deletes", deleteOpEntry.getMulti() == false);
+ uassert(6371303,
+ "FLE only supports single document deletes",
+ deleteOpEntry.getMulti() == false);
+ }
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
auto reply = std::make_shared<write_ops::DeleteCommandReply>();
- auto expCtx = makeExpCtx(opCtx, deleteRequest, deleteOpEntry);
+ auto ownedRequest = deleteRequest.serialize({});
+ auto ownedDeleteRequest =
+ write_ops::DeleteCommandRequest::parse(IDLParserErrorContext("delete"), ownedRequest);
+ auto ownedDeleteOpEntry = ownedDeleteRequest.getDeletes()[0];
+
+ auto expCtx = makeExpCtx(opCtx, ownedDeleteRequest, ownedDeleteOpEntry);
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- auto deleteBlock = std::make_tuple(deleteRequest, expCtx);
+ auto deleteBlock = std::make_tuple(ownedDeleteRequest, expCtx);
auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
+ [sharedDeleteBlock, ownedRequest, reply](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get();
@@ -341,19 +348,23 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
const write_ops::UpdateCommandRequest& updateRequest,
GetTxnCallback getTxns) {
- auto updates = updateRequest.getUpdates();
- uassert(6371502, "Only single document updates are permitted", updates.size() == 1);
+ {
+ auto updates = updateRequest.getUpdates();
+ uassert(6371502, "Only single document updates are permitted", updates.size() == 1);
- auto updateOpEntry = updates[0];
+ auto updateOpEntry = updates[0];
- uassert(
- 6371503, "FLE only supports single document updates", updateOpEntry.getMulti() == false);
+ uassert(6371503,
+ "FLE only supports single document updates",
+ updateOpEntry.getMulti() == false);
- // pipeline - is agg specific, delta is oplog, transform is internal (timeseries)
- uassert(6371517,
- "FLE only supports modifier and replacement style updates",
- updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier ||
- updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kReplacement);
+ // pipeline - is agg specific, delta is oplog, transform is internal (timeseries)
+ uassert(6371517,
+ "FLE only supports modifier and replacement style updates",
+ updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier ||
+ updateOpEntry.getU().type() ==
+ write_ops::UpdateModification::Type::kReplacement);
+ }
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
@@ -361,14 +372,19 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
// shared_ptrs
auto reply = std::make_shared<write_ops::UpdateCommandReply>();
- auto expCtx = makeExpCtx(opCtx, updateRequest, updateOpEntry);
- auto updateBlock = std::make_tuple(updateRequest, expCtx);
+ auto ownedRequest = updateRequest.serialize({});
+ auto ownedUpdateRequest =
+ write_ops::UpdateCommandRequest::parse(IDLParserErrorContext("update"), ownedRequest);
+ auto ownedUpdateOpEntry = ownedUpdateRequest.getUpdates()[0];
+
+ auto expCtx = makeExpCtx(opCtx, ownedUpdateRequest, ownedUpdateOpEntry);
+ auto updateBlock = std::make_tuple(ownedUpdateRequest, expCtx);
auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
+ [sharedupdateBlock, reply, ownedRequest](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [updateRequest2, expCtx2] = *sharedupdateBlock.get();
@@ -597,7 +613,7 @@ std::shared_ptr<write_ops::FindAndModifyCommandRequest> constructDefaultReply()
} // namespace
template <typename ReplyType>
-StatusWith<ReplyType> processFindAndModifyRequest(
+StatusWith<std::pair<ReplyType, OpMsgRequest>> processFindAndModifyRequest(
OperationContext* opCtx,
const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
GetTxnCallback getTxns,
@@ -629,18 +645,22 @@ StatusWith<ReplyType> processFindAndModifyRequest(
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
- auto expCtx = makeExpCtx(opCtx, findAndModifyRequest, findAndModifyRequest);
-
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
std::shared_ptr<ReplyType> reply = constructDefaultReply<ReplyType>();
- auto findAndModifyBlock = std::make_tuple(findAndModifyRequest, expCtx);
+
+ auto ownedRequest = findAndModifyRequest.serialize({});
+ auto ownedFindAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse(
+ IDLParserErrorContext("findAndModify"), ownedRequest);
+
+ auto expCtx = makeExpCtx(opCtx, ownedFindAndModifyRequest, ownedFindAndModifyRequest);
+ auto findAndModifyBlock = std::make_tuple(ownedFindAndModifyRequest, expCtx);
auto sharedFindAndModifyBlock =
std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedFindAndModifyBlock, reply, processCallback](
+ [sharedFindAndModifyBlock, ownedRequest, reply, processCallback](
const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
@@ -667,17 +687,17 @@ StatusWith<ReplyType> processFindAndModifyRequest(
return swResult.getValue().getEffectiveStatus();
}
- return *reply;
+ return std::pair<ReplyType, OpMsgRequest>{*reply, ownedRequest};
}
-template StatusWith<write_ops::FindAndModifyCommandReply>
+template StatusWith<std::pair<write_ops::FindAndModifyCommandReply, OpMsgRequest>>
processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>(
OperationContext* opCtx,
const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
GetTxnCallback getTxns,
ProcessFindAndModifyCallback<write_ops::FindAndModifyCommandReply> processCallback);
-template StatusWith<write_ops::FindAndModifyCommandRequest>
+template StatusWith<std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>>
processFindAndModifyRequest<write_ops::FindAndModifyCommandRequest>(
OperationContext* opCtx,
const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
@@ -1136,15 +1156,16 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx,
auto swReply = processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>(
opCtx, request, &getTransactionWithRetriesForMongoS);
- auto reply = uassertStatusOK(swReply);
+ auto reply = uassertStatusOK(swReply).first;
reply.serialize(&result);
return FLEBatchResult::kProcessed;
}
-write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos(
- OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request) {
+std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>
+processFLEFindAndModifyExplainMongos(OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& request) {
tassert(6513400,
"Missing encryptionInformation for findAndModify",
request.getEncryptionInformation().has_value());