summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2022-04-02 17:16:24 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-02 21:46:31 +0000
commitaaae37f453b94ecd6cfd4bf4261a36a42389885c (patch)
tree743a24863e7afc78bd410ab046cd81d75ce4f827
parentc78ba79626722ae69ea9b64762ffd1dc075ce960 (diff)
downloadmongo-aaae37f453b94ecd6cfd4bf4261a36a42389885c.tar.gz
SERVER-64143 Add retryable write support to FLE 2 CRUD
-rw-r--r--src/mongo/base/error_codes.yml1
-rw-r--r--src/mongo/db/commands/fle2_compact.cpp25
-rw-r--r--src/mongo/db/commands/fle_compact_test.cpp4
-rw-r--r--src/mongo/db/fle_crud.cpp214
-rw-r--r--src/mongo/db/fle_crud.h32
-rw-r--r--src/mongo/db/fle_crud_test.cpp4
-rw-r--r--src/mongo/db/fle_query_interface_mock.cpp10
-rw-r--r--src/mongo/db/fle_query_interface_mock.h6
8 files changed, 169 insertions, 127 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 663654410c6..fccba446c5c 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -474,7 +474,6 @@ error_codes:
- {code: 366, name: WouldChangeOwningShardDeletedNoDocument}
- {code: 367, name: FLECompactionPlaceholder}
- - {code: 368, name: FLEStateCollectionContention}
- {code: 369, name: FLETransactionAbort}
- {code: 370, name: CannotDropShardKeyIndex}
diff --git a/src/mongo/db/commands/fle2_compact.cpp b/src/mongo/db/commands/fle2_compact.cpp
index 3450efa6741..2d930e6b05c 100644
--- a/src/mongo/db/commands/fle2_compact.cpp
+++ b/src/mongo/db/commands/fle2_compact.cpp
@@ -153,7 +153,8 @@ void upsertNullDocument(FLEQueryInterface* queryImpl,
}
} else {
// insert the null doc; translate duplicate key error to a FLE contention error
- auto reply = uassertStatusOK(queryImpl->insertDocument(nss, newNullDoc, true));
+ StmtId stmtId = kUninitializedStmtId;
+ auto reply = uassertStatusOK(queryImpl->insertDocument(nss, newNullDoc, &stmtId, true));
checkWriteErrors(reply);
statsCtr.addInserts(1);
}
@@ -256,7 +257,9 @@ ESCPreCompactState prepareESCForCompaction(FLEQueryInterface* queryImpl,
// committed before the current compact transaction commits
auto placeholder = ESCCollection::generateCompactionPlaceholderDocument(
tagToken, valueToken, state.pos, state.count);
- auto insertReply = uassertStatusOK(queryImpl->insertDocument(nssEsc, placeholder, true));
+ StmtId stmtId = kUninitializedStmtId;
+ auto insertReply =
+ uassertStatusOK(queryImpl->insertDocument(nssEsc, placeholder, &stmtId, true));
checkWriteErrors(insertReply);
stats.addInserts(1);
@@ -332,7 +335,9 @@ ECCPreCompactState prepareECCForCompaction(FLEQueryInterface* queryImpl,
// committed before the current compact transaction commits
auto placeholder =
ECCCollection::generateCompactionDocument(tagToken, valueToken, state.pos);
- auto insertReply = uassertStatusOK(queryImpl->insertDocument(nssEcc, placeholder, true));
+ StmtId stmtId = kUninitializedStmtId;
+ auto insertReply =
+ uassertStatusOK(queryImpl->insertDocument(nssEcc, placeholder, &stmtId, true));
checkWriteErrors(insertReply);
stats.addInserts(1);
} else {
@@ -442,6 +447,7 @@ void compactOneFieldValuePair(FLEQueryInterface* queryImpl,
// PART 3
// A. compact the ECC
bool allEntriesDeleted = (escState.count == eccState.count);
+ StmtId stmtId = kUninitializedStmtId;
if (eccState.count != 0) {
bool hasNullDoc = (eccState.ipos > 1);
@@ -458,6 +464,7 @@ void compactOneFieldValuePair(FLEQueryInterface* queryImpl,
namespaces.eccNss,
ECCCollection::generateDocument(
eccTagToken, eccValueToken, eccState.pos + k, range.start, range.end),
+ &stmtId,
true));
checkWriteErrors(insertReply);
stats.addInserts(1);
@@ -527,10 +534,8 @@ CompactStats processFLECompact(OperationContext* opCtx,
auto argsBlock = std::tie(c, request, namespaces, ecocStats);
auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock);
- auto swResult = runInTxnWithRetry(
- opCtx,
- trun,
- [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ auto swResult = trun->runSyncNoThrow(
+ opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
auto [c2, request2, namespaces2, ecocStats2] = *sharedBlock.get();
@@ -556,10 +561,8 @@ CompactStats processFLECompact(OperationContext* opCtx,
auto argsBlock = std::tie(ecocDoc, namespaces, escStats, eccStats);
auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock);
- auto swResult = runInTxnWithRetry(
- opCtx,
- trun,
- [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ auto swResult = trun->runSyncNoThrow(
+ opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
auto [ecocDoc2, namespaces2, escStats2, eccStats2] = *sharedBlock.get();
diff --git a/src/mongo/db/commands/fle_compact_test.cpp b/src/mongo/db/commands/fle_compact_test.cpp
index 0112338facb..9ee23271cfd 100644
--- a/src/mongo/db/commands/fle_compact_test.cpp
+++ b/src/mongo/db/commands/fle_compact_test.cpp
@@ -395,8 +395,8 @@ void FleCompactTest::doSingleInsert(int id, BSONObj encryptedFieldsObj) {
auto efc =
generateEncryptedFieldConfig(encryptedFieldsObj.getFieldNames<std::set<std::string>>());
- uassertStatusOK(
- processInsert(_queryImpl.get(), _namespaces.edcNss, serverPayload, efc, result));
+ uassertStatusOK(processInsert(
+ _queryImpl.get(), _namespaces.edcNss, serverPayload, efc, kUninitializedTxnNumber, result));
}
void FleCompactTest::doSingleDelete(int id, BSONObj encryptedFieldsObj) {
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index 2a6e2243bb9..9031c6ced9c 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -98,6 +98,22 @@ void replyToResponse(write_ops::WriteCommandReplyBase* replyBase,
}
}
+void responseToReply(const BatchedCommandResponse& response,
+ write_ops::WriteCommandReplyBase& replyBase) {
+ if (response.isLastOpSet()) {
+ replyBase.setOpTime(response.getLastOp());
+ }
+
+ if (response.isElectionIdSet()) {
+ replyBase.setElectionId(response.getElectionId());
+ }
+
+ replyBase.setN(response.getN());
+ if (response.isErrDetailsSet()) {
+ replyBase.setWriteErrors(response.getErrDetails());
+ }
+}
+
boost::optional<BSONObj> mergeLetAndCVariables(const boost::optional<BSONObj>& let,
const boost::optional<BSONObj>& c) {
if (!let.has_value() && !c.has_value()) {
@@ -111,52 +127,8 @@ boost::optional<BSONObj> mergeLetAndCVariables(const boost::optional<BSONObj>& l
}
return c;
}
-
} // namespace
-StatusWith<txn_api::CommitResult> runInTxnWithRetry(
- OperationContext* opCtx,
- std::shared_ptr<txn_api::TransactionWithRetries> trun,
- std::function<SemiFuture<void>(const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec)> callback) {
-
- bool inClientTransaction = opCtx->inMultiDocumentTransaction();
-
- // TODO SERVER-59566 - how much do we retry before we give up?
- while (true) {
-
- // Result will get the status of the TXN
- // Non-client initiated txns get retried automatically.
- // Client txns are the user responsibility to retry and so if we hit a contention
- // placeholder, we need to abort and defer to the client
- auto swResult = trun->runSyncNoThrow(opCtx, callback);
- if (swResult.isOK()) {
- return swResult;
- }
-
- // We cannot retry the transaction if initiated by a user
- if (inClientTransaction) {
- return swResult;
- }
-
- // - DuplicateKeyException - suggestions contention on ESC
- // - FLEContention
- if (swResult.getStatus().code() != ErrorCodes::FLECompactionPlaceholder &&
- swResult.getStatus().code() != ErrorCodes::FLEStateCollectionContention) {
- return swResult;
- }
-
- if (!swResult.isOK()) {
- return swResult;
- }
-
- auto commitResult = swResult.getValue();
- if (commitResult.getEffectiveStatus().isOK()) {
- return commitResult;
- }
- }
-}
-
std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMongoS(
OperationContext* opCtx) {
return std::make_shared<txn_api::TransactionWithRetries>(
@@ -165,6 +137,7 @@ std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMon
TransactionRouterResourceYielder::makeForLocalHandoff());
}
+namespace {
/**
* Make an expression context from a batch command request and a specific operation. Templated out
* to work with update and delete.
@@ -190,6 +163,8 @@ boost::intrusive_ptr<ExpressionContext> makeExpCtx(OperationContext* opCtx,
return expCtx;
}
+} // namespace
+
std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
OperationContext* opCtx,
const write_ops::InsertCommandRequest& insertRequest,
@@ -217,25 +192,26 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
write_ops::InsertCommandReply reply;
+ uint32_t stmtId = getStmtIdForWriteAt(insertRequest, 0);
+
std::shared_ptr<txn_api::TransactionWithRetries> trun = getTxns(opCtx);
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs since it runs on another thread
auto ownedDocument = document.getOwned();
- auto insertBlock = std::tie(edcNss, efc, serverPayload, reply);
+ auto insertBlock = std::tie(edcNss, efc, serverPayload, reply, stmtId);
auto sharedInsertBlock = std::make_shared<decltype(insertBlock)>(insertBlock);
- auto swResult = runInTxnWithRetry(
+ auto swResult = trun->runSyncNoThrow(
opCtx,
- trun,
[sharedInsertBlock, ownedDocument](const txn_api::TransactionClient& txnClient,
ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [edcNss2, efc2, serverPayload2, reply2] = *sharedInsertBlock.get();
+ auto [edcNss2, efc2, serverPayload2, reply2, stmtId2] = *sharedInsertBlock.get();
- reply2 = uassertStatusOK(
- processInsert(&queryImpl, edcNss2, *serverPayload2.get(), efc2, ownedDocument));
+ reply2 = uassertStatusOK(processInsert(
+ &queryImpl, edcNss2, *serverPayload2.get(), efc2, stmtId2, ownedDocument));
if (MONGO_unlikely(fleCrudHangInsert.shouldFail())) {
LOGV2(6371903, "Hanging due to fleCrudHangInsert fail point");
@@ -262,6 +238,9 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert(
}
appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
+ } else if (!swResult.getValue().getEffectiveStatus().isOK()) {
+ appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(),
+ &reply.getWriteCommandReplyBase());
}
return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{FLEBatchResult::kProcessed,
@@ -290,9 +269,8 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
auto deleteBlock = std::tie(deleteRequest, reply, expCtx);
auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock);
- auto swResult = runInTxnWithRetry(
+ auto swResult = trun->runSyncNoThrow(
opCtx,
- trun,
[sharedDeleteBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
@@ -324,6 +302,9 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
}
appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
+ } else if (!swResult.getValue().getEffectiveStatus().isOK()) {
+ appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(),
+ &reply.getWriteCommandReplyBase());
}
return reply;
@@ -356,9 +337,8 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
auto updateBlock = std::tie(updateRequest, reply, expCtx);
auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock);
- auto swResult = runInTxnWithRetry(
+ auto swResult = trun->runSyncNoThrow(
opCtx,
- trun,
[sharedupdateBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
@@ -390,6 +370,9 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
}
appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase());
+ } else if (!swResult.getValue().getEffectiveStatus().isOK()) {
+ appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(),
+ &reply.getWriteCommandReplyBase());
}
return reply;
@@ -400,7 +383,8 @@ namespace {
void processFieldsForInsert(FLEQueryInterface* queryImpl,
const NamespaceString& edcNss,
std::vector<EDCServerPayloadInfo>& serverPayload,
- const EncryptedFieldConfig& efc) {
+ const EncryptedFieldConfig& efc,
+ int32_t* pStmtId) {
NamespaceString nssEsc(edcNss.db(), efc.getEscCollection().get());
@@ -455,6 +439,7 @@ void processFieldsForInsert(FLEQueryInterface* queryImpl,
auto escInsertReply = uassertStatusOK(queryImpl->insertDocument(
nssEsc,
ESCCollection::generateInsertDocument(tagToken, valueToken, position, count),
+ pStmtId,
true));
checkWriteErrors(escInsertReply);
@@ -466,6 +451,7 @@ void processFieldsForInsert(FLEQueryInterface* queryImpl,
nssEcoc,
ECOCCollection::generateDocument(payload.fieldPathName,
payload.payload.getEncryptedTokens()),
+ pStmtId,
false));
checkWriteErrors(ecocInsertReply);
}
@@ -475,7 +461,8 @@ void processRemovedFields(FLEQueryInterface* queryImpl,
const NamespaceString& edcNss,
const EncryptedFieldConfig& efc,
const StringMap<FLEDeleteToken>& tokenMap,
- const std::vector<EDCIndexedFields>& deletedFields) {
+ const std::vector<EDCIndexedFields>& deletedFields,
+ int32_t* pStmtId) {
NamespaceString nssEcc(edcNss.db(), efc.getEccCollection().get());
@@ -545,6 +532,7 @@ void processRemovedFields(FLEQueryInterface* queryImpl,
auto eccInsertReply = uassertStatusOK(queryImpl->insertDocument(
nssEcc,
ECCCollection::generateDocument(tagToken, valueToken, index, plainTextField.count),
+ pStmtId,
true));
checkWriteErrors(eccInsertReply);
@@ -556,6 +544,7 @@ void processRemovedFields(FLEQueryInterface* queryImpl,
auto ecocInsertReply = uassertStatusOK(queryImpl->insertDocument(
nssEcoc,
ECOCCollection::generateDocument(deletedField.fieldPathName, encryptedTokens),
+ pStmtId,
false));
checkWriteErrors(ecocInsertReply);
}
@@ -601,9 +590,8 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest(
auto sharedFindAndModifyBlock =
std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock);
- auto swResult = runInTxnWithRetry(
+ auto swResult = trun->runSyncNoThrow(
opCtx,
- trun,
[sharedFindAndModifyBlock](const txn_api::TransactionClient& txnClient,
ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
@@ -623,6 +611,8 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest(
if (!swResult.isOK()) {
return swResult.getStatus();
+ } else if (!swResult.getValue().getEffectiveStatus().isOK()) {
+ return swResult.getValue().getEffectiveStatus();
}
return reply;
@@ -635,13 +625,14 @@ StatusWith<write_ops::InsertCommandReply> processInsert(
const NamespaceString& edcNss,
std::vector<EDCServerPayloadInfo>& serverPayload,
const EncryptedFieldConfig& efc,
+ int32_t stmtId,
BSONObj document) {
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId);
auto finalDoc = EDCServerCollection::finalizeForInsert(document, serverPayload);
- return queryImpl->insertDocument(edcNss, finalDoc, false);
+ return queryImpl->insertDocument(edcNss, finalDoc, &stmtId, false);
}
write_ops::DeleteCommandReply processDelete(FLEQueryInterface* queryImpl,
@@ -653,15 +644,19 @@ write_ops::DeleteCommandReply processDelete(FLEQueryInterface* queryImpl,
auto efc = EncryptionInformationHelpers::getAndValidateSchema(edcNss, ei);
auto tokenMap = EncryptionInformationHelpers::getDeleteTokens(edcNss, ei);
+ int32_t stmtId = getStmtIdForWriteAt(deleteRequest, 0);
- write_ops::DeleteCommandRequest newDeleteRequest = deleteRequest;
+ auto newDeleteRequest = deleteRequest;
auto newDeleteOp = newDeleteRequest.getDeletes()[0];
newDeleteOp.setQ(fle::rewriteEncryptedFilterInsideTxn(
queryImpl, deleteRequest.getDbName(), efc, expCtx, newDeleteOp.getQ()));
newDeleteRequest.setDeletes({newDeleteOp});
- // TODO SERVER-64143 - use this delete for retryable writes
+ newDeleteRequest.getWriteCommandRequestBase().setStmtIds(boost::none);
+ newDeleteRequest.getWriteCommandRequestBase().setStmtId(stmtId);
+ ++stmtId;
+
auto [deleteReply, deletedDocument] =
queryImpl->deleteWithPreimage(edcNss, ei, newDeleteRequest);
@@ -675,7 +670,7 @@ write_ops::DeleteCommandReply processDelete(FLEQueryInterface* queryImpl,
auto deletedFields = EDCServerCollection::getEncryptedIndexedFields(deletedDocument);
- processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields);
+ processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields, &stmtId);
return deleteReply;
}
@@ -705,6 +700,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
const auto updateModification = updateOpEntry.getU();
+ int32_t stmtId = getStmtIdForWriteAt(updateRequest, 0);
// Step 1 ----
std::vector<EDCServerPayloadInfo> serverPayload;
@@ -718,7 +714,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
EDCServerCollection::validateEncryptedFieldInfo(setObject, efc);
serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier);
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId);
// Step 2 ----
auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload);
@@ -730,7 +726,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
EDCServerCollection::validateEncryptedFieldInfo(replacementDocument, efc);
serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument);
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId);
// Step 2 ----
auto safeContentReplace =
@@ -743,8 +739,10 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
// Step 3 ----
auto newUpdateRequest = updateRequest;
newUpdateRequest.setUpdates({newUpdateOpEntry});
+ newUpdateRequest.getWriteCommandRequestBase().setStmtIds(boost::none);
+ newUpdateRequest.getWriteCommandRequestBase().setStmtId(stmtId);
+ ++stmtId;
- // TODO - use this update for retryable writes
auto [updateReply, originalDocument] =
queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
if (originalDocument.isEmpty()) {
@@ -775,7 +773,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
auto newFields = EDCServerCollection::getEncryptedIndexedFields(newDocument);
auto deletedFields = EDCServerCollection::getRemovedTags(originalFields, newFields);
- processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields);
+ processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields, &stmtId);
// Step 6 ----
BSONObj pullUpdate = EDCServerCollection::generateUpdateToRemoveTags(deletedFields, tokenMap);
@@ -786,7 +784,10 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
pullUpdate, write_ops::UpdateModification::ClassicTag(), false));
newUpdateRequest.setUpdates({pullUpdateOpEntry});
- /* ignore */ queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
+ newUpdateRequest.getWriteCommandRequestBase().setStmtId(boost::none);
+ newUpdateRequest.setLegacyRuntimeConstants(boost::none);
+ newUpdateRequest.getWriteCommandRequestBase().setEncryptionInformation(boost::none);
+ /* ignore */ queryImpl->update(edcNss, stmtId, newUpdateRequest);
return updateReply;
}
@@ -862,6 +863,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
auto efc = EncryptionInformationHelpers::getAndValidateSchema(edcNss, ei);
auto tokenMap = EncryptionInformationHelpers::getDeleteTokens(edcNss, ei);
+ int32_t stmtId = findAndModifyRequest.getStmtId().value_or(0);
// Step 1 ----
// If we have an update object, we have to process for ESC
@@ -877,7 +879,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
auto setObject = updateModifier.getObjectField("$set");
EDCServerCollection::validateEncryptedFieldInfo(setObject, efc);
serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier);
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId);
auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload);
@@ -889,7 +891,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
EDCServerCollection::validateEncryptedFieldInfo(replacementDocument, efc);
serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument);
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId);
// Step 2 ----
auto safeContentReplace =
@@ -903,8 +905,9 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
newFindAndModifyRequest.setUpdate(newUpdateModification);
}
- // TODO - use this update for retryable writes
newFindAndModifyRequest.setNew(false);
+ newFindAndModifyRequest.setStmtId(stmtId);
+ ++stmtId;
auto reply = queryImpl->findAndModify(edcNss, ei, newFindAndModifyRequest);
if (!reply.getValue().has_value() || reply.getValue().value().isEmpty()) {
@@ -942,7 +945,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
auto originalFields = EDCServerCollection::getEncryptedIndexedFields(originalDocument);
auto deletedFields = EDCServerCollection::getRemovedTags(originalFields, newFields);
- processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields);
+ processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields, &stmtId);
// Step 6 ----
// We don't need to make a second update in the case of a delete
@@ -958,8 +961,11 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
pullUpdate, write_ops::UpdateModification::ClassicTag(), false));
newUpdateRequest.setUpdates({pullUpdateOpEntry});
- auto [finalUpdateReply, finalCorrectDocument] =
- queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
+ newUpdateRequest.setLegacyRuntimeConstants(boost::none);
+ newUpdateRequest.getWriteCommandRequestBase().setStmtId(boost::none);
+ newUpdateRequest.getWriteCommandRequestBase().setEncryptionInformation(boost::none);
+
+ auto finalUpdateReply = queryImpl->update(edcNss, stmtId, newUpdateRequest);
checkWriteErrors(finalUpdateReply);
}
@@ -1070,44 +1076,33 @@ uint64_t FLEQueryInterfaceImpl::countDocuments(const NamespaceString& nss) {
}
StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceImpl::insertDocument(
- const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) {
+ const NamespaceString& nss, BSONObj obj, StmtId* pStmtId, bool translateDuplicateKey) {
write_ops::InsertCommandRequest insertRequest(nss);
insertRequest.setDocuments({obj});
- // TODO SERVER-64143 - insertRequest.setWriteConcern
- // TODO SERVER-64143 - propagate the retryable statement ids to runCRUDOp
- auto response = _txnClient.runCRUDOp(BatchedCommandRequest(insertRequest), {}).get();
-
- auto status = response.toStatus();
- if (translateDuplicateKey && status.code() == ErrorCodes::DuplicateKey) {
- return Status(ErrorCodes::FLEStateCollectionContention, status.reason());
+ int32_t stmtId = *pStmtId;
+ if (stmtId != kUninitializedStmtId) {
+ (*pStmtId)++;
}
- write_ops::InsertCommandReply reply;
-
- if (response.isLastOpSet()) {
- reply.getWriteCommandReplyBase().setOpTime(response.getLastOp());
- }
+ auto response = _txnClient.runCRUDOp(BatchedCommandRequest(insertRequest), {stmtId}).get();
- if (response.isElectionIdSet()) {
- reply.getWriteCommandReplyBase().setElectionId(response.getElectionId());
- }
+ auto status = response.toStatus();
- reply.getWriteCommandReplyBase().setN(response.getN());
- if (response.isErrDetailsSet()) {
- reply.getWriteCommandReplyBase().setWriteErrors(response.getErrDetails());
- }
+ write_ops::InsertCommandReply reply;
- reply.getWriteCommandReplyBase().setRetriedStmtIds(reply.getRetriedStmtIds());
+ responseToReply(response, reply.getWriteCommandReplyBase());
return {reply};
}
std::pair<write_ops::DeleteCommandReply, BSONObj> FLEQueryInterfaceImpl::deleteWithPreimage(
-
const NamespaceString& nss,
const EncryptionInformation& ei,
const write_ops::DeleteCommandRequest& deleteRequest) {
+ // We only support a single delete
+ dassert(deleteRequest.getStmtIds().value_or(std::vector<int32_t>()).empty());
+
auto deleteOpEntry = deleteRequest.getDeletes()[0];
write_ops::FindAndModifyCommandRequest findAndModifyRequest(nss);
@@ -1118,7 +1113,8 @@ std::pair<write_ops::DeleteCommandReply, BSONObj> FLEQueryInterfaceImpl::deleteW
findAndModifyRequest.setRemove(true);
findAndModifyRequest.setCollation(deleteOpEntry.getCollation());
findAndModifyRequest.setLet(deleteRequest.getLet());
- // TODO SERVER-64143 - writeConcern
+ findAndModifyRequest.setStmtId(deleteRequest.getStmtId());
+
auto ei2 = ei;
ei2.setCrudProcessed(true);
findAndModifyRequest.setEncryptionInformation(ei2);
@@ -1145,6 +1141,9 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateW
const NamespaceString& nss,
const EncryptionInformation& ei,
const write_ops::UpdateCommandRequest& updateRequest) {
+ // We only support a single update
+ dassert(updateRequest.getStmtIds().value_or(std::vector<int32_t>()).empty());
+
auto updateOpEntry = updateRequest.getUpdates()[0];
write_ops::FindAndModifyCommandRequest findAndModifyRequest(nss);
@@ -1159,7 +1158,8 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateW
findAndModifyRequest.setHint(updateOpEntry.getHint());
findAndModifyRequest.setLet(
mergeLetAndCVariables(updateRequest.getLet(), updateOpEntry.getC()));
- // TODO SERVER-64143 - writeConcern
+ findAndModifyRequest.setStmtId(updateRequest.getStmtId());
+
auto ei2 = ei;
ei2.setCrudProcessed(true);
findAndModifyRequest.setEncryptionInformation(ei2);
@@ -1199,6 +1199,24 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateW
return {updateReply, reply.getValue().value_or(BSONObj())};
}
+write_ops::UpdateCommandReply FLEQueryInterfaceImpl::update(
+ const NamespaceString& nss,
+ int32_t stmtId,
+ const write_ops::UpdateCommandRequest& updateRequest) {
+
+ dassert(updateRequest.getStmtIds().value_or(std::vector<int32_t>()).empty());
+
+ auto response = _txnClient.runCRUDOp(BatchedCommandRequest(updateRequest), {stmtId}).get();
+
+ write_ops::UpdateCommandReply reply;
+
+ responseToReply(response, reply.getWriteCommandReplyBase());
+
+ reply.setNModified(response.getNModified());
+
+ return {reply};
+}
+
write_ops::FindAndModifyCommandReply FLEQueryInterfaceImpl::findAndModify(
const NamespaceString& nss,
const EncryptionInformation& ei,
diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h
index 1fbce135aca..07732356432 100644
--- a/src/mongo/db/fle_crud.h
+++ b/src/mongo/db/fle_crud.h
@@ -167,7 +167,7 @@ public:
* FLEStateCollectionContention instead.
*/
virtual StatusWith<write_ops::InsertCommandReply> insertDocument(
- const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) = 0;
+ const NamespaceString& nss, BSONObj obj, StmtId* pStmtId, bool translateDuplicateKey) = 0;
/**
* Delete a single document with the given query.
@@ -191,10 +191,21 @@ public:
const EncryptionInformation& ei,
const write_ops::UpdateCommandRequest& updateRequest) = 0;
+
+ /**
+ * Update a single document with the given query and update operators.
+ *
+ * Returns an update reply.
+ */
+ virtual write_ops::UpdateCommandReply update(
+ const NamespaceString& nss,
+ int32_t stmtId,
+ const write_ops::UpdateCommandRequest& updateRequest) = 0;
+
/**
* Do a single findAndModify request.
*
- * TODO
+ * Returns a findAndModify reply.
*/
virtual write_ops::FindAndModifyCommandReply findAndModify(
const NamespaceString& nss,
@@ -220,6 +231,7 @@ public:
StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss,
BSONObj obj,
+ int32_t* pStmtId,
bool translateDuplicateKey) final;
std::pair<write_ops::DeleteCommandReply, BSONObj> deleteWithPreimage(
@@ -232,6 +244,11 @@ public:
const EncryptionInformation& ei,
const write_ops::UpdateCommandRequest& updateRequest) final;
+ write_ops::UpdateCommandReply update(
+ const NamespaceString& nss,
+ int32_t stmtId,
+ const write_ops::UpdateCommandRequest& updateRequest) final;
+
write_ops::FindAndModifyCommandReply findAndModify(
const NamespaceString& nss,
const EncryptionInformation& ei,
@@ -270,16 +287,6 @@ private:
};
/**
- * Runs a callback function inside a transaction, and retrying if the transaction fails
- * with a retryable error status.
- */
-StatusWith<txn_api::CommitResult> runInTxnWithRetry(
- OperationContext* opCtx,
- std::shared_ptr<txn_api::TransactionWithRetries> trun,
- std::function<SemiFuture<void>(const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec)> callback);
-
-/**
* Creates a new TransactionWithRetries object that runs a transaction on the
* sharding fixed task executor.
*/
@@ -303,6 +310,7 @@ StatusWith<write_ops::InsertCommandReply> processInsert(
const NamespaceString& edcNss,
std::vector<EDCServerPayloadInfo>& serverPayload,
const EncryptedFieldConfig& efc,
+ int32_t stmtId,
BSONObj document);
/**
diff --git a/src/mongo/db/fle_crud_test.cpp b/src/mongo/db/fle_crud_test.cpp
index 70d362605e4..d84aa78d394 100644
--- a/src/mongo/db/fle_crud_test.cpp
+++ b/src/mongo/db/fle_crud_test.cpp
@@ -396,7 +396,7 @@ void FleCrudTest::doSingleWideInsert(int id, uint64_t fieldCount, ValueGenerator
auto efc = getTestEncryptedFieldConfig();
- uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result));
+ uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, 0, result));
}
@@ -457,7 +457,7 @@ void FleCrudTest::doSingleInsert(int id, BSONElement element) {
auto efc = getTestEncryptedFieldConfig();
- uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result));
+ uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, 0, result));
}
void FleCrudTest::doSingleInsert(int id, BSONObj obj) {
diff --git a/src/mongo/db/fle_query_interface_mock.cpp b/src/mongo/db/fle_query_interface_mock.cpp
index 316c7be12f0..c6268fc05c2 100644
--- a/src/mongo/db/fle_query_interface_mock.cpp
+++ b/src/mongo/db/fle_query_interface_mock.cpp
@@ -54,7 +54,7 @@ uint64_t FLEQueryInterfaceMock::countDocuments(const NamespaceString& nss) {
}
StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceMock::insertDocument(
- const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) {
+ const NamespaceString& nss, BSONObj obj, StmtId* pStmtId, bool translateDuplicateKey) {
repl::TimestampedBSONObj tb;
tb.obj = obj;
@@ -131,6 +131,14 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceMock::updateW
return {write_ops::UpdateCommandReply(), preimage};
}
+write_ops::UpdateCommandReply FLEQueryInterfaceMock::update(
+ const NamespaceString& nss,
+ int32_t stmtId,
+ const write_ops::UpdateCommandRequest& updateRequest) {
+ auto [reply, _] = updateWithPreimage(nss, EncryptionInformation(), updateRequest);
+ return reply;
+}
+
write_ops::FindAndModifyCommandReply FLEQueryInterfaceMock::findAndModify(
const NamespaceString& nss,
const EncryptionInformation& ei,
diff --git a/src/mongo/db/fle_query_interface_mock.h b/src/mongo/db/fle_query_interface_mock.h
index c2f480e870f..229d2c08dfe 100644
--- a/src/mongo/db/fle_query_interface_mock.h
+++ b/src/mongo/db/fle_query_interface_mock.h
@@ -49,6 +49,7 @@ public:
StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss,
BSONObj obj,
+ StmtId* pStmtId,
bool translateDuplicateKey) final;
std::pair<write_ops::DeleteCommandReply, BSONObj> deleteWithPreimage(
@@ -61,6 +62,11 @@ public:
const EncryptionInformation& ei,
const write_ops::UpdateCommandRequest& updateRequest) final;
+ write_ops::UpdateCommandReply update(
+ const NamespaceString& nss,
+ int32_t stmtId,
+ const write_ops::UpdateCommandRequest& updateRequest) final;
+
write_ops::FindAndModifyCommandReply findAndModify(
const NamespaceString& nss,
const EncryptionInformation& ei,