summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2022-05-02 11:10:57 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-04 21:24:15 +0000
commit79942357f74e7b73823848aceda8a1af27043bfe (patch)
tree8946d114a1d0f057840716f2f343a4813d9bd8e5
parent4a8d40cb6e9faa3081a0198e5152ea35134d4de0 (diff)
downloadmongo-79942357f74e7b73823848aceda8a1af27043bfe.tar.gz
SERVER-65167 Remove usage of std:tie in fle_crud.cpp
(cherry picked from commit 3df96ded70db4da1da4c1a1f86feb7567c48c3cb)
-rw-r--r--jstests/libs/curop_helpers.js3
-rw-r--r--src/mongo/crypto/fle_field_schema.idl4
-rw-r--r--src/mongo/db/commands/fle2_compact.cpp36
-rw-r--r--src/mongo/db/fle_crud.cpp118
4 files changed, 106 insertions, 55 deletions
diff --git a/jstests/libs/curop_helpers.js b/jstests/libs/curop_helpers.js
index 5cbf2294f31..08c6e60f817 100644
--- a/jstests/libs/curop_helpers.js
+++ b/jstests/libs/curop_helpers.js
@@ -8,8 +8,9 @@ function waitForCurOpByFilter(db, filter, options = {}) {
return results.length > 0;
},
() => {
+ let allResults = adminDB.aggregate([{$currentOp: options}]).toArray();
return "Failed to find a matching op for filter: " + tojson(filter) +
- "in currentOp output: " + tojson(results);
+ "in currentOp output: " + tojson(allResults);
});
return results;
}
diff --git a/src/mongo/crypto/fle_field_schema.idl b/src/mongo/crypto/fle_field_schema.idl
index 55751df25ad..a1d3ef5394a 100644
--- a/src/mongo/crypto/fle_field_schema.idl
+++ b/src/mongo/crypto/fle_field_schema.idl
@@ -221,12 +221,12 @@ structs:
unstable: true
deleteTokens:
description: "A map of field paths to FLEDeletePayload"
- type: object
+ type: object_owned
optional: true
unstable: true
schema:
description: "A map of NamespaceString to EncryptedFieldConfig"
- type: object
+ type: object_owned
unstable: true
crudProcessed:
description: "A boolean to indicate whether the CRUD layer has already processed this FLE2 request. Used to prevent infinite recursion."
diff --git a/src/mongo/db/commands/fle2_compact.cpp b/src/mongo/db/commands/fle2_compact.cpp
index 240a77e32a1..ae034dc2d3e 100644
--- a/src/mongo/db/commands/fle2_compact.cpp
+++ b/src/mongo/db/commands/fle2_compact.cpp
@@ -31,6 +31,8 @@
#include "mongo/db/commands/fle2_compact.h"
+#include <memory>
+
#include "mongo/crypto/encryption_fields_gen.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/commands/fle2_compact_gen.h"
@@ -588,9 +590,10 @@ CompactStats processFLECompact(OperationContext* opCtx,
const CompactStructuredEncryptionData& request,
GetTxnCallback getTxn,
const EncryptedStateCollectionsNamespaces& namespaces) {
- ECOCStats ecocStats;
- ECStats escStats, eccStats;
- stdx::unordered_set<ECOCCompactionDocument> c;
+ auto ecocStats = std::make_shared<ECOCStats>();
+ auto escStats = std::make_shared<ECStats>();
+ auto eccStats = std::make_shared<ECStats>();
+ auto c = std::make_shared<stdx::unordered_set<ECOCCompactionDocument>>();
// Read the ECOC documents in a transaction
{
@@ -598,17 +601,19 @@ CompactStats processFLECompact(OperationContext* opCtx,
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- auto argsBlock = std::tie(c, request, namespaces, ecocStats);
+ auto argsBlock = std::make_tuple(request, namespaces);
auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock);
auto swResult = trun->runNoThrow(
- opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ opCtx,
+ [sharedBlock, c, ecocStats](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [c2, request2, namespaces2, ecocStats2] = *sharedBlock.get();
+ auto [request2, namespaces2] = *sharedBlock.get();
- c2 = getUniqueCompactionDocuments(
- &queryImpl, request2, namespaces2.ecocRenameNss, &ecocStats2);
+ *c = getUniqueCompactionDocuments(
+ &queryImpl, request2, namespaces2.ecocRenameNss, ecocStats.get());
return SemiFuture<void>::makeReady();
});
@@ -619,22 +624,25 @@ CompactStats processFLECompact(OperationContext* opCtx,
// Each entry in 'C' represents a unique field/value pair. For each field/value pair,
// compact the ESC & ECC entries for that field/value pair in one transaction.
- for (auto& ecocDoc : c) {
+ for (auto& ecocDoc : *c) {
// start a new transaction
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxn(opCtx);
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- auto argsBlock = std::tie(ecocDoc, namespaces, escStats, eccStats);
+ auto argsBlock = std::make_tuple(ecocDoc, namespaces);
auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock);
auto swResult = trun->runNoThrow(
- opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ opCtx,
+ [sharedBlock, escStats, eccStats](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [ecocDoc2, namespaces2, escStats2, eccStats2] = *sharedBlock.get();
+ auto [ecocDoc2, namespaces2] = *sharedBlock.get();
- compactOneFieldValuePair(&queryImpl, ecocDoc2, namespaces2, &escStats2, &eccStats2);
+ compactOneFieldValuePair(
+ &queryImpl, ecocDoc2, namespaces2, escStats.get(), eccStats.get());
return SemiFuture<void>::makeReady();
});
@@ -643,7 +651,7 @@ CompactStats processFLECompact(OperationContext* opCtx,
uassertStatusOK(swResult.getValue().getEffectiveStatus());
}
- CompactStats stats(ecocStats, eccStats, escStats);
+ CompactStats stats(*ecocStats, *eccStats, *escStats);
_fleCompactStatsStatusSection.updateStats(stats);
return stats;
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index 7cb29fddcc2..ead3e250605 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -31,6 +31,8 @@
#include "mongo/db/fle_crud.h"
+#include <memory>
+
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
@@ -54,9 +56,16 @@
#include "mongo/util/concurrency/thread_pool.h"
MONGO_FAIL_POINT_DEFINE(fleCrudHangInsert);
+MONGO_FAIL_POINT_DEFINE(fleCrudHangPreInsert);
+
MONGO_FAIL_POINT_DEFINE(fleCrudHangUpdate);
+MONGO_FAIL_POINT_DEFINE(fleCrudHangPreUpdate);
+
MONGO_FAIL_POINT_DEFINE(fleCrudHangDelete);
+MONGO_FAIL_POINT_DEFINE(fleCrudHangPreDelete);
+
MONGO_FAIL_POINT_DEFINE(fleCrudHangFindAndModify);
+MONGO_FAIL_POINT_DEFINE(fleCrudHangPreFindAndModify);
namespace mongo {
namespace {
@@ -191,7 +200,7 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
FLEBatchResult::kNotProcessed, write_ops::InsertCommandReply()};
}
- write_ops::InsertCommandReply reply;
+ auto reply = std::make_shared<write_ops::InsertCommandReply>();
uint32_t stmtId = getStmtIdForWriteAt(insertRequest, 0);
@@ -200,18 +209,23 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs since it runs on another thread
auto ownedDocument = document.getOwned();
- auto insertBlock = std::tie(edcNss, efc, serverPayload, reply, stmtId);
+ auto insertBlock = std::make_tuple(edcNss, efc, serverPayload, stmtId);
auto sharedInsertBlock = std::make_shared<decltype(insertBlock)>(insertBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedInsertBlock, ownedDocument](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
+ [sharedInsertBlock, reply, ownedDocument](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [edcNss2, efc2, serverPayload2, reply2, stmtId2] = *sharedInsertBlock.get();
+ auto [edcNss2, efc2, serverPayload2, stmtId2] = *sharedInsertBlock.get();
+
+ if (MONGO_unlikely(fleCrudHangPreInsert.shouldFail())) {
+ LOGV2(6516701, "Hanging due to fleCrudHangPreInsert fail point");
+ fleCrudHangPreInsert.pauseWhileSet();
+ }
- reply2 = uassertStatusOK(processInsert(
+ *reply = uassertStatusOK(processInsert(
&queryImpl, edcNss2, *serverPayload2.get(), efc2, stmtId2, ownedDocument));
if (MONGO_unlikely(fleCrudHangInsert.shouldFail())) {
@@ -222,7 +236,7 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
// If we have write errors but no unexpected internal errors, then we reach here
// If we have write errors, we need to return a failed status to ensure the txn client
// does not try to commit the transaction.
- if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) {
+ if (reply->getWriteErrors().has_value() && !reply->getWriteErrors().value().empty()) {
return SemiFuture<void>::makeReady(
Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on insert"));
}
@@ -235,17 +249,17 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
// InsertCommandReply with write errors so we should return that.
if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) {
return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{
- FLEBatchResult::kProcessed, reply};
+ FLEBatchResult::kProcessed, *reply};
}
- appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
+ appendSingleStatusToWriteErrors(swResult.getStatus(), &reply->getWriteCommandReplyBase());
} else if (!swResult.getValue().getEffectiveStatus().isOK()) {
appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(),
- &reply.getWriteCommandReplyBase());
+ &reply->getWriteCommandReplyBase());
}
return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{FLEBatchResult::kProcessed,
- reply};
+ *reply};
}
write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
@@ -262,22 +276,29 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx);
- write_ops::DeleteCommandReply reply;
+ auto reply = std::make_shared<write_ops::DeleteCommandReply>();
auto expCtx = makeExpCtx(opCtx, deleteRequest, deleteOpEntry);
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- auto deleteBlock = std::tie(deleteRequest, reply, expCtx);
+ auto deleteBlock = std::make_tuple(deleteRequest, expCtx);
auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedDeleteBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [deleteRequest2, reply2, expCtx2] = *sharedDeleteBlock.get();
+ auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get();
- reply2 = processDelete(&queryImpl, expCtx2, deleteRequest2);
+ if (MONGO_unlikely(fleCrudHangPreDelete.shouldFail())) {
+ LOGV2(6516702, "Hanging due to fleCrudHangPreDelete fail point");
+ fleCrudHangPreDelete.pauseWhileSet();
+ }
+
+
+ *reply = processDelete(&queryImpl, expCtx2, deleteRequest2);
if (MONGO_unlikely(fleCrudHangDelete.shouldFail())) {
LOGV2(6371902, "Hanging due to fleCrudHangDelete fail point");
@@ -287,7 +308,7 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
// If we have write errors but no unexpected internal errors, then we reach here
// If we have write errors, we need to return a failed status to ensure the txn client
// does not try to commit the transaction.
- if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) {
+ if (reply->getWriteErrors().has_value() && !reply->getWriteErrors().value().empty()) {
return SemiFuture<void>::makeReady(
Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on delete"));
}
@@ -299,16 +320,16 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
// FLETransactionAbort is used for control flow so it means we have a valid
// InsertCommandReply with write errors so we should return that.
if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) {
- return reply;
+ return *reply;
}
- appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
+ appendSingleStatusToWriteErrors(swResult.getStatus(), &reply->getWriteCommandReplyBase());
} else if (!swResult.getValue().getEffectiveStatus().isOK()) {
appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(),
- &reply.getWriteCommandReplyBase());
+ &reply->getWriteCommandReplyBase());
}
- return reply;
+ return *reply;
}
write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
@@ -333,19 +354,26 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- write_ops::UpdateCommandReply reply;
+ auto reply = std::make_shared<write_ops::UpdateCommandReply>();
+
auto expCtx = makeExpCtx(opCtx, updateRequest, updateOpEntry);
- auto updateBlock = std::tie(updateRequest, reply, expCtx);
+ auto updateBlock = std::make_tuple(updateRequest, expCtx);
auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedupdateBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [updateRequest2, reply2, expCtx2] = *sharedupdateBlock.get();
+ auto [updateRequest2, expCtx2] = *sharedupdateBlock.get();
+
+ if (MONGO_unlikely(fleCrudHangPreUpdate.shouldFail())) {
+ LOGV2(6516703, "Hanging due to fleCrudHangPreUpdate fail point");
+ fleCrudHangPreUpdate.pauseWhileSet();
+ }
- reply2 = processUpdate(&queryImpl, expCtx2, updateRequest2);
+ *reply = processUpdate(&queryImpl, expCtx2, updateRequest2);
if (MONGO_unlikely(fleCrudHangUpdate.shouldFail())) {
LOGV2(6371901, "Hanging due to fleCrudHangUpdate fail point");
@@ -355,7 +383,7 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
// If we have write errors but no unexpected internal errors, then we reach here
// If we have write errors, we need to return a failed status to ensure the txn client
// does not try to commit the transaction.
- if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) {
+ if (reply->getWriteErrors().has_value() && !reply->getWriteErrors().value().empty()) {
return SemiFuture<void>::makeReady(
Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on delete"));
}
@@ -367,16 +395,16 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
// FLETransactionAbort is used for control flow so it means we have a valid
// InsertCommandReply with write errors so we should return that.
if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) {
- return reply;
+ return *reply;
}
- appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
+ appendSingleStatusToWriteErrors(swResult.getStatus(), &reply->getWriteCommandReplyBase());
} else if (!swResult.getValue().getEffectiveStatus().isOK()) {
appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(),
- &reply.getWriteCommandReplyBase());
+ &reply->getWriteCommandReplyBase());
}
- return reply;
+ return *reply;
}
namespace {
@@ -551,6 +579,16 @@ void processRemovedFields(FLEQueryInterface* queryImpl,
}
}
+template <typename ReplyType>
+std::shared_ptr<ReplyType> constructDefaultReply() {
+ return std::make_shared<ReplyType>();
+}
+
+template <>
+std::shared_ptr<write_ops::FindAndModifyCommandRequest> constructDefaultReply() {
+ return std::make_shared<write_ops::FindAndModifyCommandRequest>(NamespaceString());
+}
+
} // namespace
template <typename ReplyType>
@@ -590,21 +628,25 @@ StatusWith<ReplyType> processFindAndModifyRequest(
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- std::shared_ptr<ReplyType> reply;
- auto findAndModifyBlock = std::tie(findAndModifyRequest, reply, expCtx);
+ std::shared_ptr<ReplyType> reply = constructDefaultReply<ReplyType>();
+ auto findAndModifyBlock = std::make_tuple(findAndModifyRequest, expCtx);
auto sharedFindAndModifyBlock =
std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock);
auto swResult = trun->runNoThrow(
opCtx,
- [sharedFindAndModifyBlock, processCallback](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
+ [sharedFindAndModifyBlock, reply, processCallback](
+ const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [findAndModifyRequest2, reply2, expCtx] = *sharedFindAndModifyBlock.get();
+ auto [findAndModifyRequest2, expCtx] = *sharedFindAndModifyBlock.get();
+
+ if (MONGO_unlikely(fleCrudHangPreFindAndModify.shouldFail())) {
+ LOGV2(6516704, "Hanging due to fleCrudHangPreFindAndModify fail point");
+ fleCrudHangPreFindAndModify.pauseWhileSet();
+ }
- reply2 = std::make_shared<ReplyType>(
- processCallback(expCtx, &queryImpl, findAndModifyRequest2));
+ *reply = processCallback(expCtx, &queryImpl, findAndModifyRequest2);
if (MONGO_unlikely(fleCrudHangFindAndModify.shouldFail())) {
LOGV2(6371900, "Hanging due to fleCrudHangFindAndModify fail point");