summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2022-05-09 12:10:40 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-11 15:00:19 +0000
commit669b63d34bdf887e083081630084b41edeef40d2 (patch)
treea74dcd1b57e0c41429f555450b6d5811b67be8b7 /src/mongo/db
parent7bfa67c5120992a4d85bea225716b8d539329488 (diff)
downloadmongo-669b63d34bdf887e083081630084b41edeef40d2.tar.gz
SERVER-66317 Make owned bson copies for txn api threads
(cherry picked from commit cc0335394bee5441350a36fd9351d5746a6000d8)
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp5
-rw-r--r--src/mongo/db/fle_crud.cpp101
-rw-r--r--src/mongo/db/fle_crud.h22
-rw-r--r--src/mongo/db/fle_crud_mongod.cpp7
4 files changed, 94 insertions, 41 deletions
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index fb8f0b13468..00203a4c485 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -550,14 +550,15 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx,
const BSONObj& cmdObj = this->request().toBSON(BSONObj() /* commandPassthroughFields */);
validate(this->request());
- auto request = [&]() {
+ auto requestAndMsg = [&]() {
if (this->request().getEncryptionInformation().has_value() &&
!this->request().getEncryptionInformation()->getCrudProcessed().get_value_or(false)) {
return processFLEFindAndModifyExplainMongod(opCtx, this->request());
} else {
- return this->request();
+ return std::pair{this->request(), OpMsgRequest()};
}
}();
+ auto request = requestAndMsg.first;
const NamespaceString& nsString = request.getNamespace();
uassertStatusOK(userAllowedWriteNS(opCtx, nsString));
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index 12d7a3be6ca..3cdaa83e8a8 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -268,29 +268,36 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
const write_ops::DeleteCommandRequest& deleteRequest,
GetTxnCallback getTxns) {
+ {
+ auto deletes = deleteRequest.getDeletes();
+ uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1);
- auto deletes = deleteRequest.getDeletes();
- uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1);
+ auto deleteOpEntry = deletes[0];
- auto deleteOpEntry = deletes[0];
-
- uassert(
- 6371303, "FLE only supports single document deletes", deleteOpEntry.getMulti() == false);
+ uassert(6371303,
+ "FLE only supports single document deletes",
+ deleteOpEntry.getMulti() == false);
+ }
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
auto reply = std::make_shared<write_ops::DeleteCommandReply>();
- auto expCtx = makeExpCtx(opCtx, deleteRequest, deleteOpEntry);
+ auto ownedRequest = deleteRequest.serialize({});
+ auto ownedDeleteRequest =
+ write_ops::DeleteCommandRequest::parse(IDLParserErrorContext("delete"), ownedRequest);
+ auto ownedDeleteOpEntry = ownedDeleteRequest.getDeletes()[0];
+
+ auto expCtx = makeExpCtx(opCtx, ownedDeleteRequest, ownedDeleteOpEntry);
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- auto deleteBlock = std::make_tuple(deleteRequest, expCtx);
+ auto deleteBlock = std::make_tuple(ownedDeleteRequest, expCtx);
auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
+ [sharedDeleteBlock, ownedRequest, reply](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get();
@@ -339,19 +346,23 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
const write_ops::UpdateCommandRequest& updateRequest,
GetTxnCallback getTxns) {
- auto updates = updateRequest.getUpdates();
- uassert(6371502, "Only single document updates are permitted", updates.size() == 1);
+ {
+ auto updates = updateRequest.getUpdates();
+ uassert(6371502, "Only single document updates are permitted", updates.size() == 1);
- auto updateOpEntry = updates[0];
+ auto updateOpEntry = updates[0];
- uassert(
- 6371503, "FLE only supports single document updates", updateOpEntry.getMulti() == false);
+ uassert(6371503,
+ "FLE only supports single document updates",
+ updateOpEntry.getMulti() == false);
- // pipeline - is agg specific, delta is oplog, transform is internal (timeseries)
- uassert(6371517,
- "FLE only supports modifier and replacement style updates",
- updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier ||
- updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kReplacement);
+ // pipeline - is agg specific, delta is oplog, transform is internal (timeseries)
+ uassert(6371517,
+ "FLE only supports modifier and replacement style updates",
+ updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier ||
+ updateOpEntry.getU().type() ==
+ write_ops::UpdateModification::Type::kReplacement);
+ }
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
@@ -359,14 +370,19 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
// shared_ptrs
auto reply = std::make_shared<write_ops::UpdateCommandReply>();
- auto expCtx = makeExpCtx(opCtx, updateRequest, updateOpEntry);
- auto updateBlock = std::make_tuple(updateRequest, expCtx);
+ auto ownedRequest = updateRequest.serialize({});
+ auto ownedUpdateRequest =
+ write_ops::UpdateCommandRequest::parse(IDLParserErrorContext("update"), ownedRequest);
+ auto ownedUpdateOpEntry = ownedUpdateRequest.getUpdates()[0];
+
+ auto expCtx = makeExpCtx(opCtx, ownedUpdateRequest, ownedUpdateOpEntry);
+ auto updateBlock = std::make_tuple(ownedUpdateRequest, expCtx);
auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
+ [sharedupdateBlock, reply, ownedRequest](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
auto [updateRequest2, expCtx2] = *sharedupdateBlock.get();
@@ -595,7 +611,7 @@ std::shared_ptr<write_ops::FindAndModifyCommandRequest> constructDefaultReply()
} // namespace
template <typename ReplyType>
-StatusWith<ReplyType> processFindAndModifyRequest(
+StatusWith<std::pair<ReplyType, OpMsgRequest>> processFindAndModifyRequest(
OperationContext* opCtx,
const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
GetTxnCallback getTxns,
@@ -627,18 +643,22 @@ StatusWith<ReplyType> processFindAndModifyRequest(
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
- auto expCtx = makeExpCtx(opCtx, findAndModifyRequest, findAndModifyRequest);
-
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
std::shared_ptr<ReplyType> reply = constructDefaultReply<ReplyType>();
- auto findAndModifyBlock = std::make_tuple(findAndModifyRequest, expCtx);
+
+ auto ownedRequest = findAndModifyRequest.serialize({});
+ auto ownedFindAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse(
+ IDLParserErrorContext("findAndModify"), ownedRequest);
+
+ auto expCtx = makeExpCtx(opCtx, ownedFindAndModifyRequest, ownedFindAndModifyRequest);
+ auto findAndModifyBlock = std::make_tuple(ownedFindAndModifyRequest, expCtx);
auto sharedFindAndModifyBlock =
std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedFindAndModifyBlock, reply, processCallback](
+ [sharedFindAndModifyBlock, ownedRequest, reply, processCallback](
const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient, getGlobalServiceContext());
@@ -665,9 +685,23 @@ StatusWith<ReplyType> processFindAndModifyRequest(
return swResult.getValue().getEffectiveStatus();
}
- return *reply;
+ return std::pair<ReplyType, OpMsgRequest>{*reply, ownedRequest};
}
+template StatusWith<std::pair<write_ops::FindAndModifyCommandReply, OpMsgRequest>>
+processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>(
+ OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
+ GetTxnCallback getTxns,
+ ProcessFindAndModifyCallback<write_ops::FindAndModifyCommandReply> processCallback);
+
+template StatusWith<std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>>
+processFindAndModifyRequest<write_ops::FindAndModifyCommandRequest>(
+ OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
+ GetTxnCallback getTxns,
+ ProcessFindAndModifyCallback<write_ops::FindAndModifyCommandRequest> processCallback);
+
FLEQueryInterface::~FLEQueryInterface() {}
StatusWith<write_ops::InsertCommandReply> processInsert(
@@ -1120,15 +1154,16 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx,
auto swReply = processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>(
opCtx, request, &getTransactionWithRetriesForMongoS);
- auto reply = uassertStatusOK(swReply);
+ auto reply = uassertStatusOK(swReply).first;
reply.serialize(&result);
return FLEBatchResult::kProcessed;
}
-write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos(
- OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request) {
+std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>
+processFLEFindAndModifyExplainMongos(OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& request) {
tassert(6513400,
"Missing encryptionInformation for findAndModify",
request.getEncryptionInformation().has_value());
diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h
index 0d5531fdb7f..738e85b8996 100644
--- a/src/mongo/db/fle_crud.h
+++ b/src/mongo/db/fle_crud.h
@@ -45,6 +45,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/transaction_api.h"
+#include "mongo/rpc/op_msg.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -151,7 +152,8 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx,
const BSONObj& cmdObj,
BSONObjBuilder& result);
-write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos(
+std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>
+processFLEFindAndModifyExplainMongos(
OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest);
/**
@@ -160,7 +162,8 @@ write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongos(
write_ops::FindAndModifyCommandReply processFLEFindAndModify(
OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest);
-write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongod(
+std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>
+processFLEFindAndModifyExplainMongod(
OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& findAndModifyRequest);
/**
@@ -462,12 +465,25 @@ using ProcessFindAndModifyCallback =
const write_ops::FindAndModifyCommandRequest& findAndModifyRequest)>;
template <typename ReplyType>
-StatusWith<ReplyType> processFindAndModifyRequest(
+StatusWith<std::pair<ReplyType, OpMsgRequest>> processFindAndModifyRequest(
OperationContext* opCtx,
const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
GetTxnCallback getTxns,
ProcessFindAndModifyCallback<ReplyType> processCallback = processFindAndModify);
+extern template StatusWith<std::pair<write_ops::FindAndModifyCommandReply, OpMsgRequest>>
+processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>(
+ OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
+ GetTxnCallback getTxns,
+ ProcessFindAndModifyCallback<write_ops::FindAndModifyCommandReply> processCallback);
+
+extern template StatusWith<std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>>
+processFindAndModifyRequest<write_ops::FindAndModifyCommandRequest>(
+ OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& findAndModifyRequest,
+ GetTxnCallback getTxns,
+ ProcessFindAndModifyCallback<write_ops::FindAndModifyCommandRequest> processCallback);
write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
const write_ops::UpdateCommandRequest& updateRequest,
diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp
index 1221a8a572a..d198a35f398 100644
--- a/src/mongo/db/fle_crud_mongod.cpp
+++ b/src/mongo/db/fle_crud_mongod.cpp
@@ -227,7 +227,7 @@ write_ops::FindAndModifyCommandReply processFLEFindAndModify(
auto reply = processFindAndModifyRequest<write_ops::FindAndModifyCommandReply>(
opCtx, findAndModifyRequest, &getTransactionWithRetriesForMongoD);
- return uassertStatusOK(reply);
+ return uassertStatusOK(reply).first;
}
write_ops::UpdateCommandReply processFLEUpdate(
@@ -282,8 +282,9 @@ BSONObj processFLEWriteExplainD(OperationContext* opCtx,
return fle::rewriteQuery(opCtx, expCtx, nss, info, query, &getTransactionWithRetriesForMongoD);
}
-write_ops::FindAndModifyCommandRequest processFLEFindAndModifyExplainMongod(
- OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request) {
+std::pair<write_ops::FindAndModifyCommandRequest, OpMsgRequest>
+processFLEFindAndModifyExplainMongod(OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& request) {
tassert(6513401,
"Missing encryptionInformation for findAndModify",
request.getEncryptionInformation().has_value());