diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2022-03-21 15:56:36 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-21 22:44:49 +0000 |
commit | 823364aca6a005271833384a28bea6a5a920cb6f (patch) | |
tree | 794d7a2e9d601b47ff6bd3faf930546d845a2a4a /src | |
parent | dd3239281d72d42aa48c268794446f5a18730e37 (diff) | |
download | mongo-823364aca6a005271833384a28bea6a5a920cb6f.tar.gz |
SERVER-63716 Add support for FLE 2 insert in MongoD
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.yml | 1 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 25 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/fle_crud.cpp | 235 | ||||
-rw-r--r-- | src/mongo/db/fle_crud.h | 46 | ||||
-rw-r--r-- | src/mongo/db/fle_crud_mongod.cpp | 192 | ||||
-rw-r--r-- | src/mongo/db/fle_crud_test.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 6 |
9 files changed, 451 insertions, 83 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index a908025a078..351a708abe4 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -475,6 +475,7 @@ error_codes: - {code: 367, name: FLECompactionPlaceholder} - {code: 368, name: FLEStateCollectionContention} + - {code: 369, name: FLETransactionAbort} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 1eaa1dbe0b0..955c9fc5c6b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -861,8 +861,11 @@ env.Library( source=[ 'fle_crud.cpp', ], - LIBDEPS_PRIVATE=[ + LIBDEPS=[ '$BUILD_DIR/mongo/crypto/encrypted_field_config', + 'transaction_api', + ], + LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/crypto/fle_crypto', '$BUILD_DIR/mongo/db/ops/write_ops_parsers', '$BUILD_DIR/mongo/db/query/query_request', @@ -870,7 +873,24 @@ env.Library( '$BUILD_DIR/mongo/s/grid', '$BUILD_DIR/mongo/s/sharding_router_api', 'logical_session_id', - 'transaction_api', + ], +) + +env.Library( + target='fle_crud_mongod', + source=[ + 'fle_crud_mongod.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/crypto/encrypted_field_config', + '$BUILD_DIR/mongo/crypto/fle_crypto', + '$BUILD_DIR/mongo/db/ops/write_ops_parsers', + '$BUILD_DIR/mongo/db/query/query_request', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/executor/task_executor_pool', + 'fle_crud', + 'logical_session_id', + 'transaction', ], ) @@ -2416,6 +2436,7 @@ env.Library( 'concurrency/flow_control_ticketholder', 'concurrency/lock_manager', 'fcv_op_observer', + 'fle_crud_mongod', 'free_mon/free_mon_mongod', 'ftdc/ftdc_mongod', 'index/index_access_method_factory', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 3c6fee64ed5..50467ca097b 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -396,6 +396,7 @@ env.Library( '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', '$BUILD_DIR/mongo/db/exec/sbe/query_sbe_abt', + '$BUILD_DIR/mongo/db/fle_crud_mongod', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/index_commands_idl', '$BUILD_DIR/mongo/db/multitenancy', diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index d562d21a00e..9068023447a 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -43,6 +43,7 @@ #include "mongo/db/commands/write_commands_common.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/fle_crud.h" #include "mongo/db/json.h" #include "mongo/db/matcher/doc_validation_error.h" #include "mongo/db/matcher/extensions_callback_real.h" @@ -524,6 +525,14 @@ public: write_ops::InsertCommandReply typedRun(OperationContext* opCtx) final try { transactionChecks(opCtx, ns()); + if (request().getEncryptionInformation().has_value()) { + write_ops::InsertCommandReply insertReply; + auto batch = processFLEInsert(opCtx, request(), &insertReply); + if (batch == FLEBatchResult::kProcessed) { + return insertReply; + } + } + if (isTimeseries(opCtx, request())) { // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's // constructor. diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index bbfda99161e..ad72f827dc3 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -52,6 +52,7 @@ #include "mongo/s/grid.h" #include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/s/write_ops/batch_write_exec.h" +#include "mongo/s/write_ops/write_error_detail.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" @@ -66,7 +67,9 @@ public: uint64_t countDocuments(const NamespaceString& nss) final; - void insertDocument(const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) final; + StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss, + BSONObj obj, + bool translateDuplicateKey) final; BSONObj deleteWithPreimage(const NamespaceString& nss, const EncryptionInformation& ei, @@ -155,9 +158,71 @@ uint64_t FLEQueryInterfaceImpl::countDocuments(const NamespaceString& nss) { return docCount; } -void FLEQueryInterfaceImpl::insertDocument(const NamespaceString& nss, - BSONObj obj, - bool translateDuplicateKey) { +write_ops::WriteError writeErrorDetailToWriteError(const WriteErrorDetail* detail) { + return write_ops::WriteError(detail->getIndex(), detail->toStatus()); +} + + +std::vector<write_ops::WriteError> writeErrorsDetailToWriteErrors( + const std::vector<WriteErrorDetail*>& details) { + std::vector<write_ops::WriteError> errors; + errors.reserve(details.size()); + for (const auto& detail : details) { + errors.push_back(writeErrorDetailToWriteError(detail)); + } + return errors; +} + +std::vector<write_ops::WriteError> singleStatusToWriteErrors(const Status& status) { + std::vector<write_ops::WriteError> errors; + + errors.push_back(write_ops::WriteError(0, status)); + + return errors; +} + +void appendSingleStatusToWriteErrors(const Status& status, + write_ops::WriteCommandReplyBase* replyBase) { + std::vector<write_ops::WriteError> errors; + + if (replyBase->getWriteErrors()) { + errors = std::move(replyBase->getWriteErrors().value()); + } + + errors.push_back(write_ops::WriteError(0, status)); + + replyBase->setWriteErrors(errors); +} + +void writeErrorsToWriteDetails(const std::vector<write_ops::WriteError>& errors, + BatchedCommandResponse* response) { + for (const auto& error : errors) { + auto detail = std::make_unique<WriteErrorDetail>(); + detail->setIndex(error.getIndex()); + detail->setStatus(error.getStatus()); + + response->addToErrDetails(detail.release()); + } +} + +void replyToResponse(write_ops::WriteCommandReplyBase* replyBase, + BatchedCommandResponse* response) { + response->setStatus(Status::OK()); + response->setN(replyBase->getN()); + if (replyBase->getElectionId()) { + response->setElectionId(replyBase->getElectionId().value()); + } + if (replyBase->getOpTime()) { + response->setLastOp(replyBase->getOpTime().value()); + } + if (replyBase->getWriteErrors().has_value()) { + writeErrorsToWriteDetails(replyBase->getWriteErrors().value(), response); + } +} + + +StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceImpl::insertDocument( + const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) { write_ops::InsertCommandRequest insertRequest(nss); insertRequest.setDocuments({obj}); // TODO SERVER-64143 - insertRequest.setWriteConcern @@ -167,10 +232,28 @@ void FLEQueryInterfaceImpl::insertDocument(const NamespaceString& nss, auto status = response.toStatus(); if (translateDuplicateKey && status.code() == ErrorCodes::DuplicateKey) { - uassertStatusOK(Status(ErrorCodes::FLEStateCollectionContention, status.reason())); + return Status(ErrorCodes::FLEStateCollectionContention, status.reason()); } - uassertStatusOK(status); + write_ops::InsertCommandReply reply; + + if (response.isLastOpSet()) { + reply.getWriteCommandReplyBase().setOpTime(response.getLastOp()); + } + + if (response.isElectionIdSet()) { + reply.getWriteCommandReplyBase().setElectionId(response.getElectionId()); + } + + reply.getWriteCommandReplyBase().setN(response.getN()); + if (response.isErrDetailsSet()) { + reply.getWriteCommandReplyBase().setWriteErrors( + writeErrorsDetailToWriteErrors(response.getErrDetails())); + } + + reply.getWriteCommandReplyBase().setRetriedStmtIds(reply.getRetriedStmtIds()); + + return {reply}; } BSONObj FLEQueryInterfaceImpl::deleteWithPreimage( @@ -344,10 +427,21 @@ StatusWith<txn_api::CommitResult> runInTxnWithRetry( } } -StatusWith<FLEBatchResult> processInsert( + +std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMongoS( + OperationContext* opCtx) { + return std::make_shared<txn_api::TransactionWithRetries>( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + TransactionRouterResourceYielder::make()); +} + +} // namespace + +std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( OperationContext* opCtx, const write_ops::InsertCommandRequest& insertRequest, - std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) { + GetTxnCallback getTxns) { auto documents = insertRequest.getDocuments(); // TODO - how to check if a document will be too large??? @@ -359,20 +453,22 @@ StatusWith<FLEBatchResult> processInsert( if (serverPayload->size() == 0) { // No actual FLE2 indexed fields - return FLEBatchResult::kNotProcessed; + return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{ + FLEBatchResult::kNotProcessed, write_ops::InsertCommandReply()}; } auto ei = insertRequest.getEncryptionInformation().get(); auto edcNss = insertRequest.getNamespace(); auto efc = EncryptionInformationHelpers::getAndValidateSchema(insertRequest.getNamespace(), ei); + write_ops::InsertCommandReply reply; std::shared_ptr<txn_api::TransactionWithRetries> trun = getTxns(opCtx); // The function that handles the transaction may outlive this function so we need to use // shared_ptrs since it runs on another thread auto ownedDocument = document.getOwned(); - auto insertBlock = std::tie(edcNss, efc, serverPayload); + auto insertBlock = std::tie(edcNss, efc, serverPayload, reply); auto sharedInsertBlock = std::make_shared<decltype(insertBlock)>(insertBlock); auto swResult = runInTxnWithRetry( @@ -382,23 +478,40 @@ StatusWith<FLEBatchResult> processInsert( ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [edcNss2, efc2, serverPayload2] = *sharedInsertBlock.get(); + auto [edcNss2, efc2, serverPayload2, reply2] = *sharedInsertBlock.get(); - processInsert(&queryImpl, edcNss2, *serverPayload2.get(), efc2, ownedDocument); + reply2 = uassertStatusOK( + processInsert(&queryImpl, edcNss2, *serverPayload2.get(), efc2, ownedDocument)); + + // If we have write errors but no unexpected internal errors, then we reach here + // If we have write errors, we need to return a failed status to ensure the txn client + // does not try to commit the transaction. + if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) { + return SemiFuture<void>::makeReady( + Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on insert")); + } return SemiFuture<void>::makeReady(); }); + if (!swResult.isOK()) { - return swResult.getStatus(); + // FLETransactionAbort is used for control flow so it means we have a valid + // InsertCommandReply with write errors so we should return that. + if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) { + return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{ + FLEBatchResult::kProcessed, reply}; + } + + appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); } - return FLEBatchResult::kProcessed; + return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{FLEBatchResult::kProcessed, + reply}; } -StatusWith<uint64_t> processDelete( - OperationContext* opCtx, - const write_ops::DeleteCommandRequest& deleteRequest, - std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) { +StatusWith<uint64_t> processDelete(OperationContext* opCtx, + const write_ops::DeleteCommandRequest& deleteRequest, + GetTxnCallback getTxns) { auto deletes = deleteRequest.getDeletes(); uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1); @@ -436,10 +549,9 @@ StatusWith<uint64_t> processDelete( return count; } -StatusWith<uint64_t> processUpdate( - OperationContext* opCtx, - const write_ops::UpdateCommandRequest& updateRequest, - std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) { +StatusWith<uint64_t> processUpdate(OperationContext* opCtx, + const write_ops::UpdateCommandRequest& updateRequest, + GetTxnCallback getTxns) { auto updates = updateRequest.getUpdates(); uassert(6371502, "Only single document updates are permitted", updates.size() == 1); @@ -483,6 +595,7 @@ StatusWith<uint64_t> processUpdate( return count; } +namespace { void processFieldsForInsert(FLEQueryInterface* queryImpl, const NamespaceString& edcNss, std::vector<EDCServerPayloadInfo>& serverPayload, @@ -538,18 +651,22 @@ void processFieldsForInsert(FLEQueryInterface* queryImpl, payload.count = count; - queryImpl->insertDocument( + auto escInsertReply = uassertStatusOK(queryImpl->insertDocument( nssEsc, ESCCollection::generateInsertDocument(tagToken, valueToken, position, count), - true); + true)); + checkWriteErrors(escInsertReply); + NamespaceString nssEcoc(edcNss.db(), efc.getEcocCollection().get()); // TODO - should we make this a batch of ECOC updates? - queryImpl->insertDocument(nssEcoc, - ECOCCollection::generateDocument( - payload.fieldPathName, payload.payload.getEncryptedTokens()), - false); + auto ecocInsertReply = uassertStatusOK(queryImpl->insertDocument( + nssEcoc, + ECOCCollection::generateDocument(payload.fieldPathName, + payload.payload.getEncryptedTokens()), + false)); + checkWriteErrors(ecocInsertReply); } } @@ -624,20 +741,22 @@ void processRemovedFields(FLEQueryInterface* queryImpl, index = alpha.value() + 1; } - queryImpl->insertDocument( + auto eccInsertReply = uassertStatusOK(queryImpl->insertDocument( nssEcc, ECCCollection::generateDocument(tagToken, valueToken, index, plainTextField.count), - true); + true)); + checkWriteErrors(eccInsertReply); NamespaceString nssEcoc(edcNss.db(), efc.getEcocCollection().get()); // TODO - make this a batch of ECOC updates? EncryptedStateCollectionTokens tokens(plainTextField.esc, plainTextField.ecc); auto encryptedTokens = uassertStatusOK(tokens.serialize(deleteToken.ecocToken)); - queryImpl->insertDocument( + auto ecocInsertReply = uassertStatusOK(queryImpl->insertDocument( nssEcoc, ECOCCollection::generateDocument(deletedField.fieldPathName, encryptedTokens), - false); + false)); + checkWriteErrors(ecocInsertReply); } } @@ -696,17 +815,19 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest( FLEQueryInterface::~FLEQueryInterface() {} -void processInsert(FLEQueryInterface* queryImpl, - const NamespaceString& edcNss, - std::vector<EDCServerPayloadInfo>& serverPayload, - const EncryptedFieldConfig& efc, - BSONObj document) { + +StatusWith<write_ops::InsertCommandReply> processInsert( + FLEQueryInterface* queryImpl, + const NamespaceString& edcNss, + std::vector<EDCServerPayloadInfo>& serverPayload, + const EncryptedFieldConfig& efc, + BSONObj document) { processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); auto finalDoc = EDCServerCollection::finalizeForInsert(document, serverPayload); - queryImpl->insertDocument(edcNss, finalDoc, false); + return queryImpl->insertDocument(edcNss, finalDoc, false); } uint64_t processDelete(FLEQueryInterface* queryImpl, @@ -824,34 +945,23 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx, uasserted(6371209, "Feature flag FLE2 is not enabled"); } - auto getTxn = [](OperationContext* opCtx) { - return std::make_shared<txn_api::TransactionWithRetries>( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), - TransactionRouterResourceYielder::make()); - }; - if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) { auto insertRequest = request.getInsertRequest(); - auto swResult = processInsert(opCtx, insertRequest, getTxn); - - if (!swResult.isOK()) { - response->setStatus(swResult.getStatus()); - response->setN(0); - - return FLEBatchResult::kProcessed; - } else if (swResult.getValue() == FLEBatchResult::kProcessed) { - response->setStatus(Status::OK()); - response->setN(1); + auto [batchResult, insertReply] = + processInsert(opCtx, insertRequest, &getTransactionWithRetriesForMongoS); + if (batchResult == FLEBatchResult::kNotProcessed) { + return FLEBatchResult::kNotProcessed; } - return swResult.getValue(); + replyToResponse(&insertReply.getWriteCommandReplyBase(), response); + + return FLEBatchResult::kProcessed; } else if (request.getBatchType() == BatchedCommandRequest::BatchType_Delete) { auto deleteRequest = request.getDeleteRequest(); - auto swResult = processDelete(opCtx, deleteRequest, getTxn); + auto swResult = processDelete(opCtx, deleteRequest, &getTransactionWithRetriesForMongoS); if (!swResult.isOK()) { @@ -867,7 +977,7 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx, auto updateRequest = request.getUpdateRequest(); - auto swResult = processUpdate(opCtx, updateRequest, getTxn); + auto swResult = processUpdate(opCtx, updateRequest, &getTransactionWithRetriesForMongoS); if (!swResult.isOK()) { response->setStatus(swResult.getStatus()); @@ -998,14 +1108,7 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx, return FLEBatchResult::kNotProcessed; } - auto getTxn = [](OperationContext* opCtx) { - return std::make_shared<txn_api::TransactionWithRetries>( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), - TransactionRouterResourceYielder::make()); - }; - - auto swReply = processFindAndModifyRequest(opCtx, request, getTxn); + auto swReply = processFindAndModifyRequest(opCtx, request, &getTransactionWithRetriesForMongoS); auto reply = uassertStatusOK(swReply); diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h index ef86d926f7c..f49baacbf36 100644 --- a/src/mongo/db/fle_crud.h +++ b/src/mongo/db/fle_crud.h @@ -38,6 +38,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/transaction_api.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -68,6 +69,24 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx, BatchedCommandResponse* response, boost::optional<OID> targetEpoch); + +/** + * Initialize the FLE CRUD subsystem on Mongod. + */ +void startFLECrud(ServiceContext* serviceContext); + +/** + * Stop the FLE CRUD subsystem on Mongod. + */ +void stopFLECrud(); + + +/** + * Process a replica set insert. + */ +FLEBatchResult processFLEInsert(OperationContext* opCtx, + const write_ops::InsertCommandRequest& insertRequest, + write_ops::InsertCommandReply* insertReply); /** * Process a findAndModify request from mongos */ @@ -104,9 +123,8 @@ public: * If translateDuplicateKey == true and the insert returns DuplicateKey, returns * FLEStateCollectionContention instead. */ - virtual void insertDocument(const NamespaceString& nss, - BSONObj obj, - bool translateDuplicateKey) = 0; + virtual StatusWith<write_ops::InsertCommandReply> insertDocument( + const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) = 0; /** * Delete a single document with the given query. @@ -144,11 +162,12 @@ public: * * Used by unit tests. */ -void processInsert(FLEQueryInterface* queryImpl, - const NamespaceString& edcNss, - std::vector<EDCServerPayloadInfo>& serverPayload, - const EncryptedFieldConfig& efc, - BSONObj document); +StatusWith<write_ops::InsertCommandReply> processInsert( + FLEQueryInterface* queryImpl, + const NamespaceString& edcNss, + std::vector<EDCServerPayloadInfo>& serverPayload, + const EncryptedFieldConfig& efc, + BSONObj document); /** * Process a FLE delete with the query interface @@ -167,6 +186,17 @@ uint64_t processUpdate(FLEQueryInterface* queryImpl, const write_ops::UpdateCommandRequest& updateRequest); /** + * Callback function to get a TransactionWithRetries with the appropiate Executor + */ +using GetTxnCallback = + std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)>; + +std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( + OperationContext* opCtx, + const write_ops::InsertCommandRequest& insertRequest, + GetTxnCallback getTxns); + +/** * Process a FLE Find And Modify with the query interface * * Used by unit tests. diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp new file mode 100644 index 00000000000..e96c5133e3b --- /dev/null +++ b/src/mongo/db/fle_crud_mongod.cpp @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <cstdint> +#include <memory> +#include <string> +#include <utility> + +#include "mongo/db/fle_crud.h" + +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/bsontypes.h" +#include "mongo/crypto/encryption_fields_gen.h" +#include "mongo/crypto/fle_crypto.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/session.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_api.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/db/transaction_participant_resource_yielder.h" +#include "mongo/idl/idl_parser.h" +#include "mongo/s/grid.h" +#include "mongo/s/write_ops/batch_write_exec.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace { + +std::shared_ptr<ThreadPool> _fleCrudthreadPool; + +ThreadPool::Options getThreadPoolOptions() { + ThreadPool::Options tpOptions; + tpOptions.poolName = "FLECrud"; + tpOptions.maxThreads = ThreadPool::Options::kUnlimited; + + // SEPTransactionClient::runCommand manages the client itself so do not create one via + // onCreateThread + return tpOptions; +} + +void setMongosFieldsInReply(OperationContext* opCtx, write_ops::WriteCommandReplyBase* replyBase) { + // Set these fields only if not set + if (replyBase->getOpTime().has_value() && replyBase->getElectionId().has_value()) { + return; + } + + // Undocumented repl fields that mongos depends on. + auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); + const auto replMode = replCoord->getReplicationMode(); + if (replMode != repl::ReplicationCoordinator::modeNone) { + + replyBase->setOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); + replyBase->setElectionId(replCoord->getElectionId()); + } +} + + +class FLEMongoDResourceYielder : public ResourceYielder { +public: + void yield(OperationContext* opCtx) override { + // We're about to block. Check back in the session so that it's available to other + // threads. Note that we may block on a request to _ourselves_, meaning that we may have to + // wait for another thread which will use the same session. This step is necessary + // to prevent deadlocks. + + Session* const session = OperationContextSession::get(opCtx); + if (session) { + if (auto txnParticipant = TransactionParticipant::get(opCtx)) { + txnParticipant.stashTransactionResources(opCtx); + } + + MongoDOperationContextSession::checkIn(opCtx); + } + _yielded = (session != nullptr); + } + + void unyield(OperationContext* opCtx) override { + if (_yielded) { + // This may block on a sub-operation on this node finishing. It's possible that while + // blocked on the network layer, another shard could have responded, theoretically + // unblocking this thread of execution. However, we must wait until the child operation + // on this shard finishes so we can get the session back. This may limit the throughput + // of the operation, but it's correct. + MongoDOperationContextSession::checkOut(opCtx); + + if (auto txnParticipant = TransactionParticipant::get(opCtx)) { + // Assumes this is only called from the 'aggregate' or 'getMore' commands. The code + // which relies on this parameter does not distinguish/care about the difference so + // we simply always pass 'aggregate'. + // + // Catch NoSuchTransaction which happens when the transaction is aborted by an + // unrelated error If this error is not caught, then a user error like DuplicateKey + // gets ignored for NoSuchTransaction + try { + txnParticipant.unstashTransactionResources(opCtx, "aggregate"); + } catch (ExceptionFor<ErrorCodes::NoSuchTransaction>&) { + } + } + } + } + +private: + bool _yielded = false; +}; + +std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMongoD( + OperationContext* opCtx) { + return std::make_shared<txn_api::TransactionWithRetries>( + opCtx, _fleCrudthreadPool, std::make_unique<FLEMongoDResourceYielder>()); +} + +} // namespace + + +void startFLECrud(ServiceContext* serviceContext) { + // FLE crud is only supported on replica sets so no reason to start thread pool on standalones + if (repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() == + repl::ReplicationCoordinator::modeNone) { + return; + } + + _fleCrudthreadPool = std::make_shared<ThreadPool>(getThreadPoolOptions()); + _fleCrudthreadPool->startup(); +} + +void stopFLECrud() { + // Check if it was started + if (_fleCrudthreadPool.get() != nullptr) { + _fleCrudthreadPool->shutdown(); + } +} + +FLEBatchResult processFLEInsert(OperationContext* opCtx, + const write_ops::InsertCommandRequest& insertRequest, + write_ops::InsertCommandReply* insertReply) { + + uassert(6371602, + "Encrypted index operations are only supported on replica sets", + repl::ReplicationCoordinator::get(opCtx->getServiceContext())->getReplicationMode() == + repl::ReplicationCoordinator::modeReplSet); + + auto [batchResult, insertReplyReturn] = + processInsert(opCtx, insertRequest, &getTransactionWithRetriesForMongoD); + + if (batchResult == FLEBatchResult::kNotProcessed) { + return FLEBatchResult::kNotProcessed; + } + + *insertReply = insertReplyReturn; + + setMongosFieldsInReply(opCtx, &insertReply->getWriteCommandReplyBase()); + + return FLEBatchResult::kProcessed; +} + +} // namespace mongo diff --git a/src/mongo/db/fle_crud_test.cpp b/src/mongo/db/fle_crud_test.cpp index 2823749a0b5..6477037e99e 100644 --- a/src/mongo/db/fle_crud_test.cpp +++ b/src/mongo/db/fle_crud_test.cpp @@ -85,7 +85,9 @@ public: uint64_t countDocuments(const NamespaceString& nss) final; - void insertDocument(const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) final; + StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss, + BSONObj obj, + bool translateDuplicateKey) final; BSONObj deleteWithPreimage(const NamespaceString& nss, const EncryptionInformation& ei, @@ -119,15 +121,18 @@ uint64_t FLEQueryTestImpl::countDocuments(const NamespaceString& nss) { return uassertStatusOK(_storage->getCollectionCount(_opCtx, nss)); } -void FLEQueryTestImpl::insertDocument(const NamespaceString& nss, - BSONObj obj, - bool translateDuplicateKey) { +StatusWith<write_ops::InsertCommandReply> FLEQueryTestImpl::insertDocument( + const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) { repl::TimestampedBSONObj tb; tb.obj = obj; auto status = _storage->insertDocument(_opCtx, nss, tb, 0); - uassertStatusOK(status); + if (!status.isOK()) { + return status; + } + + return write_ops::InsertCommandReply(); } BSONObj FLEQueryTestImpl::deleteWithPreimage(const NamespaceString& nss, @@ -520,7 +525,7 @@ void FleCrudTest::doSingleWideInsert(int id, uint64_t fieldCount, ValueGenerator auto efc = getTestEncryptedFieldConfig(); - processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result); + uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result)); } @@ -581,7 +586,7 @@ void FleCrudTest::doSingleInsert(int id, BSONElement element) { auto efc = getTestEncryptedFieldConfig(); - processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result); + uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result)); } void FleCrudTest::doSingleInsert(int id, BSONObj obj) { diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 147ea766eae..8dd1fec50fd 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -81,6 +81,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/fcv_op_observer.h" +#include "mongo/db/fle_crud.h" #include "mongo/db/free_mon/free_mon_mongod.h" #include "mongo/db/ftdc/ftdc_mongod.h" #include "mongo/db/ftdc/util.h" @@ -750,6 +751,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { } storageEngine->startTimestampMonitor(); + + startFLECrud(serviceContext); } startClientCursorMonitor(); @@ -1232,6 +1235,9 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { LOGV2_OPTIONS(4695103, {LogComponent::kReplication}, "Exiting quiesce mode for shutdown"); } + LOGV2_OPTIONS(6371601, {LogComponent::kDefault}, "Shutting down the FLE Crud thread pool"); + stopFLECrud(); + LOGV2_OPTIONS(4784901, {LogComponent::kCommand}, "Shutting down the MirrorMaestro"); MirrorMaestro::shutdown(serviceContext); |