diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/fle2_compact.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/commands/fle_compact_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/fle_crud.cpp | 214 | ||||
-rw-r--r-- | src/mongo/db/fle_crud.h | 32 | ||||
-rw-r--r-- | src/mongo/db/fle_crud_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/fle_query_interface_mock.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/fle_query_interface_mock.h | 6 |
7 files changed, 169 insertions, 126 deletions
diff --git a/src/mongo/db/commands/fle2_compact.cpp b/src/mongo/db/commands/fle2_compact.cpp index 3450efa6741..2d930e6b05c 100644 --- a/src/mongo/db/commands/fle2_compact.cpp +++ b/src/mongo/db/commands/fle2_compact.cpp @@ -153,7 +153,8 @@ void upsertNullDocument(FLEQueryInterface* queryImpl, } } else { // insert the null doc; translate duplicate key error to a FLE contention error - auto reply = uassertStatusOK(queryImpl->insertDocument(nss, newNullDoc, true)); + StmtId stmtId = kUninitializedStmtId; + auto reply = uassertStatusOK(queryImpl->insertDocument(nss, newNullDoc, &stmtId, true)); checkWriteErrors(reply); statsCtr.addInserts(1); } @@ -256,7 +257,9 @@ ESCPreCompactState prepareESCForCompaction(FLEQueryInterface* queryImpl, // committed before the current compact transaction commits auto placeholder = ESCCollection::generateCompactionPlaceholderDocument( tagToken, valueToken, state.pos, state.count); - auto insertReply = uassertStatusOK(queryImpl->insertDocument(nssEsc, placeholder, true)); + StmtId stmtId = kUninitializedStmtId; + auto insertReply = + uassertStatusOK(queryImpl->insertDocument(nssEsc, placeholder, &stmtId, true)); checkWriteErrors(insertReply); stats.addInserts(1); @@ -332,7 +335,9 @@ ECCPreCompactState prepareECCForCompaction(FLEQueryInterface* queryImpl, // committed before the current compact transaction commits auto placeholder = ECCCollection::generateCompactionDocument(tagToken, valueToken, state.pos); - auto insertReply = uassertStatusOK(queryImpl->insertDocument(nssEcc, placeholder, true)); + StmtId stmtId = kUninitializedStmtId; + auto insertReply = + uassertStatusOK(queryImpl->insertDocument(nssEcc, placeholder, &stmtId, true)); checkWriteErrors(insertReply); stats.addInserts(1); } else { @@ -442,6 +447,7 @@ void compactOneFieldValuePair(FLEQueryInterface* queryImpl, // PART 3 // A. compact the ECC bool allEntriesDeleted = (escState.count == eccState.count); + StmtId stmtId = kUninitializedStmtId; if (eccState.count != 0) { bool hasNullDoc = (eccState.ipos > 1); @@ -458,6 +464,7 @@ void compactOneFieldValuePair(FLEQueryInterface* queryImpl, namespaces.eccNss, ECCCollection::generateDocument( eccTagToken, eccValueToken, eccState.pos + k, range.start, range.end), + &stmtId, true)); checkWriteErrors(insertReply); stats.addInserts(1); @@ -527,10 +534,8 @@ CompactStats processFLECompact(OperationContext* opCtx, auto argsBlock = std::tie(c, request, namespaces, ecocStats); auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock); - auto swResult = runInTxnWithRetry( - opCtx, - trun, - [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + auto swResult = trun->runSyncNoThrow( + opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); auto [c2, request2, namespaces2, ecocStats2] = *sharedBlock.get(); @@ -556,10 +561,8 @@ CompactStats processFLECompact(OperationContext* opCtx, auto argsBlock = std::tie(ecocDoc, namespaces, escStats, eccStats); auto sharedBlock = std::make_shared<decltype(argsBlock)>(argsBlock); - auto swResult = runInTxnWithRetry( - opCtx, - trun, - [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + auto swResult = trun->runSyncNoThrow( + opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); auto [ecocDoc2, namespaces2, escStats2, eccStats2] = *sharedBlock.get(); diff --git a/src/mongo/db/commands/fle_compact_test.cpp b/src/mongo/db/commands/fle_compact_test.cpp index 0112338facb..9ee23271cfd 100644 --- a/src/mongo/db/commands/fle_compact_test.cpp +++ b/src/mongo/db/commands/fle_compact_test.cpp @@ -395,8 +395,8 @@ void FleCompactTest::doSingleInsert(int id, BSONObj encryptedFieldsObj) { auto efc = generateEncryptedFieldConfig(encryptedFieldsObj.getFieldNames<std::set<std::string>>()); - uassertStatusOK( - processInsert(_queryImpl.get(), _namespaces.edcNss, serverPayload, efc, result)); + uassertStatusOK(processInsert( + _queryImpl.get(), _namespaces.edcNss, serverPayload, efc, kUninitializedTxnNumber, result)); } void FleCompactTest::doSingleDelete(int id, BSONObj encryptedFieldsObj) { diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index 2a6e2243bb9..9031c6ced9c 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -98,6 +98,22 @@ void replyToResponse(write_ops::WriteCommandReplyBase* replyBase, } } +void responseToReply(const BatchedCommandResponse& response, + write_ops::WriteCommandReplyBase& replyBase) { + if (response.isLastOpSet()) { + replyBase.setOpTime(response.getLastOp()); + } + + if (response.isElectionIdSet()) { + replyBase.setElectionId(response.getElectionId()); + } + + replyBase.setN(response.getN()); + if (response.isErrDetailsSet()) { + replyBase.setWriteErrors(response.getErrDetails()); + } +} + boost::optional<BSONObj> mergeLetAndCVariables(const boost::optional<BSONObj>& let, const boost::optional<BSONObj>& c) { if (!let.has_value() && !c.has_value()) { @@ -111,52 +127,8 @@ boost::optional<BSONObj> mergeLetAndCVariables(const boost::optional<BSONObj>& l } return c; } - } // namespace -StatusWith<txn_api::CommitResult> runInTxnWithRetry( - OperationContext* opCtx, - std::shared_ptr<txn_api::TransactionWithRetries> trun, - std::function<SemiFuture<void>(const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec)> callback) { - - bool inClientTransaction = opCtx->inMultiDocumentTransaction(); - - // TODO SERVER-59566 - how much do we retry before we give up? - while (true) { - - // Result will get the status of the TXN - // Non-client initiated txns get retried automatically. - // Client txns are the user responsibility to retry and so if we hit a contention - // placeholder, we need to abort and defer to the client - auto swResult = trun->runSyncNoThrow(opCtx, callback); - if (swResult.isOK()) { - return swResult; - } - - // We cannot retry the transaction if initiated by a user - if (inClientTransaction) { - return swResult; - } - - // - DuplicateKeyException - suggestions contention on ESC - // - FLEContention - if (swResult.getStatus().code() != ErrorCodes::FLECompactionPlaceholder && - swResult.getStatus().code() != ErrorCodes::FLEStateCollectionContention) { - return swResult; - } - - if (!swResult.isOK()) { - return swResult; - } - - auto commitResult = swResult.getValue(); - if (commitResult.getEffectiveStatus().isOK()) { - return commitResult; - } - } -} - std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMongoS( OperationContext* opCtx) { return std::make_shared<txn_api::TransactionWithRetries>( @@ -165,6 +137,7 @@ std::shared_ptr<txn_api::TransactionWithRetries> getTransactionWithRetriesForMon TransactionRouterResourceYielder::makeForLocalHandoff()); } +namespace { /** * Make an expression context from a batch command request and a specific operation. Templated out * to work with update and delete. @@ -190,6 +163,8 @@ boost::intrusive_ptr<ExpressionContext> makeExpCtx(OperationContext* opCtx, return expCtx; } +} // namespace + std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( OperationContext* opCtx, const write_ops::InsertCommandRequest& insertRequest, @@ -217,25 +192,26 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( write_ops::InsertCommandReply reply; + uint32_t stmtId = getStmtIdForWriteAt(insertRequest, 0); + std::shared_ptr<txn_api::TransactionWithRetries> trun = getTxns(opCtx); // The function that handles the transaction may outlive this function so we need to use // shared_ptrs since it runs on another thread auto ownedDocument = document.getOwned(); - auto insertBlock = std::tie(edcNss, efc, serverPayload, reply); + auto insertBlock = std::tie(edcNss, efc, serverPayload, reply, stmtId); auto sharedInsertBlock = std::make_shared<decltype(insertBlock)>(insertBlock); - auto swResult = runInTxnWithRetry( + auto swResult = trun->runSyncNoThrow( opCtx, - trun, [sharedInsertBlock, ownedDocument](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [edcNss2, efc2, serverPayload2, reply2] = *sharedInsertBlock.get(); + auto [edcNss2, efc2, serverPayload2, reply2, stmtId2] = *sharedInsertBlock.get(); - reply2 = uassertStatusOK( - processInsert(&queryImpl, edcNss2, *serverPayload2.get(), efc2, ownedDocument)); + reply2 = uassertStatusOK(processInsert( + &queryImpl, edcNss2, *serverPayload2.get(), efc2, stmtId2, ownedDocument)); if (MONGO_unlikely(fleCrudHangInsert.shouldFail())) { LOGV2(6371903, "Hanging due to fleCrudHangInsert fail point"); @@ -262,6 +238,9 @@ std::pair<FLEBatchResult, write_ops::InsertCommandReply> processInsert( } appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); + } else if (!swResult.getValue().getEffectiveStatus().isOK()) { + appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(), + &reply.getWriteCommandReplyBase()); } return std::pair<FLEBatchResult, write_ops::InsertCommandReply>{FLEBatchResult::kProcessed, @@ -290,9 +269,8 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, auto deleteBlock = std::tie(deleteRequest, reply, expCtx); auto sharedDeleteBlock = std::make_shared<decltype(deleteBlock)>(deleteBlock); - auto swResult = runInTxnWithRetry( + auto swResult = trun->runSyncNoThrow( opCtx, - trun, [sharedDeleteBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); @@ -324,6 +302,9 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx, } appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); + } else if (!swResult.getValue().getEffectiveStatus().isOK()) { + appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(), + &reply.getWriteCommandReplyBase()); } return reply; @@ -356,9 +337,8 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, auto updateBlock = std::tie(updateRequest, reply, expCtx); auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock); - auto swResult = runInTxnWithRetry( + auto swResult = trun->runSyncNoThrow( opCtx, - trun, [sharedupdateBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); @@ -390,6 +370,9 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx, } appendSingleStatusToWriteErrors(swResult.getStatus(), &reply.getWriteCommandReplyBase()); + } else if (!swResult.getValue().getEffectiveStatus().isOK()) { + appendSingleStatusToWriteErrors(swResult.getValue().getEffectiveStatus(), + &reply.getWriteCommandReplyBase()); } return reply; @@ -400,7 +383,8 @@ namespace { void processFieldsForInsert(FLEQueryInterface* queryImpl, const NamespaceString& edcNss, std::vector<EDCServerPayloadInfo>& serverPayload, - const EncryptedFieldConfig& efc) { + const EncryptedFieldConfig& efc, + int32_t* pStmtId) { NamespaceString nssEsc(edcNss.db(), efc.getEscCollection().get()); @@ -455,6 +439,7 @@ void processFieldsForInsert(FLEQueryInterface* queryImpl, auto escInsertReply = uassertStatusOK(queryImpl->insertDocument( nssEsc, ESCCollection::generateInsertDocument(tagToken, valueToken, position, count), + pStmtId, true)); checkWriteErrors(escInsertReply); @@ -466,6 +451,7 @@ void processFieldsForInsert(FLEQueryInterface* queryImpl, nssEcoc, ECOCCollection::generateDocument(payload.fieldPathName, payload.payload.getEncryptedTokens()), + pStmtId, false)); checkWriteErrors(ecocInsertReply); } @@ -475,7 +461,8 @@ void processRemovedFields(FLEQueryInterface* queryImpl, const NamespaceString& edcNss, const EncryptedFieldConfig& efc, const StringMap<FLEDeleteToken>& tokenMap, - const std::vector<EDCIndexedFields>& deletedFields) { + const std::vector<EDCIndexedFields>& deletedFields, + int32_t* pStmtId) { NamespaceString nssEcc(edcNss.db(), efc.getEccCollection().get()); @@ -545,6 +532,7 @@ void processRemovedFields(FLEQueryInterface* queryImpl, auto eccInsertReply = uassertStatusOK(queryImpl->insertDocument( nssEcc, ECCCollection::generateDocument(tagToken, valueToken, index, plainTextField.count), + pStmtId, true)); checkWriteErrors(eccInsertReply); @@ -556,6 +544,7 @@ void processRemovedFields(FLEQueryInterface* queryImpl, auto ecocInsertReply = uassertStatusOK(queryImpl->insertDocument( nssEcoc, ECOCCollection::generateDocument(deletedField.fieldPathName, encryptedTokens), + pStmtId, false)); checkWriteErrors(ecocInsertReply); } @@ -601,9 +590,8 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest( auto sharedFindAndModifyBlock = std::make_shared<decltype(findAndModifyBlock)>(findAndModifyBlock); - auto swResult = runInTxnWithRetry( + auto swResult = trun->runSyncNoThrow( opCtx, - trun, [sharedFindAndModifyBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); @@ -623,6 +611,8 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest( if (!swResult.isOK()) { return swResult.getStatus(); + } else if (!swResult.getValue().getEffectiveStatus().isOK()) { + return swResult.getValue().getEffectiveStatus(); } return reply; @@ -635,13 +625,14 @@ StatusWith<write_ops::InsertCommandReply> processInsert( const NamespaceString& edcNss, std::vector<EDCServerPayloadInfo>& serverPayload, const EncryptedFieldConfig& efc, + int32_t stmtId, BSONObj document) { - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId); auto finalDoc = EDCServerCollection::finalizeForInsert(document, serverPayload); - return queryImpl->insertDocument(edcNss, finalDoc, false); + return queryImpl->insertDocument(edcNss, finalDoc, &stmtId, false); } write_ops::DeleteCommandReply processDelete(FLEQueryInterface* queryImpl, @@ -653,15 +644,19 @@ write_ops::DeleteCommandReply processDelete(FLEQueryInterface* queryImpl, auto efc = EncryptionInformationHelpers::getAndValidateSchema(edcNss, ei); auto tokenMap = EncryptionInformationHelpers::getDeleteTokens(edcNss, ei); + int32_t stmtId = getStmtIdForWriteAt(deleteRequest, 0); - write_ops::DeleteCommandRequest newDeleteRequest = deleteRequest; + auto newDeleteRequest = deleteRequest; auto newDeleteOp = newDeleteRequest.getDeletes()[0]; newDeleteOp.setQ(fle::rewriteEncryptedFilterInsideTxn( queryImpl, deleteRequest.getDbName(), efc, expCtx, newDeleteOp.getQ())); newDeleteRequest.setDeletes({newDeleteOp}); - // TODO SERVER-64143 - use this delete for retryable writes + newDeleteRequest.getWriteCommandRequestBase().setStmtIds(boost::none); + newDeleteRequest.getWriteCommandRequestBase().setStmtId(stmtId); + ++stmtId; + auto [deleteReply, deletedDocument] = queryImpl->deleteWithPreimage(edcNss, ei, newDeleteRequest); @@ -675,7 +670,7 @@ write_ops::DeleteCommandReply processDelete(FLEQueryInterface* queryImpl, auto deletedFields = EDCServerCollection::getEncryptedIndexedFields(deletedDocument); - processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields); + processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields, &stmtId); return deleteReply; } @@ -705,6 +700,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, const auto updateModification = updateOpEntry.getU(); + int32_t stmtId = getStmtIdForWriteAt(updateRequest, 0); // Step 1 ---- std::vector<EDCServerPayloadInfo> serverPayload; @@ -718,7 +714,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, EDCServerCollection::validateEncryptedFieldInfo(setObject, efc); serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier); - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId); // Step 2 ---- auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload); @@ -730,7 +726,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, EDCServerCollection::validateEncryptedFieldInfo(replacementDocument, efc); serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument); - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId); // Step 2 ---- auto safeContentReplace = @@ -743,8 +739,10 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, // Step 3 ---- auto newUpdateRequest = updateRequest; newUpdateRequest.setUpdates({newUpdateOpEntry}); + newUpdateRequest.getWriteCommandRequestBase().setStmtIds(boost::none); + newUpdateRequest.getWriteCommandRequestBase().setStmtId(stmtId); + ++stmtId; - // TODO - use this update for retryable writes auto [updateReply, originalDocument] = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); if (originalDocument.isEmpty()) { @@ -775,7 +773,7 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, auto newFields = EDCServerCollection::getEncryptedIndexedFields(newDocument); auto deletedFields = EDCServerCollection::getRemovedTags(originalFields, newFields); - processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields); + processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields, &stmtId); // Step 6 ---- BSONObj pullUpdate = EDCServerCollection::generateUpdateToRemoveTags(deletedFields, tokenMap); @@ -786,7 +784,10 @@ write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification( pullUpdate, write_ops::UpdateModification::ClassicTag(), false)); newUpdateRequest.setUpdates({pullUpdateOpEntry}); - /* ignore */ queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); + newUpdateRequest.getWriteCommandRequestBase().setStmtId(boost::none); + newUpdateRequest.setLegacyRuntimeConstants(boost::none); + newUpdateRequest.getWriteCommandRequestBase().setEncryptionInformation(boost::none); + /* ignore */ queryImpl->update(edcNss, stmtId, newUpdateRequest); return updateReply; } @@ -862,6 +863,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify( auto efc = EncryptionInformationHelpers::getAndValidateSchema(edcNss, ei); auto tokenMap = EncryptionInformationHelpers::getDeleteTokens(edcNss, ei); + int32_t stmtId = findAndModifyRequest.getStmtId().value_or(0); // Step 1 ---- // If we have an update object, we have to process for ESC @@ -877,7 +879,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify( auto setObject = updateModifier.getObjectField("$set"); EDCServerCollection::validateEncryptedFieldInfo(setObject, efc); serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier); - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId); auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload); @@ -889,7 +891,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify( EDCServerCollection::validateEncryptedFieldInfo(replacementDocument, efc); serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument); - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc, &stmtId); // Step 2 ---- auto safeContentReplace = @@ -903,8 +905,9 @@ write_ops::FindAndModifyCommandReply processFindAndModify( newFindAndModifyRequest.setUpdate(newUpdateModification); } - // TODO - use this update for retryable writes newFindAndModifyRequest.setNew(false); + newFindAndModifyRequest.setStmtId(stmtId); + ++stmtId; auto reply = queryImpl->findAndModify(edcNss, ei, newFindAndModifyRequest); if (!reply.getValue().has_value() || reply.getValue().value().isEmpty()) { @@ -942,7 +945,7 @@ write_ops::FindAndModifyCommandReply processFindAndModify( auto originalFields = EDCServerCollection::getEncryptedIndexedFields(originalDocument); auto deletedFields = EDCServerCollection::getRemovedTags(originalFields, newFields); - processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields); + processRemovedFields(queryImpl, edcNss, efc, tokenMap, deletedFields, &stmtId); // Step 6 ---- // We don't need to make a second update in the case of a delete @@ -958,8 +961,11 @@ write_ops::FindAndModifyCommandReply processFindAndModify( pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification( pullUpdate, write_ops::UpdateModification::ClassicTag(), false)); newUpdateRequest.setUpdates({pullUpdateOpEntry}); - auto [finalUpdateReply, finalCorrectDocument] = - queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); + newUpdateRequest.setLegacyRuntimeConstants(boost::none); + newUpdateRequest.getWriteCommandRequestBase().setStmtId(boost::none); + newUpdateRequest.getWriteCommandRequestBase().setEncryptionInformation(boost::none); + + auto finalUpdateReply = queryImpl->update(edcNss, stmtId, newUpdateRequest); checkWriteErrors(finalUpdateReply); } @@ -1070,44 +1076,33 @@ uint64_t FLEQueryInterfaceImpl::countDocuments(const NamespaceString& nss) { } StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceImpl::insertDocument( - const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) { + const NamespaceString& nss, BSONObj obj, StmtId* pStmtId, bool translateDuplicateKey) { write_ops::InsertCommandRequest insertRequest(nss); insertRequest.setDocuments({obj}); - // TODO SERVER-64143 - insertRequest.setWriteConcern - // TODO SERVER-64143 - propagate the retryable statement ids to runCRUDOp - auto response = _txnClient.runCRUDOp(BatchedCommandRequest(insertRequest), {}).get(); - - auto status = response.toStatus(); - if (translateDuplicateKey && status.code() == ErrorCodes::DuplicateKey) { - return Status(ErrorCodes::FLEStateCollectionContention, status.reason()); + int32_t stmtId = *pStmtId; + if (stmtId != kUninitializedStmtId) { + (*pStmtId)++; } - write_ops::InsertCommandReply reply; - - if (response.isLastOpSet()) { - reply.getWriteCommandReplyBase().setOpTime(response.getLastOp()); - } + auto response = _txnClient.runCRUDOp(BatchedCommandRequest(insertRequest), {stmtId}).get(); - if (response.isElectionIdSet()) { - reply.getWriteCommandReplyBase().setElectionId(response.getElectionId()); - } + auto status = response.toStatus(); - reply.getWriteCommandReplyBase().setN(response.getN()); - if (response.isErrDetailsSet()) { - reply.getWriteCommandReplyBase().setWriteErrors(response.getErrDetails()); - } + write_ops::InsertCommandReply reply; - reply.getWriteCommandReplyBase().setRetriedStmtIds(reply.getRetriedStmtIds()); + responseToReply(response, reply.getWriteCommandReplyBase()); return {reply}; } std::pair<write_ops::DeleteCommandReply, BSONObj> FLEQueryInterfaceImpl::deleteWithPreimage( - const NamespaceString& nss, const EncryptionInformation& ei, const write_ops::DeleteCommandRequest& deleteRequest) { + // We only support a single delete + dassert(deleteRequest.getStmtIds().value_or(std::vector<int32_t>()).empty()); + auto deleteOpEntry = deleteRequest.getDeletes()[0]; write_ops::FindAndModifyCommandRequest findAndModifyRequest(nss); @@ -1118,7 +1113,8 @@ std::pair<write_ops::DeleteCommandReply, BSONObj> FLEQueryInterfaceImpl::deleteW findAndModifyRequest.setRemove(true); findAndModifyRequest.setCollation(deleteOpEntry.getCollation()); findAndModifyRequest.setLet(deleteRequest.getLet()); - // TODO SERVER-64143 - writeConcern + findAndModifyRequest.setStmtId(deleteRequest.getStmtId()); + auto ei2 = ei; ei2.setCrudProcessed(true); findAndModifyRequest.setEncryptionInformation(ei2); @@ -1145,6 +1141,9 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateW const NamespaceString& nss, const EncryptionInformation& ei, const write_ops::UpdateCommandRequest& updateRequest) { + // We only support a single update + dassert(updateRequest.getStmtIds().value_or(std::vector<int32_t>()).empty()); + auto updateOpEntry = updateRequest.getUpdates()[0]; write_ops::FindAndModifyCommandRequest findAndModifyRequest(nss); @@ -1159,7 +1158,8 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateW findAndModifyRequest.setHint(updateOpEntry.getHint()); findAndModifyRequest.setLet( mergeLetAndCVariables(updateRequest.getLet(), updateOpEntry.getC())); - // TODO SERVER-64143 - writeConcern + findAndModifyRequest.setStmtId(updateRequest.getStmtId()); + auto ei2 = ei; ei2.setCrudProcessed(true); findAndModifyRequest.setEncryptionInformation(ei2); @@ -1199,6 +1199,24 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateW return {updateReply, reply.getValue().value_or(BSONObj())}; } +write_ops::UpdateCommandReply FLEQueryInterfaceImpl::update( + const NamespaceString& nss, + int32_t stmtId, + const write_ops::UpdateCommandRequest& updateRequest) { + + dassert(updateRequest.getStmtIds().value_or(std::vector<int32_t>()).empty()); + + auto response = _txnClient.runCRUDOp(BatchedCommandRequest(updateRequest), {stmtId}).get(); + + write_ops::UpdateCommandReply reply; + + responseToReply(response, reply.getWriteCommandReplyBase()); + + reply.setNModified(response.getNModified()); + + return {reply}; +} + write_ops::FindAndModifyCommandReply FLEQueryInterfaceImpl::findAndModify( const NamespaceString& nss, const EncryptionInformation& ei, diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h index 1fbce135aca..07732356432 100644 --- a/src/mongo/db/fle_crud.h +++ b/src/mongo/db/fle_crud.h @@ -167,7 +167,7 @@ public: * FLEStateCollectionContention instead. */ virtual StatusWith<write_ops::InsertCommandReply> insertDocument( - const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) = 0; + const NamespaceString& nss, BSONObj obj, StmtId* pStmtId, bool translateDuplicateKey) = 0; /** * Delete a single document with the given query. @@ -191,10 +191,21 @@ public: const EncryptionInformation& ei, const write_ops::UpdateCommandRequest& updateRequest) = 0; + + /** + * Update a single document with the given query and update operators. + * + * Returns an update reply. + */ + virtual write_ops::UpdateCommandReply update( + const NamespaceString& nss, + int32_t stmtId, + const write_ops::UpdateCommandRequest& updateRequest) = 0; + /** * Do a single findAndModify request. * - * TODO + * Returns a findAndModify reply. */ virtual write_ops::FindAndModifyCommandReply findAndModify( const NamespaceString& nss, @@ -220,6 +231,7 @@ public: StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss, BSONObj obj, + int32_t* pStmtId, bool translateDuplicateKey) final; std::pair<write_ops::DeleteCommandReply, BSONObj> deleteWithPreimage( @@ -232,6 +244,11 @@ public: const EncryptionInformation& ei, const write_ops::UpdateCommandRequest& updateRequest) final; + write_ops::UpdateCommandReply update( + const NamespaceString& nss, + int32_t stmtId, + const write_ops::UpdateCommandRequest& updateRequest) final; + write_ops::FindAndModifyCommandReply findAndModify( const NamespaceString& nss, const EncryptionInformation& ei, @@ -270,16 +287,6 @@ private: }; /** - * Runs a callback function inside a transaction, and retrying if the transaction fails - * with a retryable error status. - */ -StatusWith<txn_api::CommitResult> runInTxnWithRetry( - OperationContext* opCtx, - std::shared_ptr<txn_api::TransactionWithRetries> trun, - std::function<SemiFuture<void>(const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec)> callback); - -/** * Creates a new TransactionWithRetries object that runs a transaction on the * sharding fixed task executor. */ @@ -303,6 +310,7 @@ StatusWith<write_ops::InsertCommandReply> processInsert( const NamespaceString& edcNss, std::vector<EDCServerPayloadInfo>& serverPayload, const EncryptedFieldConfig& efc, + int32_t stmtId, BSONObj document); /** diff --git a/src/mongo/db/fle_crud_test.cpp b/src/mongo/db/fle_crud_test.cpp index 70d362605e4..d84aa78d394 100644 --- a/src/mongo/db/fle_crud_test.cpp +++ b/src/mongo/db/fle_crud_test.cpp @@ -396,7 +396,7 @@ void FleCrudTest::doSingleWideInsert(int id, uint64_t fieldCount, ValueGenerator auto efc = getTestEncryptedFieldConfig(); - uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result)); + uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, 0, result)); } @@ -457,7 +457,7 @@ void FleCrudTest::doSingleInsert(int id, BSONElement element) { auto efc = getTestEncryptedFieldConfig(); - uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, result)); + uassertStatusOK(processInsert(_queryImpl.get(), _edcNs, serverPayload, efc, 0, result)); } void FleCrudTest::doSingleInsert(int id, BSONObj obj) { diff --git a/src/mongo/db/fle_query_interface_mock.cpp b/src/mongo/db/fle_query_interface_mock.cpp index 316c7be12f0..c6268fc05c2 100644 --- a/src/mongo/db/fle_query_interface_mock.cpp +++ b/src/mongo/db/fle_query_interface_mock.cpp @@ -54,7 +54,7 @@ uint64_t FLEQueryInterfaceMock::countDocuments(const NamespaceString& nss) { } StatusWith<write_ops::InsertCommandReply> FLEQueryInterfaceMock::insertDocument( - const NamespaceString& nss, BSONObj obj, bool translateDuplicateKey) { + const NamespaceString& nss, BSONObj obj, StmtId* pStmtId, bool translateDuplicateKey) { repl::TimestampedBSONObj tb; tb.obj = obj; @@ -131,6 +131,14 @@ std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceMock::updateW return {write_ops::UpdateCommandReply(), preimage}; } +write_ops::UpdateCommandReply FLEQueryInterfaceMock::update( + const NamespaceString& nss, + int32_t stmtId, + const write_ops::UpdateCommandRequest& updateRequest) { + auto [reply, _] = updateWithPreimage(nss, EncryptionInformation(), updateRequest); + return reply; +} + write_ops::FindAndModifyCommandReply FLEQueryInterfaceMock::findAndModify( const NamespaceString& nss, const EncryptionInformation& ei, diff --git a/src/mongo/db/fle_query_interface_mock.h b/src/mongo/db/fle_query_interface_mock.h index c2f480e870f..229d2c08dfe 100644 --- a/src/mongo/db/fle_query_interface_mock.h +++ b/src/mongo/db/fle_query_interface_mock.h @@ -49,6 +49,7 @@ public: StatusWith<write_ops::InsertCommandReply> insertDocument(const NamespaceString& nss, BSONObj obj, + StmtId* pStmtId, bool translateDuplicateKey) final; std::pair<write_ops::DeleteCommandReply, BSONObj> deleteWithPreimage( @@ -61,6 +62,11 @@ public: const EncryptionInformation& ei, const write_ops::UpdateCommandRequest& updateRequest) final; + write_ops::UpdateCommandReply update( + const NamespaceString& nss, + int32_t stmtId, + const write_ops::UpdateCommandRequest& updateRequest) final; + write_ops::FindAndModifyCommandReply findAndModify( const NamespaceString& nss, const EncryptionInformation& ei, |