diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2022-05-02 11:10:57 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-04 21:24:15 +0000 |
commit | 79942357f74e7b73823848aceda8a1af27043bfe (patch) | |
tree | 8946d114a1d0f057840716f2f343a4813d9bd8e5 | |
parent | 4a8d40cb6e9faa3081a0198e5152ea35134d4de0 (diff) | |
download | mongo-79942357f74e7b73823848aceda8a1af27043bfe.tar.gz |
SERVER-65167 Remove usage of std:tie in fle_crud.cpp
(cherry picked from commit 3df96ded70db4da1da4c1a1f86feb7567c48c3cb)
-rw-r--r-- | jstests/libs/curop_helpers.js | 3 | ||||
-rw-r--r-- | src/mongo/crypto/fle_field_schema.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/fle2_compact.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/fle_crud.cpp | 118 |
4 files changed, 106 insertions, 55 deletions
diff --git a/jstests/libs/curop_helpers.js b/jstests/libs/curop_helpers.js index 5cbf2294f31..08c6e60f817 100644 --- a/jstests/libs/curop_helpers.js +++ b/jstests/libs/curop_helpers.js @@ -8,8 +8,9 @@ function waitForCurOpByFilter(db, filter, options = {}) { return results.length > 0; }, () => { + let allResults = adminDB.aggregate([{$currentOp: options}]).toArray(); return "Failed to find a matching op for filter: " + tojson(filter) + - "in currentOp output: " + tojson(results); + "in currentOp output: " + tojson(allResults); }); return results; } diff --git a/src/mongo/crypto/fle_field_schema.idl b/src/mongo/crypto/fle_field_schema.idl index 55751df25ad..a1d3ef5394a 100644 --- a/src/mongo/crypto/fle_field_schema.idl +++ b/src/mongo/crypto/fle_field_schema.idl @@ -221,12 +221,12 @@ structs: unstable: true deleteTokens: description: "A map of field paths to FLEDeletePayload" - type: object + type: object_owned optional: true unstable: true schema: description: "A map of NamespaceString to EncryptedFieldConfig" - type: object + type: object_owned unstable: true crudProcessed: description: "A boolean to indicate whether the CRUD layer has already processed this FLE2 request. Used to prevent infinite recursion." diff --git a/src/mongo/db/commands/fle2_compact.cpp b/src/mongo/db/commands/fle2_compact.cpp index 240a77e32a1..ae034dc2d3e 100644 --- a/src/mongo/db/commands/fle2_compact.cpp +++ b/src/mongo/db/commands/fle2_compact.cpp @@ -31,6 +31,8 @@ #include "mongo/db/commands/fle2_compact.h" +#include <memory> + #include "mongo/crypto/encryption_fields_gen.h" #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/commands/fle2_compact_gen.h" @@ -588,9 +590,10 @@ CompactStats processFLECompact(OperationContext* opCtx, const CompactStructuredEncryptionData& request, GetTxnCallback getTxn, const EncryptedStateCollectionsNamespaces& namespaces) { - ECOCStats ecocStats; - ECStats escStats, eccStats; - stdx::unordered_set<ECOCCompactionDocument> c; + auto ecocStats = std::make_shared<ECOCStats>(); + auto escStats = std::make_shared<ECStats>(); + auto eccStats = std::make_shared<ECStats>(); + auto c = std::make_shared<stdx::unordered_set<ECOCCompactionDocument>>(); // Read the ECOC documents in a transaction { @@ -598,17 +601,19 @@ CompactStats processFLECompact(OperationContext* opCtx, // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - auto argsBlock = std::tie(c, request, namespaces, ecocStats); + auto argsBlock = std::make_tuple(request, namespaces); auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock); auto swResult = trun->runNoThrow( - opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + opCtx, + [sharedBlock, c, ecocStats](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [c2, request2, namespaces2, ecocStats2] = *sharedBlock.get(); + auto [request2, namespaces2] = *sharedBlock.get(); - c2 = getUniqueCompactionDocuments( - &queryImpl, request2, namespaces2.ecocRenameNss, &ecocStats2); + *c = getUniqueCompactionDocuments( + &queryImpl, request2, namespaces2.ecocRenameNss, ecocStats.get()); return SemiFuture<void>::makeReady(); }); @@ -619,22 +624,25 @@ CompactStats processFLECompact(OperationContext* opCtx, // Each entry in 'C' represents a unique field/value pair. For each field/value pair, // compact the ESC & ECC entries for that field/value pair in one transaction. - for (auto& ecocDoc : c) { + for (auto& ecocDoc : *c) { // start a new transaction std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxn(opCtx); // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - auto argsBlock = std::tie(ecocDoc, namespaces, escStats, eccStats); + auto argsBlock = std::make_tuple(ecocDoc, namespaces); auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock); auto swResult = trun->runNoThrow( - opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + opCtx, + [sharedBlock, escStats, eccStats](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [ecocDoc2, namespaces2, escStats2, eccStats2] = *sharedBlock.get(); + auto [ecocDoc2, namespaces2] = *sharedBlock.get(); - compactOneFieldValuePair(&queryImpl, ecocDoc2, namespaces2, &escStats2, &eccStats2); + compactOneFieldValuePair( + &queryImpl, ecocDoc2, namespaces2, escStats.get(), eccStats.get()); return SemiFuture<void>::makeReady(); }); @@ -643,7 +651,7 @@ CompactStats processFLECompact(OperationContext* opCtx, uassertStatusOK(swResult.getValue().getEffectiveStatus()); } - CompactStats stats(ecocStats, eccStats, escStats); + CompactStats stats(*ecocStats, *eccStats, *escStats); _fleCompactStatsStatusSection.updateStats(stats); return stats; diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index 7cb29fddcc2..ead3e250605 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -31,6 +31,8 @@ #include "mongo/db/fle_crud.h" +#include <memory> + #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobj.h" @@ -54,9 +56,16 @@ #include "mongo/util/concurrency/thread_pool.h" MONGO_FAIL_POINT_DEFINE(fleCrudHangInsert); +MONGO_FAIL_POINT_DEFINE(fleCrudHangPreInsert); + MONGO_FAIL_POINT_DEFINE(fleCrudHangUpdate); +MONGO_FAIL_POINT_DEFINE(fleCrudHangPreUpdate); + MONGO_FAIL_POINT_DEFINE(fleCrudHangDelete); +MONGO_FAIL_POINT_DEFINE(fleCrudHangPreDelete); + MONGO_FAIL_POINT_DEFINE(fleCrudHangFindAndModify); +MONGO_FAIL_POINT_DEFINE(fleCrudHangPreFindAndModify); namespace mongo { namespace { @@ -191,7 +200,7 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( FLEBatchResult::kNotProcessed, write_ops::InsertCommandReply()}; } - write_ops::InsertCommandReply reply; + auto reply = std::make_shared<write_ops::InsertCommandReply>(); uint32_t stmtId = getStmtIdForWriteAt(insertRequest, 0); @@ -200,18 +209,23 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( // The function that handles the transaction may outlive this function so we need to use // shared_ptrs since it runs on another thread auto ownedDocument = document.getOwned(); - auto insertBlock = std::tie(edcNss, efc, serverPayload, reply, stmtId); + auto insertBlock = std::make_tuple(edcNss, efc, serverPayload, stmtId); auto sharedInsertBlock = std::make_shared<decltype(insertBlock)>(insertBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedInsertBlock, ownedDocument](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { + [sharedInsertBlock, reply, ownedDocument](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [edcNss2, efc2, serverPayload2, reply2, stmtId2] = *sharedInsertBlock.get(); + auto [edcNss2, efc2, serverPayload2, stmtId2] = *sharedInsertBlock.get(); + + if (MONGO_unlikely(fleCrudHangPreInsert.shouldFail())) { + LOGV2(6516701, "Hanging due to fleCrudHangPreInsert fail point"); + fleCrudHangPreInsert.pauseWhileSet(); + } - reply2 = uassertStatusOK(processInsert( + *reply = uassertStatusOK(processInsert( &queryImpl, edcNss2, *serverPayload2.get(), efc2, stmtId2, ownedDocument)); if (MONGO_unlikely(fleCrudHangInsert.shouldFail())) { @@ -222,7 +236,7 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( // If we have write errors but no unexpected internal errors, then we reach here // If we have write errors, we need to return a failed status to ensure the txn client // does not try to commit the transaction. - if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) { + if (reply->getWriteErrors().has_value() && !reply->getWriteErrors().value().empty()) { return SemiFuture<void>::makeReady( Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on insert")); } @@ -235,17 +249,17 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( // InsertCommandReply with write errors so we should return that. if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) { return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{ - FLEBatchResult::kProcessed, reply}; + FLEBatchResult::kProcessed, *reply}; } - appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); + appendSingleStatusToWriteErrors(swResult.getStatus(), &reply->getWriteCommandReplyBase()); } else if (!swResult.getValue().getEffectiveStatus().isOK()) { appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(), - &reply.getWriteCommandReplyBase()); + &reply->getWriteCommandReplyBase()); } return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{FLEBatchResult::kProcessed, - reply}; + *reply}; } write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, @@ -262,22 +276,29 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, std::shared_ptr<txn_api::SyncTransactionWithRetries> trun = getTxns(opCtx); - write_ops::DeleteCommandReply reply; + auto reply = std::make_shared<write_ops::DeleteCommandReply>(); auto expCtx = makeExpCtx(opCtx, deleteRequest, deleteOpEntry); // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - auto deleteBlock = std::tie(deleteRequest, reply, expCtx); + auto deleteBlock = std::make_tuple(deleteRequest, expCtx); auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedDeleteBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + [sharedDeleteBlock, reply](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [deleteRequest2, reply2, expCtx2] = *sharedDeleteBlock.get(); + auto [deleteRequest2, expCtx2] = *sharedDeleteBlock.get(); - reply2 = processDelete(&queryImpl, expCtx2, deleteRequest2); + if (MONGO_unlikely(fleCrudHangPreDelete.shouldFail())) { + LOGV2(6516702, "Hanging due to fleCrudHangPreDelete fail point"); + fleCrudHangPreDelete.pauseWhileSet(); + } + + + *reply = processDelete(&queryImpl, expCtx2, deleteRequest2); if (MONGO_unlikely(fleCrudHangDelete.shouldFail())) { LOGV2(6371902, "Hanging due to fleCrudHangDelete fail point"); @@ -287,7 +308,7 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, // If we have write errors but no unexpected internal errors, then we reach here // If we have write errors, we need to return a failed status to ensure the txn client // does not try to commit the transaction. - if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) { + if (reply->getWriteErrors().has_value() && !reply->getWriteErrors().value().empty()) { return SemiFuture<void>::makeReady( Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on delete")); } @@ -299,16 +320,16 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, // FLETransactionAbort is used for control flow so it means we have a valid // InsertCommandReply with write errors so we should return that. if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) { - return reply; + return *reply; } - appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); + appendSingleStatusToWriteErrors(swResult.getStatus(), &reply->getWriteCommandReplyBase()); } else if (!swResult.getValue().getEffectiveStatus().isOK()) { appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(), - &reply.getWriteCommandReplyBase()); + &reply->getWriteCommandReplyBase()); } - return reply; + return *reply; } write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, @@ -333,19 +354,26 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - write_ops::UpdateCommandReply reply; + auto reply = std::make_shared<write_ops::UpdateCommandReply>(); + auto expCtx = makeExpCtx(opCtx, updateRequest, updateOpEntry); - auto updateBlock = std::tie(updateRequest, reply, expCtx); + auto updateBlock = std::make_tuple(updateRequest, expCtx); auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedupdateBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + [sharedupdateBlock, reply](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [updateRequest2, reply2, expCtx2] = *sharedupdateBlock.get(); + auto [updateRequest2, expCtx2] = *sharedupdateBlock.get(); + + if (MONGO_unlikely(fleCrudHangPreUpdate.shouldFail())) { + LOGV2(6516703, "Hanging due to fleCrudHangPreUpdate fail point"); + fleCrudHangPreUpdate.pauseWhileSet(); + } - reply2 = processUpdate(&queryImpl, expCtx2, updateRequest2); + *reply = processUpdate(&queryImpl, expCtx2, updateRequest2); if (MONGO_unlikely(fleCrudHangUpdate.shouldFail())) { LOGV2(6371901, "Hanging due to fleCrudHangUpdate fail point"); @@ -355,7 +383,7 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, // If we have write errors but no unexpected internal errors, then we reach here // If we have write errors, we need to return a failed status to ensure the txn client // does not try to commit the transaction. - if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) { + if (reply->getWriteErrors().has_value() && !reply->getWriteErrors().value().empty()) { return SemiFuture<void>::makeReady( Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on delete")); } @@ -367,16 +395,16 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, // FLETransactionAbort is used for control flow so it means we have a valid // InsertCommandReply with write errors so we should return that. if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) { - return reply; + return *reply; } - appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); + appendSingleStatusToWriteErrors(swResult.getStatus(), &reply->getWriteCommandReplyBase()); } else if (!swResult.getValue().getEffectiveStatus().isOK()) { appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(), - &reply.getWriteCommandReplyBase()); + &reply->getWriteCommandReplyBase()); } - return reply; + return *reply; } namespace { @@ -551,6 +579,16 @@ void processRemovedFields(FLEQueryInterface* queryImpl, } } +template <typename ReplyType> +std::shared_ptr<ReplyType> constructDefaultReply() { + return std::make_shared<ReplyType>(); +} + +template <> +std::shared_ptr<write_ops::FindAndModifyCommandRequest> constructDefaultReply() { + return std::make_shared<write_ops::FindAndModifyCommandRequest>(NamespaceString()); +} + } // namespace template <typename ReplyType> @@ -590,21 +628,25 @@ StatusWith<ReplyType> processFindAndModifyRequest( // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - std::shared_ptr<ReplyType> reply; - auto findAndModifyBlock = std::tie(findAndModifyRequest, reply, expCtx); + std::shared_ptr<ReplyType> reply = constructDefaultReply<ReplyType>(); + auto findAndModifyBlock = std::make_tuple(findAndModifyRequest, expCtx); auto sharedFindAndModifyBlock = std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock); auto swResult = trun->runNoThrow( opCtx, - [sharedFindAndModifyBlock, processCallback](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { + [sharedFindAndModifyBlock, reply, processCallback]( + const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [findAndModifyRequest2, reply2, expCtx] = *sharedFindAndModifyBlock.get(); + auto [findAndModifyRequest2, expCtx] = *sharedFindAndModifyBlock.get(); + + if (MONGO_unlikely(fleCrudHangPreFindAndModify.shouldFail())) { + LOGV2(6516704, "Hanging due to fleCrudHangPreFindAndModify fail point"); + fleCrudHangPreFindAndModify.pauseWhileSet(); + } - reply2 = std::make_shared<ReplyType>( - processCallback(expCtx, &queryImpl, findAndModifyRequest2)); + *reply = processCallback(expCtx, &queryImpl, findAndModifyRequest2); if (MONGO_unlikely(fleCrudHangFindAndModify.shouldFail())) { LOGV2(6371900, "Hanging due to fleCrudHangFindAndModify fail point"); |