summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/fle2/compact_collection.js2
-rw-r--r--jstests/fle2/libs/encrypted_client_util.js13
-rw-r--r--src/mongo/base/error_codes.yml1
-rw-r--r--src/mongo/db/SConscript25
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/write_commands.cpp9
-rw-r--r--src/mongo/db/fle_crud.cpp235
-rw-r--r--src/mongo/db/fle_crud.h46
-rw-r--r--src/mongo/db/fle_crud_mongod.cpp192
-rw-r--r--src/mongo/db/fle_crud_test.cpp19
-rw-r--r--src/mongo/db/mongod_main.cpp6
11 files changed, 465 insertions, 84 deletions
diff --git a/jstests/fle2/compact_collection.js b/jstests/fle2/compact_collection.js
index 0969fc216d2..d84d0472314 100644
--- a/jstests/fle2/compact_collection.js
+++ b/jstests/fle2/compact_collection.js
@@ -10,7 +10,7 @@ load("jstests/fle2/libs/encrypted_client_util.js");
(function() {
'use strict';
-if (!isFLE2Enabled()) {
+if (!isFLE2ReplicationEnabled()) {
return;
}
diff --git a/jstests/fle2/libs/encrypted_client_util.js b/jstests/fle2/libs/encrypted_client_util.js
index 7c062009c13..095f926bc0d 100644
--- a/jstests/fle2/libs/encrypted_client_util.js
+++ b/jstests/fle2/libs/encrypted_client_util.js
@@ -1,3 +1,5 @@
+load("jstests/concurrency/fsm_workload_helpers/server_types.js"); // For isMongos.
+
/**
* Create a FLE client that has an unencrypted and encrypted client to the same database
*/
@@ -160,6 +162,17 @@ class EncryptedClient {
}
}
+ assertWriteCommandReplyFields(response) {
+ if (isMongod(this._edb)) {
+ // These fields are replica set specific
+ assert(response.hasOwnProperty("electionId"));
+ assert(response.hasOwnProperty("opTime"));
+ }
+
+ assert(response.hasOwnProperty("$clusterTime"));
+ assert(response.hasOwnProperty("operationTime"));
+ }
+
/**
* Take a snapshot of a collection sorted by _id, run a operation, take a second snapshot.
*
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index a908025a078..351a708abe4 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -475,6 +475,7 @@ error_codes:
- {code: 367, name: FLECompactionPlaceholder}
- {code: 368, name: FLEStateCollectionContention}
+ - {code: 369, name: FLETransactionAbort}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 1eaa1dbe0b0..955c9fc5c6b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -861,8 +861,11 @@ env.Library(
source=[
'fle_crud.cpp',
],
- LIBDEPS_PRIVATE=[
+ LIBDEPS=[
'$BUILD_DIR/mongo/crypto/encrypted_field_config',
+ 'transaction_api',
+ ],
+ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/crypto/fle_crypto',
'$BUILD_DIR/mongo/db/ops/write_ops_parsers',
'$BUILD_DIR/mongo/db/query/query_request',
@@ -870,7 +873,24 @@ env.Library(
'$BUILD_DIR/mongo/s/grid',
'$BUILD_DIR/mongo/s/sharding_router_api',
'logical_session_id',
- 'transaction_api',
+ ],
+)
+
+env.Library(
+ target='fle_crud_mongod',
+ source=[
+ 'fle_crud_mongod.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/crypto/encrypted_field_config',
+ '$BUILD_DIR/mongo/crypto/fle_crypto',
+ '$BUILD_DIR/mongo/db/ops/write_ops_parsers',
+ '$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/executor/task_executor_pool',
+ 'fle_crud',
+ 'logical_session_id',
+ 'transaction',
],
)
@@ -2416,6 +2436,7 @@ env.Library(
'concurrency/flow_control_ticketholder',
'concurrency/lock_manager',
'fcv_op_observer',
+ 'fle_crud_mongod',
'free_mon/free_mon_mongod',
'ftdc/ftdc_mongod',
'index/index_access_method_factory',
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 3c6fee64ed5..50467ca097b 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -396,6 +396,7 @@ env.Library(
'$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
'$BUILD_DIR/mongo/db/exec/sbe/query_sbe_abt',
+ '$BUILD_DIR/mongo/db/fle_crud_mongod',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/index_commands_idl',
'$BUILD_DIR/mongo/db/multitenancy',
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index d562d21a00e..9068023447a 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/commands/write_commands_common.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/fle_crud.h"
#include "mongo/db/json.h"
#include "mongo/db/matcher/doc_validation_error.h"
#include "mongo/db/matcher/extensions_callback_real.h"
@@ -524,6 +525,14 @@ public:
write_ops::InsertCommandReply typedRun(OperationContext* opCtx) final try {
transactionChecks(opCtx, ns());
+ if (request().getEncryptionInformation().has_value()) {
+ write_ops::InsertCommandReply insertReply;
+ auto batch = processFLEInsert(opCtx, request(), &insertReply);
+ if (batch == FLEBatchResult::kProcessed) {
+ return insertReply;
+ }
+ }
+
if (isTimeseries(opCtx, request())) {
// Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
// constructor.
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index bbfda99161e..ad72f827dc3 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -52,6 +52,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/transaction_router_resource_yielder.h"
#include "mongo/s/write_ops/batch_write_exec.h"
+#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -66,7 +67,9 @@ public:
uint64_t countDocuments(const NamespaceString& nss) final;
- void insertDocument(const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) final;
+ StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss,
+ BSONObj obj,
+ bool translateDuplicateKey) final;
BSONObj deleteWithPreimage(const NamespaceString& nss,
const EncryptionInformation& ei,
@@ -155,9 +158,71 @@ uint64_t FLEQueryInterfaceImpl::countDocuments(const NamespaceString& nss) {
return docCount;
}
-void FLEQueryInterfaceImpl::insertDocument(const NamespaceString& nss,
- BSONObj obj,
- bool translateDuplicateKey) {
+write_ops::WriteError writeErrorDetailToWriteError(const WriteErrorDetail* detail) {
+ return write_ops::WriteError(detail->getIndex(), detail->toStatus());
+}
+
+
+std::vector<write_ops::WriteError> writeErrorsDetailToWriteErrors(
+ const std::vector<WriteErrorDetail*>& details) {
+ std::vector<write_ops::WriteError> errors;
+ errors.reserve(details.size());
+ for (const auto& detail : details) {
+ errors.push_back(writeErrorDetailToWriteError(detail));
+ }
+ return errors;
+}
+
+std::vector<write_ops::WriteError> singleStatusToWriteErrors(const Status& status) {
+ std::vector<write_ops::WriteError> errors;
+
+ errors.push_back(write_ops::WriteError(0, status));
+
+ return errors;
+}
+
+void appendSingleStatusToWriteErrors(const Status& status,
+ write_ops::WriteCommandReplyBase* replyBase) {
+ std::vector<write_ops::WriteError> errors;
+
+ if (replyBase->getWriteErrors()) {
+ errors = std::move(replyBase->getWriteErrors().value());
+ }
+
+ errors.push_back(write_ops::WriteError(0, status));
+
+ replyBase->setWriteErrors(errors);
+}
+
+void writeErrorsToWriteDetails(const std::vector<write_ops::WriteError>& errors,
+ BatchedCommandResponse* response) {
+ for (const auto& error : errors) {
+ auto detail = std::make_unique<WriteErrorDetail>();
+ detail->setIndex(error.getIndex());
+ detail->setStatus(error.getStatus());
+
+ response->addToErrDetails(detail.release());
+ }
+}
+
+void replyToResponse(write_ops::WriteCommandReplyBase* replyBase,
+ BatchedCommandResponse* response) {
+ response->setStatus(Status::OK());
+ response->setN(replyBase->getN());
+ if (replyBase->getElectionId()) {
+ response->setElectionId(replyBase->getElectionId().value());
+ }
+ if (replyBase->getOpTime()) {
+ response->setLastOp(replyBase->getOpTime().value());
+ }
+ if (replyBase->getWriteErrors().has_value()) {
+ writeErrorsToWriteDetails(replyBase->getWriteErrors().value(), response);
+ }
+}
+
+
+StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceImpl::insertDocument(
+ const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) {
write_ops::InsertCommandRequest insertRequest(nss);
insertRequest.setDocuments({obj});
// TODO SERVER-64143 - insertRequest.setWriteConcern
@@ -167,10 +232,28 @@ void FLEQueryInterfaceImpl::insertDocument(const NamespaceString& nss,
auto status = response.toStatus();
if (translateDuplicateKey && status.code() == ErrorCodes::DuplicateKey) {
- uassertStatusOK(Status(ErrorCodes::FLEStateCollectionContention, status.reason()));
+ return Status(ErrorCodes::FLEStateCollectionContention, status.reason());
}
- uassertStatusOK(status);
+ write_ops::InsertCommandReply reply;
+
+ if (response.isLastOpSet()) {
+ reply.getWriteCommandReplyBase().setOpTime(response.getLastOp());
+ }
+
+ if (response.isElectionIdSet()) {
+ reply.getWriteCommandReplyBase().setElectionId(response.getElectionId());
+ }
+
+ reply.getWriteCommandReplyBase().setN(response.getN());
+ if (response.isErrDetailsSet()) {
+ reply.getWriteCommandReplyBase().setWriteErrors(
+ writeErrorsDetailToWriteErrors(response.getErrDetails()));
+ }
+
+ reply.getWriteCommandReplyBase().setRetriedStmtIds(reply.getRetriedStmtIds());
+
+ return {reply};
}
BSONObj FLEQueryInterfaceImpl::deleteWithPreimage(
@@ -344,10 +427,21 @@ StatusWith<txn_api::CommitResult> runInTxnWithRetry(
}
}
-StatusWith<FLEBatchResult> processInsert(
+
+std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMongoS(
+ OperationContext* opCtx) {
+ return std::make_shared<txn_api::TransactionWithRetries>(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ TransactionRouterResourceYielder::make());
+}
+
+} // namespace
+
+std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
OperationContext* opCtx,
const write_ops::InsertCommandRequest& insertRequest,
- std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) {
+ GetTxnCallback getTxns) {
auto documents = insertRequest.getDocuments();
// TODO - how to check if a document will be too large???
@@ -359,20 +453,22 @@ StatusWith<FLEBatchResult> processInsert(
if (serverPayload->size() == 0) {
// No actual FLE2 indexed fields
- return FLEBatchResult::kNotProcessed;
+ return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{
+ FLEBatchResult::kNotProcessed, write_ops::InsertCommandReply()};
}
auto ei = insertRequest.getEncryptionInformation().get();
auto edcNss = insertRequest.getNamespace();
auto efc = EncryptionInformationHelpers::getAndValidateSchema(insertRequest.getNamespace(), ei);
+ write_ops::InsertCommandReply reply;
std::shared_ptr<txn_api::TransactionWithRetries> trun = getTxns(opCtx);
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs since it runs on another thread
auto ownedDocument = document.getOwned();
- auto insertBlock = std::tie(edcNss, efc, serverPayload);
+ auto insertBlock = std::tie(edcNss, efc, serverPayload, reply);
auto sharedInsertBlock = std::make_shared<decltype(insertBlock)>(insertBlock);
auto swResult = runInTxnWithRetry(
@@ -382,23 +478,40 @@ StatusWith<FLEBatchResult> processInsert(
ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [edcNss2, efc2, serverPayload2] = *sharedInsertBlock.get();
+ auto [edcNss2, efc2, serverPayload2, reply2] = *sharedInsertBlock.get();
- processInsert(&queryImpl, edcNss2, *serverPayload2.get(), efc2, ownedDocument);
+ reply2 = uassertStatusOK(
+ processInsert(&queryImpl, edcNss2, *serverPayload2.get(), efc2, ownedDocument));
+
+ // If we have write errors but no unexpected internal errors, then we reach here
+ // If we have write errors, we need to return a failed status to ensure the txn client
+ // does not try to commit the transaction.
+ if (reply2.getWriteErrors().has_value() && !reply2.getWriteErrors().value().empty()) {
+ return SemiFuture<void>::makeReady(
+ Status(ErrorCodes::FLETransactionAbort, "FLE2 write errors on insert"));
+ }
return SemiFuture<void>::makeReady();
});
+
if (!swResult.isOK()) {
- return swResult.getStatus();
+ // FLETransactionAbort is used for control flow so it means we have a valid
+ // InsertCommandReply with write errors so we should return that.
+ if (swResult.getStatus() == ErrorCodes::FLETransactionAbort) {
+ return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{
+ FLEBatchResult::kProcessed, reply};
+ }
+
+ appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
}
- return FLEBatchResult::kProcessed;
+ return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{FLEBatchResult::kProcessed,
+ reply};
}
-StatusWith<uint64_t> processDelete(
- OperationContext* opCtx,
- const write_ops::DeleteCommandRequest& deleteRequest,
- std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) {
+StatusWith<uint64_t> processDelete(OperationContext* opCtx,
+ const write_ops::DeleteCommandRequest& deleteRequest,
+ GetTxnCallback getTxns) {
auto deletes = deleteRequest.getDeletes();
uassert(6371302, "Only single document deletes are permitted", deletes.size() == 1);
@@ -436,10 +549,9 @@ StatusWith<uint64_t> processDelete(
return count;
}
-StatusWith<uint64_t> processUpdate(
- OperationContext* opCtx,
- const write_ops::UpdateCommandRequest& updateRequest,
- std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) {
+StatusWith<uint64_t> processUpdate(OperationContext* opCtx,
+ const write_ops::UpdateCommandRequest& updateRequest,
+ GetTxnCallback getTxns) {
auto updates = updateRequest.getUpdates();
uassert(6371502, "Only single document updates are permitted", updates.size() == 1);
@@ -483,6 +595,7 @@ StatusWith<uint64_t> processUpdate(
return count;
}
+namespace {
void processFieldsForInsert(FLEQueryInterface* queryImpl,
const NamespaceString& edcNss,
std::vector<EDCServerPayloadInfo>& serverPayload,
@@ -538,18 +651,22 @@ void processFieldsForInsert(FLEQueryInterface* queryImpl,
payload.count = count;
- queryImpl->insertDocument(
+ auto escInsertReply = uassertStatusOK(queryImpl->insertDocument(
nssEsc,
ESCCollection::generateInsertDocument(tagToken, valueToken, position, count),
- true);
+ true));
+ checkWriteErrors(escInsertReply);
+
NamespaceString nssEcoc(edcNss.db(), efc.getEcocCollection().get());
// TODO - should we make this a batch of ECOC updates?
- queryImpl->insertDocument(nssEcoc,
- ECOCCollection::generateDocument(
- payload.fieldPathName, payload.payload.getEncryptedTokens()),
- false);
+ auto ecocInsertReply = uassertStatusOK(queryImpl->insertDocument(
+ nssEcoc,
+ ECOCCollection::generateDocument(payload.fieldPathName,
+ payload.payload.getEncryptedTokens()),
+ false));
+ checkWriteErrors(ecocInsertReply);
}
}
@@ -624,20 +741,22 @@ void processRemovedFields(FLEQueryInterface* queryImpl,
index = alpha.value() + 1;
}
- queryImpl->insertDocument(
+ auto eccInsertReply = uassertStatusOK(queryImpl->insertDocument(
nssEcc,
ECCCollection::generateDocument(tagToken, valueToken, index, plainTextField.count),
- true);
+ true));
+ checkWriteErrors(eccInsertReply);
NamespaceString nssEcoc(edcNss.db(), efc.getEcocCollection().get());
// TODO - make this a batch of ECOC updates?
EncryptedStateCollectionTokens tokens(plainTextField.esc, plainTextField.ecc);
auto encryptedTokens = uassertStatusOK(tokens.serialize(deleteToken.ecocToken));
- queryImpl->insertDocument(
+ auto ecocInsertReply = uassertStatusOK(queryImpl->insertDocument(
nssEcoc,
ECOCCollection::generateDocument(deletedField.fieldPathName, encryptedTokens),
- false);
+ false));
+ checkWriteErrors(ecocInsertReply);
}
}
@@ -696,17 +815,19 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest(
FLEQueryInterface::~FLEQueryInterface() {}
-void processInsert(FLEQueryInterface* queryImpl,
- const NamespaceString& edcNss,
- std::vector<EDCServerPayloadInfo>& serverPayload,
- const EncryptedFieldConfig& efc,
- BSONObj document) {
+
+StatusWith<write_ops::InsertCommandReply> processInsert(
+ FLEQueryInterface* queryImpl,
+ const NamespaceString& edcNss,
+ std::vector<EDCServerPayloadInfo>& serverPayload,
+ const EncryptedFieldConfig& efc,
+ BSONObj document) {
processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
auto finalDoc = EDCServerCollection::finalizeForInsert(document, serverPayload);
- queryImpl->insertDocument(edcNss, finalDoc, false);
+ return queryImpl->insertDocument(edcNss, finalDoc, false);
}
uint64_t processDelete(FLEQueryInterface* queryImpl,
@@ -824,34 +945,23 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx,
uasserted(6371209, "Feature flag FLE2 is not enabled");
}
- auto getTxn = [](OperationContext* opCtx) {
- return std::make_shared<txn_api::TransactionWithRetries>(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::make());
- };
-
if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) {
auto insertRequest = request.getInsertRequest();
- auto swResult = processInsert(opCtx, insertRequest, getTxn);
-
- if (!swResult.isOK()) {
- response->setStatus(swResult.getStatus());
- response->setN(0);
-
- return FLEBatchResult::kProcessed;
- } else if (swResult.getValue() == FLEBatchResult::kProcessed) {
- response->setStatus(Status::OK());
- response->setN(1);
+ auto [batchResult, insertReply] =
+ processInsert(opCtx, insertRequest, &getTransactionWithRetriesForMongoS);
+ if (batchResult == FLEBatchResult::kNotProcessed) {
+ return FLEBatchResult::kNotProcessed;
}
- return swResult.getValue();
+ replyToResponse(&insertReply.getWriteCommandReplyBase(), response);
+
+ return FLEBatchResult::kProcessed;
} else if (request.getBatchType() == BatchedCommandRequest::BatchType_Delete) {
auto deleteRequest = request.getDeleteRequest();
- auto swResult = processDelete(opCtx, deleteRequest, getTxn);
+ auto swResult = processDelete(opCtx, deleteRequest, &getTransactionWithRetriesForMongoS);
if (!swResult.isOK()) {
@@ -867,7 +977,7 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx,
auto updateRequest = request.getUpdateRequest();
- auto swResult = processUpdate(opCtx, updateRequest, getTxn);
+ auto swResult = processUpdate(opCtx, updateRequest, &getTransactionWithRetriesForMongoS);
if (!swResult.isOK()) {
response->setStatus(swResult.getStatus());
@@ -998,14 +1108,7 @@ FLEBatchResult processFLEFindAndModify(OperationContext* opCtx,
return FLEBatchResult::kNotProcessed;
}
- auto getTxn = [](OperationContext* opCtx) {
- return std::make_shared<txn_api::TransactionWithRetries>(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::make());
- };
-
- auto swReply = processFindAndModifyRequest(opCtx, request, getTxn);
+ auto swReply = processFindAndModifyRequest(opCtx, request, &getTransactionWithRetriesForMongoS);
auto reply = uassertStatusOK(swReply);
diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h
index ef86d926f7c..f49baacbf36 100644
--- a/src/mongo/db/fle_crud.h
+++ b/src/mongo/db/fle_crud.h
@@ -38,6 +38,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/transaction_api.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -68,6 +69,24 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx,
BatchedCommandResponse* response,
boost::optional<OID> targetEpoch);
+
+/**
+ * Initialize the FLE CRUD subsystem on Mongod.
+ */
+void startFLECrud(ServiceContext* serviceContext);
+
+/**
+ * Stop the FLE CRUD subsystem on Mongod.
+ */
+void stopFLECrud();
+
+
+/**
+ * Process a replica set insert.
+ */
+FLEBatchResult processFLEInsert(OperationContext* opCtx,
+ const write_ops::InsertCommandRequest& insertRequest,
+ write_ops::InsertCommandReply* insertReply);
/**
* Process a findAndModify request from mongos
*/
@@ -104,9 +123,8 @@ public:
* If translateDuplicateKey == true and the insert returns DuplicateKey, returns
* FLEStateCollectionContention instead.
*/
- virtual void insertDocument(const NamespaceString& nss,
- BSONObj obj,
- bool translateDuplicateKey) = 0;
+ virtual StatusWith<write_ops::InsertCommandReply> insertDocument(
+ const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) = 0;
/**
* Delete a single document with the given query.
@@ -144,11 +162,12 @@ public:
*
* Used by unit tests.
*/
-void processInsert(FLEQueryInterface* queryImpl,
- const NamespaceString& edcNss,
- std::vector<EDCServerPayloadInfo>& serverPayload,
- const EncryptedFieldConfig& efc,
- BSONObj document);
+StatusWith<write_ops::InsertCommandReply> processInsert(
+ FLEQueryInterface* queryImpl,
+ const NamespaceString& edcNss,
+ std::vector<EDCServerPayloadInfo>& serverPayload,
+ const EncryptedFieldConfig& efc,
+ BSONObj document);
/**
* Process a FLE delete with the query interface
@@ -167,6 +186,17 @@ uint64_t processUpdate(FLEQueryInterface* queryImpl,
const write_ops::UpdateCommandRequest& updateRequest);
/**
+ * Callback function to get a TransactionWithRetries with the appropiate Executor
+ */
+using GetTxnCallback =
+ std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)>;
+
+std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
+ OperationContext* opCtx,
+ const write_ops::InsertCommandRequest& insertRequest,
+ GetTxnCallback getTxns);
+
+/**
* Process a FLE Find And Modify with the query interface
*
* Used by unit tests.
diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp
new file mode 100644
index 00000000000..e96c5133e3b
--- /dev/null
+++ b/src/mongo/db/fle_crud_mongod.cpp
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "mongo/db/fle_crud.h"
+
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/bsontypes.h"
+#include "mongo/crypto/encryption_fields_gen.h"
+#include "mongo/crypto/fle_crypto.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/session.h"
+#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_api.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/db/transaction_participant_resource_yielder.h"
+#include "mongo/idl/idl_parser.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/batch_write_exec.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace {
+
+std::shared_ptr<ThreadPool> _fleCrudthreadPool;
+
+ThreadPool::Options getThreadPoolOptions() {
+ ThreadPool::Options tpOptions;
+ tpOptions.poolName = "FLECrud";
+ tpOptions.maxThreads = ThreadPool::Options::kUnlimited;
+
+ // SEPTransactionClient::runCommand manages the client itself so do not create one via
+ // onCreateThread
+ return tpOptions;
+}
+
+void setMongosFieldsInReply(OperationContext* opCtx, write_ops::WriteCommandReplyBase* replyBase) {
+ // Set these fields only if not set
+ if (replyBase->getOpTime().has_value() && replyBase->getElectionId().has_value()) {
+ return;
+ }
+
+ // Undocumented repl fields that mongos depends on.
+ auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
+ const auto replMode = replCoord->getReplicationMode();
+ if (replMode != repl::ReplicationCoordinator::modeNone) {
+
+ replyBase->setOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
+ replyBase->setElectionId(replCoord->getElectionId());
+ }
+}
+
+
+class FLEMongoDResourceYielder : public ResourceYielder {
+public:
+ void yield(OperationContext* opCtx) override {
+ // We're about to block. Check back in the session so that it's available to other
+ // threads. Note that we may block on a request to _ourselves_, meaning that we may have to
+ // wait for another thread which will use the same session. This step is necessary
+ // to prevent deadlocks.
+
+ Session* const session = OperationContextSession::get(opCtx);
+ if (session) {
+ if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
+ txnParticipant.stashTransactionResources(opCtx);
+ }
+
+ MongoDOperationContextSession::checkIn(opCtx);
+ }
+ _yielded = (session != nullptr);
+ }
+
+ void unyield(OperationContext* opCtx) override {
+ if (_yielded) {
+ // This may block on a sub-operation on this node finishing. It's possible that while
+ // blocked on the network layer, another shard could have responded, theoretically
+ // unblocking this thread of execution. However, we must wait until the child operation
+ // on this shard finishes so we can get the session back. This may limit the throughput
+ // of the operation, but it's correct.
+ MongoDOperationContextSession::checkOut(opCtx);
+
+ if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
+ // Assumes this is only called from the 'aggregate' or 'getMore' commands. The code
+ // which relies on this parameter does not distinguish/care about the difference so
+ // we simply always pass 'aggregate'.
+ //
+ // Catch NoSuchTransaction which happens when the transaction is aborted by an
+ // unrelated error If this error is not caught, then a user error like DuplicateKey
+ // gets ignored for NoSuchTransaction
+ try {
+ txnParticipant.unstashTransactionResources(opCtx, "aggregate");
+ } catch (ExceptionFor<ErrorCodes::NoSuchTransaction>&) {
+ }
+ }
+ }
+ }
+
+private:
+ bool _yielded = false;
+};
+
+std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMongoD(
+ OperationContext* opCtx) {
+ return std::make_shared<txn_api::TransactionWithRetries>(
+ opCtx, _fleCrudthreadPool, std::make_unique<FLEMongoDResourceYielder>());
+}
+
+} // namespace
+
+
+void startFLECrud(ServiceContext* serviceContext) {
+ // FLE crud is only supported on replica sets so no reason to start thread pool on standalones
+ if (repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeNone) {
+ return;
+ }
+
+ _fleCrudthreadPool = std::make_shared<ThreadPool>(getThreadPoolOptions());
+ _fleCrudthreadPool->startup();
+}
+
+void stopFLECrud() {
+ // Check if it was started
+ if (_fleCrudthreadPool.get() != nullptr) {
+ _fleCrudthreadPool->shutdown();
+ }
+}
+
+FLEBatchResult processFLEInsert(OperationContext* opCtx,
+ const write_ops::InsertCommandRequest& insertRequest,
+ write_ops::InsertCommandReply* insertReply) {
+
+ uassert(6371602,
+ "Encrypted index operations are only supported on replica sets",
+ repl::ReplicationCoordinator::get(opCtx->getServiceContext())->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeReplSet);
+
+ auto [batchResult, insertReplyReturn] =
+ processInsert(opCtx, insertRequest, &getTransactionWithRetriesForMongoD);
+
+ if (batchResult == FLEBatchResult::kNotProcessed) {
+ return FLEBatchResult::kNotProcessed;
+ }
+
+ *insertReply = insertReplyReturn;
+
+ setMongosFieldsInReply(opCtx, &insertReply->getWriteCommandReplyBase());
+
+ return FLEBatchResult::kProcessed;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/fle_crud_test.cpp b/src/mongo/db/fle_crud_test.cpp
index 2823749a0b5..6477037e99e 100644
--- a/src/mongo/db/fle_crud_test.cpp
+++ b/src/mongo/db/fle_crud_test.cpp
@@ -85,7 +85,9 @@ public:
uint64_t countDocuments(const NamespaceString& nss) final;
- void insertDocument(const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) final;
+ StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss,
+ BSONObj obj,
+ bool translateDuplicateKey) final;
BSONObj deleteWithPreimage(const NamespaceString& nss,
const EncryptionInformation& ei,
@@ -119,15 +121,18 @@ uint64_t FLEQueryTestImpl::countDocuments(const NamespaceString& nss) {
return uassertStatusOK(_storage->getCollectionCount(_opCtx, nss));
}
-void FLEQueryTestImpl::insertDocument(const NamespaceString& nss,
- BSONObj obj,
- bool translateDuplicateKey) {
+StatusWith<write_ops::InsertCommandReply> FLEQueryTestImpl::insertDocument(
+ const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) {
repl::TimestampedBSONObj tb;
tb.obj = obj;
auto status = _storage->insertDocument(_opCtx, nss, tb, 0);
- uassertStatusOK(status);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return write_ops::InsertCommandReply();
}
BSONObj FLEQueryTestImpl::deleteWithPreimage(const NamespaceString& nss,
@@ -520,7 +525,7 @@ void FleCrudTest::doSingleWideInsert(int id, uint64_t fieldCount, ValueGenerator
auto efc = getTestEncryptedFieldConfig();
- processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result);
+ uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result));
}
@@ -581,7 +586,7 @@ void FleCrudTest::doSingleInsert(int id, BSONElement element) {
auto efc = getTestEncryptedFieldConfig();
- processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result);
+ uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result));
}
void FleCrudTest::doSingleInsert(int id, BSONObj obj) {
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 147ea766eae..8dd1fec50fd 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -81,6 +81,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/fcv_op_observer.h"
+#include "mongo/db/fle_crud.h"
#include "mongo/db/free_mon/free_mon_mongod.h"
#include "mongo/db/ftdc/ftdc_mongod.h"
#include "mongo/db/ftdc/util.h"
@@ -750,6 +751,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
}
storageEngine->startTimestampMonitor();
+
+ startFLECrud(serviceContext);
}
startClientCursorMonitor();
@@ -1232,6 +1235,9 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
LOGV2_OPTIONS(4695103, {LogComponent::kReplication}, "Exiting quiesce mode for shutdown");
}
+ LOGV2_OPTIONS(6371601, {LogComponent::kDefault}, "Shutting down the FLE Crud thread pool");
+ stopFLECrud();
+
LOGV2_OPTIONS(4784901, {LogComponent::kCommand}, "Shutting down the MirrorMaestro");
MirrorMaestro::shutdown(serviceContext);