diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2022-03-17 22:38:48 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-22 04:03:55 +0000 |
commit | 3c97f5a618d359181c83c3d1d5018c581bb5cbff (patch) | |
tree | 306a44326e0c7267879f0c3fffc3ea5fe6899cee | |
parent | bb0a88a3a1cc2d97935cd4adb6e3733604e1623c (diff) | |
download | mongo-3c97f5a618d359181c83c3d1d5018c581bb5cbff.tar.gz |
SERVER-64399 Support replacement style update and upsert
-rw-r--r-- | src/mongo/db/fle_crud.cpp | 180 | ||||
-rw-r--r-- | src/mongo/db/fle_crud.h | 11 | ||||
-rw-r--r-- | src/mongo/db/fle_crud_test.cpp | 80 |
3 files changed, 209 insertions, 62 deletions
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index ad72f827dc3..2420308ccfa 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -75,9 +75,10 @@ public: const EncryptionInformation& ei, const write_ops::DeleteCommandRequest& deleteRequest) final; - BSONObj updateWithPreimage(const NamespaceString& nss, - const EncryptionInformation& ei, - const write_ops::UpdateCommandRequest& updateRequest) final; + std::pair<write_ops::UpdateCommandReply, BSONObj> updateWithPreimage( + const NamespaceString& nss, + const EncryptionInformation& ei, + const write_ops::UpdateCommandRequest& updateRequest) final; write_ops::FindAndModifyCommandReply findAndModify( const NamespaceString& nss, @@ -296,15 +297,14 @@ boost::optional<BSONObj> mergeLetAndCVariables(const boost::optional<BSONObj>& l } else if (let.has_value() && c.has_value()) { BSONObj obj = let.value(); // Prioritize the fields in c over the fields in let in case of duplicates - obj.addFields(c.value()); - return {obj}; + return obj.addFields(c.value()); } else if (let.has_value()) { return let; } return c; } -BSONObj FLEQueryInterfaceImpl::updateWithPreimage( +std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateWithPreimage( const NamespaceString& nss, const EncryptionInformation& ei, const write_ops::UpdateCommandRequest& updateRequest) { @@ -314,7 +314,7 @@ BSONObj FLEQueryInterfaceImpl::updateWithPreimage( findAndModifyRequest.setQuery(updateOpEntry.getQ()); findAndModifyRequest.setUpdate(updateOpEntry.getU()); findAndModifyRequest.setBatchSize(1); - findAndModifyRequest.setUpsert(false); + findAndModifyRequest.setUpsert(updateOpEntry.getUpsert()); findAndModifyRequest.setSingleBatch(true); findAndModifyRequest.setRemove(false); findAndModifyRequest.setArrayFilters(updateOpEntry.getArrayFilters()); @@ -334,11 +334,32 @@ BSONObj FLEQueryInterfaceImpl::updateWithPreimage( auto reply = write_ops::FindAndModifyCommandReply::parse(IDLParserErrorContext("reply"), response); - if (!reply.getValue().has_value()) { - return BSONObj(); + write_ops::UpdateCommandReply updateReply; + + if (!status.isOK()) { + updateReply.getWriteCommandReplyBase().setN(0); + updateReply.getWriteCommandReplyBase().setWriteErrors(singleStatusToWriteErrors(status)); + } else { + if (reply.getRetriedStmtId().has_value()) { + updateReply.getWriteCommandReplyBase().setRetriedStmtIds( + std::vector<std::int32_t>{reply.getRetriedStmtId().value()}); + } + updateReply.getWriteCommandReplyBase().setN(reply.getLastErrorObject().getNumDocs()); + + if (reply.getLastErrorObject().getUpserted().has_value()) { + write_ops::Upserted upserted; + upserted.setIndex(0); + upserted.set_id(reply.getLastErrorObject().getUpserted().value()); + updateReply.setUpserted(std::vector<mongo::write_ops::Upserted>{upserted}); + } + + if (reply.getLastErrorObject().getNumDocs() > 0) { + updateReply.setNModified(1); + updateReply.getWriteCommandReplyBase().setN(1); + } } - return reply.getValue().value(); + return {updateReply, reply.getValue().value_or(BSONObj())}; } write_ops::FindAndModifyCommandReply FLEQueryInterfaceImpl::findAndModify( @@ -549,9 +570,10 @@ StatusWith<uint64_t> processDelete(OperationContext* opCtx, return count; } -StatusWith<uint64_t> processUpdate(OperationContext* opCtx, - const write_ops::UpdateCommandRequest& updateRequest, - GetTxnCallback getTxns) { +StatusWith<write_ops::UpdateCommandReply> processUpdate( + OperationContext* opCtx, + const write_ops::UpdateCommandRequest& updateRequest, + std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) { auto updates = updateRequest.getUpdates(); uassert(6371502, "Only single document updates are permitted", updates.size() == 1); @@ -571,8 +593,8 @@ StatusWith<uint64_t> processUpdate(OperationContext* opCtx, // The function that handles the transaction may outlive this function so we need to use // shared_ptrs - uint64_t count = 0; - auto updateBlock = std::tie(updateRequest, count); + write_ops::UpdateCommandReply reply; + auto updateBlock = std::tie(updateRequest, reply); auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock); auto swResult = runInTxnWithRetry( @@ -581,9 +603,9 @@ StatusWith<uint64_t> processUpdate(OperationContext* opCtx, [sharedupdateBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { FLEQueryInterfaceImpl queryImpl(txnClient); - auto [updateRequest2, count] = *sharedupdateBlock.get(); + auto [updateRequest2, reply] = *sharedupdateBlock.get(); - count = processUpdate(&queryImpl, updateRequest2); + reply = processUpdate(&queryImpl, updateRequest2); return SemiFuture<void>::makeReady(); }); @@ -592,7 +614,7 @@ StatusWith<uint64_t> processUpdate(OperationContext* opCtx, return swResult.getStatus(); } - return count; + return reply; } namespace { @@ -781,6 +803,14 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest( "findAndModify fields must be empty", findAndModifyRequest.getFields().value_or(BSONObj()).isEmpty()); + // pipeline - is agg specific, delta is oplog, transform is internal (timeseries) + auto updateModicationType = + findAndModifyRequest.getUpdate().value_or(write_ops::UpdateModification()).type(); + uassert(6439901, + "FLE only supports modifier and replacement style updates", + updateModicationType == write_ops::UpdateModification::Type::kModifier || + updateModicationType == write_ops::UpdateModification::Type::kReplacement); + std::shared_ptr<txn_api::TransactionWithRetries> trun = getTxns(opCtx); // The function that handles the transaction may outlive this function so we need to use @@ -865,39 +895,63 @@ uint64_t processDelete(FLEQueryInterface* queryImpl, * 5. Find the removed fields and update ECC * 6. Remove the stale tags from the original document with a new push */ -uint64_t processUpdate(FLEQueryInterface* queryImpl, - const write_ops::UpdateCommandRequest& updateRequest) { +write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, + const write_ops::UpdateCommandRequest& updateRequest) { auto edcNss = updateRequest.getNamespace(); auto ei = updateRequest.getEncryptionInformation().get(); auto efc = EncryptionInformationHelpers::getAndValidateSchema(edcNss, ei); auto tokenMap = EncryptionInformationHelpers::getDeleteTokens(edcNss, ei); + const auto updateOpEntry = updateRequest.getUpdates()[0]; - auto updateOpEntry = updateRequest.getUpdates()[0]; - auto updateModifier = updateOpEntry.getU().getUpdateModifier(); + const auto updateModification = updateOpEntry.getU(); // Step 1 ---- - auto serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier); + std::vector<EDCServerPayloadInfo> serverPayload; + auto newUpdateOpEntry = updateRequest.getUpdates()[0]; - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + if (updateModification.type() == write_ops::UpdateModification::Type::kModifier) { + auto updateModifier = updateModification.getUpdateModifier(); + serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier); - // Step 2 ---- - auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); - // Step 3 ---- - auto newUpdateOpEntry = updateRequest.getUpdates()[0]; + // Step 2 ---- + auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload); + + newUpdateOpEntry.setU(mongo::write_ops::UpdateModification( + pushUpdate, write_ops::UpdateModification::ClassicTag(), false)); - newUpdateOpEntry.setU(mongo::write_ops::UpdateModification( - pushUpdate, write_ops::UpdateModification::ClassicTag(), false)); + } else { + auto replacementDocument = updateModification.getUpdateReplacement(); + serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument); + + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + + // Step 2 ---- + auto safeContentReplace = + EDCServerCollection::finalizeForInsert(replacementDocument, serverPayload); + + newUpdateOpEntry.setU(mongo::write_ops::UpdateModification( + safeContentReplace, write_ops::UpdateModification::ClassicTag(), true)); + } + + // Step 3 ---- auto newUpdateRequest = updateRequest; newUpdateRequest.setUpdates({newUpdateOpEntry}); // TODO - use this update for retryable writes - BSONObj originalDocument = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); + auto [updateReply, originalDocument] = + queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); if (originalDocument.isEmpty()) { // if there is no preimage, then we did not update any documents, we are done - return 0; + return updateReply; + } + + // If there are errors, we are done + if (updateReply.getWriteErrors().has_value() && !updateReply.getWriteErrors().value().empty()) { + return updateReply; } // Step 4 ---- @@ -929,9 +983,9 @@ uint64_t processUpdate(FLEQueryInterface* queryImpl, pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification( pullUpdate, write_ops::UpdateModification::ClassicTag(), false)); newUpdateRequest.setUpdates({pullUpdateOpEntry}); - BSONObj finalCorrectDocument = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); + /* ignore */ queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); - return 1; + return updateReply; } @@ -985,8 +1039,24 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx, response->setNModified(0); } else { response->setStatus(Status::OK()); - response->setN(swResult.getValue()); - response->setNModified(swResult.getValue()); + auto updateReply = swResult.getValue(); + response->setN(updateReply.getN()); + response->setNModified(updateReply.getNModified()); + + if (updateReply.getUpserted().has_value() && + updateReply.getUpserted().value().size() > 0) { + + auto upsertReply = updateReply.getUpserted().value()[0]; + + BatchedUpsertDetail upsert; + upsert.setIndex(upsertReply.getIndex()); + upsert.setUpsertedID(upsertReply.get_id().getElement().wrap("")); + + std::vector<BatchedUpsertDetail*> upserts; + upserts.push_back(&upsert); + + response->setUpsertDetails(upserts); + } } return FLEBatchResult::kProcessed; @@ -1010,17 +1080,37 @@ write_ops::FindAndModifyCommandReply processFindAndModify( // If we have an update object, we have to process for ESC auto newFindAndModifyRequest = findAndModifyRequest; if (findAndModifyRequest.getUpdate().has_value()) { - auto updateModifier = findAndModifyRequest.getUpdate().value().getUpdateModifier(); - auto serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier); - processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + std::vector<EDCServerPayloadInfo> serverPayload; + const auto updateModification = findAndModifyRequest.getUpdate().value(); + write_ops::UpdateModification newUpdateModification; - // Step 2 ---- - auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload); + if (updateModification.type() == write_ops::UpdateModification::Type::kModifier) { + auto updateModifier = updateModification.getUpdateModifier(); + serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier); + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + + auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload); + + // Step 2 ---- + newUpdateModification = write_ops::UpdateModification( + pushUpdate, write_ops::UpdateModification::ClassicTag(), false); + } else { + auto replacementDocument = updateModification.getUpdateReplacement(); + serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument); + + processFieldsForInsert(queryImpl, edcNss, serverPayload, efc); + + // Step 2 ---- + auto safeContentReplace = + EDCServerCollection::finalizeForInsert(replacementDocument, serverPayload); + + newUpdateModification = write_ops::UpdateModification( + safeContentReplace, write_ops::UpdateModification::ClassicTag(), true); + } // Step 3 ---- - newFindAndModifyRequest.setUpdate(mongo::write_ops::UpdateModification( - pushUpdate, write_ops::UpdateModification::ClassicTag(), false)); + newFindAndModifyRequest.setUpdate(newUpdateModification); } // TODO - use this update for retryable writes @@ -1078,7 +1168,9 @@ write_ops::FindAndModifyCommandReply processFindAndModify( pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification( pullUpdate, write_ops::UpdateModification::ClassicTag(), false)); newUpdateRequest.setUpdates({pullUpdateOpEntry}); - BSONObj finalCorrectDocument = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); + auto [finalUpdateReply, finalCorrectDocument] = + queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest); + checkWriteErrors(finalUpdateReply); } return reply; diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h index f49baacbf36..b26e76703ae 100644 --- a/src/mongo/db/fle_crud.h +++ b/src/mongo/db/fle_crud.h @@ -142,9 +142,10 @@ public: * Returns the pre-image of the updated document. If no documents were updated, returns an empty * BSON object. */ - virtual BSONObj updateWithPreimage(const NamespaceString& nss, - const EncryptionInformation& ei, - const write_ops::UpdateCommandRequest& updateRequest) = 0; + virtual std::pair<write_ops::UpdateCommandReply, BSONObj> updateWithPreimage( + const NamespaceString& nss, + const EncryptionInformation& ei, + const write_ops::UpdateCommandRequest& updateRequest) = 0; /** * Do a single findAndModify request. @@ -182,8 +183,8 @@ uint64_t processDelete(FLEQueryInterface* queryImpl, * * Used by unit tests. */ -uint64_t processUpdate(FLEQueryInterface* queryImpl, - const write_ops::UpdateCommandRequest& updateRequest); +write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl, + const write_ops::UpdateCommandRequest& updateRequest); /** * Callback function to get a TransactionWithRetries with the appropiate Executor diff --git a/src/mongo/db/fle_crud_test.cpp b/src/mongo/db/fle_crud_test.cpp index 6477037e99e..743a166cce9 100644 --- a/src/mongo/db/fle_crud_test.cpp +++ b/src/mongo/db/fle_crud_test.cpp @@ -93,9 +93,10 @@ public: const EncryptionInformation& ei, const write_ops::DeleteCommandRequest& deleteRequest) final; - BSONObj updateWithPreimage(const NamespaceString& nss, - const EncryptionInformation& ei, - const write_ops::UpdateCommandRequest& updateRequest) final; + std::pair<write_ops::UpdateCommandReply, BSONObj> updateWithPreimage( + const NamespaceString& nss, + const EncryptionInformation& ei, + const write_ops::UpdateCommandRequest& updateRequest) final; write_ops::FindAndModifyCommandReply findAndModify( const NamespaceString& nss, @@ -154,9 +155,10 @@ BSONObj FLEQueryTestImpl::deleteWithPreimage(const NamespaceString& nss, return uassertStatusOK(swDoc); } -BSONObj FLEQueryTestImpl::updateWithPreimage(const NamespaceString& nss, - const EncryptionInformation& ei, - const write_ops::UpdateCommandRequest& updateRequest) { +std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryTestImpl::updateWithPreimage( + const NamespaceString& nss, + const EncryptionInformation& ei, + const write_ops::UpdateCommandRequest& updateRequest) { // A limit of the API, we can delete by _id and get the pre-image so we limit our unittests to // this ASSERT_EQ(updateRequest.getUpdates().size(), 1); @@ -165,12 +167,20 @@ BSONObj FLEQueryTestImpl::updateWithPreimage(const NamespaceString& nss, BSONObj preimage = getById(nss, updateOpEntry.getQ().firstElement()); - uassertStatusOK(_storage->upsertById(_opCtx, - nss, - updateOpEntry.getQ().firstElement(), - updateOpEntry.getU().getUpdateModifier())); + if (updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier) { + uassertStatusOK(_storage->upsertById(_opCtx, + nss, + updateOpEntry.getQ().firstElement(), + updateOpEntry.getU().getUpdateModifier())); + } else { + uassertStatusOK(_storage->upsertById(_opCtx, + nss, + updateOpEntry.getQ().firstElement(), + updateOpEntry.getU().getUpdateReplacement())); + } + - return preimage; + return {write_ops::UpdateCommandReply(), preimage}; } write_ops::FindAndModifyCommandReply FLEQueryTestImpl::findAndModify( @@ -286,6 +296,7 @@ protected: void doSingleUpdate(int id, BSONElement element); void doSingleUpdate(int id, BSONObj obj); void doSingleUpdateWithUpdateDoc(int id, BSONObj update); + void doSingleUpdateWithUpdateDoc(int id, const write_ops::UpdateModification& modification); void doFindAndModify(write_ops::FindAndModifyCommandRequest& request); @@ -610,6 +621,14 @@ void FleCrudTest::doSingleUpdate(int id, BSONElement element) { } void FleCrudTest::doSingleUpdateWithUpdateDoc(int id, BSONObj update) { + doSingleUpdateWithUpdateDoc( + id, + write_ops::UpdateModification(update, write_ops::UpdateModification::ClassicTag{}, false)); +} + +void FleCrudTest::doSingleUpdateWithUpdateDoc(int id, + const write_ops::UpdateModification& modification) { + auto efc = getTestEncryptedFieldConfig(); auto doc = EncryptionInformationHelpers::encryptionInformationSerializeForDelete( _edcNs, efc, &_keyVault); @@ -617,8 +636,7 @@ void FleCrudTest::doSingleUpdateWithUpdateDoc(int id, BSONObj update) { write_ops::UpdateOpEntry entry; entry.setQ(BSON("_id" << id)); - entry.setU( - write_ops::UpdateModification(update, write_ops::UpdateModification::ClassicTag{}, false)); + entry.setU(modification); write_ops::UpdateCommandRequest updateRequest(_edcNs); updateRequest.setUpdates({entry}); @@ -950,6 +968,42 @@ TEST_F(FleCrudTest, UpdateOneSameValue) { << "secret")); } + +// Update one document with replacement +TEST_F(FleCrudTest, UpdateOneReplace) { + + doSingleInsert(1, + BSON("encrypted" + << "secret")); + + assertDocumentCounts(1, 1, 0, 1); + + auto replace = BSON("encrypted" + << "top secret"); + + auto buf = generateSinglePlaceholder(replace.firstElement()); + + auto replaceEP = BSON("plainText" + << "fake" + << "encrypted" + << BSONBinData(buf.data(), buf.size(), BinDataType::Encrypt)); + + auto result = FLEClientCrypto::generateInsertOrUpdateFromPlaceholders(replaceEP, &_keyVault); + + doSingleUpdateWithUpdateDoc( + 1, + write_ops::UpdateModification(result, write_ops::UpdateModification::ClassicTag{}, true)); + + + assertDocumentCounts(1, 2, 1, 3); + + validateDocument(1, + BSON("_id" << 1 << "plainText" + << "fake" + << "encrypted" + << "top secret")); +} + // Rename safeContent TEST_F(FleCrudTest, RenameSafeContent) { |