summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2022-03-17 22:38:48 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-22 04:03:55 +0000
commit3c97f5a618d359181c83c3d1d5018c581bb5cbff (patch)
tree306a44326e0c7267879f0c3fffc3ea5fe6899cee
parentbb0a88a3a1cc2d97935cd4adb6e3733604e1623c (diff)
downloadmongo-3c97f5a618d359181c83c3d1d5018c581bb5cbff.tar.gz
SERVER-64399 Support replacement style update and upsert
-rw-r--r--src/mongo/db/fle_crud.cpp180
-rw-r--r--src/mongo/db/fle_crud.h11
-rw-r--r--src/mongo/db/fle_crud_test.cpp80
3 files changed, 209 insertions, 62 deletions
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index ad72f827dc3..2420308ccfa 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -75,9 +75,10 @@ public:
const EncryptionInformation& ei,
const write_ops::DeleteCommandRequest& deleteRequest) final;
- BSONObj updateWithPreimage(const NamespaceString& nss,
- const EncryptionInformation& ei,
- const write_ops::UpdateCommandRequest& updateRequest) final;
+ std::pair<write_ops::UpdateCommandReply, BSONObj> updateWithPreimage(
+ const NamespaceString& nss,
+ const EncryptionInformation& ei,
+ const write_ops::UpdateCommandRequest& updateRequest) final;
write_ops::FindAndModifyCommandReply findAndModify(
const NamespaceString& nss,
@@ -296,15 +297,14 @@ boost::optional<BSONObj> mergeLetAndCVariables(const boost::optional<BSONObj>& l
} else if (let.has_value() && c.has_value()) {
BSONObj obj = let.value();
// Prioritize the fields in c over the fields in let in case of duplicates
- obj.addFields(c.value());
- return {obj};
+ return obj.addFields(c.value());
} else if (let.has_value()) {
return let;
}
return c;
}
-BSONObj FLEQueryInterfaceImpl::updateWithPreimage(
+std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryInterfaceImpl::updateWithPreimage(
const NamespaceString& nss,
const EncryptionInformation& ei,
const write_ops::UpdateCommandRequest& updateRequest) {
@@ -314,7 +314,7 @@ BSONObj FLEQueryInterfaceImpl::updateWithPreimage(
findAndModifyRequest.setQuery(updateOpEntry.getQ());
findAndModifyRequest.setUpdate(updateOpEntry.getU());
findAndModifyRequest.setBatchSize(1);
- findAndModifyRequest.setUpsert(false);
+ findAndModifyRequest.setUpsert(updateOpEntry.getUpsert());
findAndModifyRequest.setSingleBatch(true);
findAndModifyRequest.setRemove(false);
findAndModifyRequest.setArrayFilters(updateOpEntry.getArrayFilters());
@@ -334,11 +334,32 @@ BSONObj FLEQueryInterfaceImpl::updateWithPreimage(
auto reply =
write_ops::FindAndModifyCommandReply::parse(IDLParserErrorContext("reply"), response);
- if (!reply.getValue().has_value()) {
- return BSONObj();
+ write_ops::UpdateCommandReply updateReply;
+
+ if (!status.isOK()) {
+ updateReply.getWriteCommandReplyBase().setN(0);
+ updateReply.getWriteCommandReplyBase().setWriteErrors(singleStatusToWriteErrors(status));
+ } else {
+ if (reply.getRetriedStmtId().has_value()) {
+ updateReply.getWriteCommandReplyBase().setRetriedStmtIds(
+ std::vector<std::int32_t>{reply.getRetriedStmtId().value()});
+ }
+ updateReply.getWriteCommandReplyBase().setN(reply.getLastErrorObject().getNumDocs());
+
+ if (reply.getLastErrorObject().getUpserted().has_value()) {
+ write_ops::Upserted upserted;
+ upserted.setIndex(0);
+ upserted.set_id(reply.getLastErrorObject().getUpserted().value());
+ updateReply.setUpserted(std::vector<mongo::write_ops::Upserted>{upserted});
+ }
+
+ if (reply.getLastErrorObject().getNumDocs() > 0) {
+ updateReply.setNModified(1);
+ updateReply.getWriteCommandReplyBase().setN(1);
+ }
}
- return reply.getValue().value();
+ return {updateReply, reply.getValue().value_or(BSONObj())};
}
write_ops::FindAndModifyCommandReply FLEQueryInterfaceImpl::findAndModify(
@@ -549,9 +570,10 @@ StatusWith<uint64_t> processDelete(OperationContext* opCtx,
return count;
}
-StatusWith<uint64_t> processUpdate(OperationContext* opCtx,
- const write_ops::UpdateCommandRequest& updateRequest,
- GetTxnCallback getTxns) {
+StatusWith<write_ops::UpdateCommandReply> processUpdate(
+ OperationContext* opCtx,
+ const write_ops::UpdateCommandRequest& updateRequest,
+ std::function<std::shared_ptr<txn_api::TransactionWithRetries>(OperationContext*)> getTxns) {
auto updates = updateRequest.getUpdates();
uassert(6371502, "Only single document updates are permitted", updates.size() == 1);
@@ -571,8 +593,8 @@ StatusWith<uint64_t> processUpdate(OperationContext* opCtx,
// The function that handles the transaction may outlive this function so we need to use
// shared_ptrs
- uint64_t count = 0;
- auto updateBlock = std::tie(updateRequest, count);
+ write_ops::UpdateCommandReply reply;
+ auto updateBlock = std::tie(updateRequest, reply);
auto sharedupdateBlock = std::make_shared<decltype(updateBlock)>(updateBlock);
auto swResult = runInTxnWithRetry(
@@ -581,9 +603,9 @@ StatusWith<uint64_t> processUpdate(OperationContext* opCtx,
[sharedupdateBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
FLEQueryInterfaceImpl queryImpl(txnClient);
- auto [updateRequest2, count] = *sharedupdateBlock.get();
+ auto [updateRequest2, reply] = *sharedupdateBlock.get();
- count = processUpdate(&queryImpl, updateRequest2);
+ reply = processUpdate(&queryImpl, updateRequest2);
return SemiFuture<void>::makeReady();
});
@@ -592,7 +614,7 @@ StatusWith<uint64_t> processUpdate(OperationContext* opCtx,
return swResult.getStatus();
}
- return count;
+ return reply;
}
namespace {
@@ -781,6 +803,14 @@ StatusWith<write_ops::FindAndModifyCommandReply> processFindAndModifyRequest(
"findAndModify fields must be empty",
findAndModifyRequest.getFields().value_or(BSONObj()).isEmpty());
+ // pipeline - is agg specific, delta is oplog, transform is internal (timeseries)
+ auto updateModicationType =
+ findAndModifyRequest.getUpdate().value_or(write_ops::UpdateModification()).type();
+ uassert(6439901,
+ "FLE only supports modifier and replacement style updates",
+ updateModicationType == write_ops::UpdateModification::Type::kModifier ||
+ updateModicationType == write_ops::UpdateModification::Type::kReplacement);
+
std::shared_ptr<txn_api::TransactionWithRetries> trun = getTxns(opCtx);
// The function that handles the transaction may outlive this function so we need to use
@@ -865,39 +895,63 @@ uint64_t processDelete(FLEQueryInterface* queryImpl,
* 5. Find the removed fields and update ECC
* 6. Remove the stale tags from the original document with a new push
*/
-uint64_t processUpdate(FLEQueryInterface* queryImpl,
- const write_ops::UpdateCommandRequest& updateRequest) {
+write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
+ const write_ops::UpdateCommandRequest& updateRequest) {
auto edcNss = updateRequest.getNamespace();
auto ei = updateRequest.getEncryptionInformation().get();
auto efc = EncryptionInformationHelpers::getAndValidateSchema(edcNss, ei);
auto tokenMap = EncryptionInformationHelpers::getDeleteTokens(edcNss, ei);
+ const auto updateOpEntry = updateRequest.getUpdates()[0];
- auto updateOpEntry = updateRequest.getUpdates()[0];
- auto updateModifier = updateOpEntry.getU().getUpdateModifier();
+ const auto updateModification = updateOpEntry.getU();
// Step 1 ----
- auto serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier);
+ std::vector<EDCServerPayloadInfo> serverPayload;
+ auto newUpdateOpEntry = updateRequest.getUpdates()[0];
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ if (updateModification.type() == write_ops::UpdateModification::Type::kModifier) {
+ auto updateModifier = updateModification.getUpdateModifier();
+ serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier);
- // Step 2 ----
- auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
- // Step 3 ----
- auto newUpdateOpEntry = updateRequest.getUpdates()[0];
+ // Step 2 ----
+ auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload);
+
+ newUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
+ pushUpdate, write_ops::UpdateModification::ClassicTag(), false));
- newUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
- pushUpdate, write_ops::UpdateModification::ClassicTag(), false));
+ } else {
+ auto replacementDocument = updateModification.getUpdateReplacement();
+ serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument);
+
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+
+ // Step 2 ----
+ auto safeContentReplace =
+ EDCServerCollection::finalizeForInsert(replacementDocument, serverPayload);
+
+ newUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
+ safeContentReplace, write_ops::UpdateModification::ClassicTag(), true));
+ }
+
+ // Step 3 ----
auto newUpdateRequest = updateRequest;
newUpdateRequest.setUpdates({newUpdateOpEntry});
// TODO - use this update for retryable writes
- BSONObj originalDocument = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
+ auto [updateReply, originalDocument] =
+ queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
if (originalDocument.isEmpty()) {
// if there is no preimage, then we did not update any documents, we are done
- return 0;
+ return updateReply;
+ }
+
+ // If there are errors, we are done
+ if (updateReply.getWriteErrors().has_value() && !updateReply.getWriteErrors().value().empty()) {
+ return updateReply;
}
// Step 4 ----
@@ -929,9 +983,9 @@ uint64_t processUpdate(FLEQueryInterface* queryImpl,
pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
pullUpdate, write_ops::UpdateModification::ClassicTag(), false));
newUpdateRequest.setUpdates({pullUpdateOpEntry});
- BSONObj finalCorrectDocument = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
+ /* ignore */ queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
- return 1;
+ return updateReply;
}
@@ -985,8 +1039,24 @@ FLEBatchResult processFLEBatch(OperationContext* opCtx,
response->setNModified(0);
} else {
response->setStatus(Status::OK());
- response->setN(swResult.getValue());
- response->setNModified(swResult.getValue());
+ auto updateReply = swResult.getValue();
+ response->setN(updateReply.getN());
+ response->setNModified(updateReply.getNModified());
+
+ if (updateReply.getUpserted().has_value() &&
+ updateReply.getUpserted().value().size() > 0) {
+
+ auto upsertReply = updateReply.getUpserted().value()[0];
+
+ BatchedUpsertDetail upsert;
+ upsert.setIndex(upsertReply.getIndex());
+ upsert.setUpsertedID(upsertReply.get_id().getElement().wrap(""));
+
+ std::vector<BatchedUpsertDetail*> upserts;
+ upserts.push_back(&upsert);
+
+ response->setUpsertDetails(upserts);
+ }
}
return FLEBatchResult::kProcessed;
@@ -1010,17 +1080,37 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
// If we have an update object, we have to process for ESC
auto newFindAndModifyRequest = findAndModifyRequest;
if (findAndModifyRequest.getUpdate().has_value()) {
- auto updateModifier = findAndModifyRequest.getUpdate().value().getUpdateModifier();
- auto serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier);
- processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+ std::vector<EDCServerPayloadInfo> serverPayload;
+ const auto updateModification = findAndModifyRequest.getUpdate().value();
+ write_ops::UpdateModification newUpdateModification;
- // Step 2 ----
- auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload);
+ if (updateModification.type() == write_ops::UpdateModification::Type::kModifier) {
+ auto updateModifier = updateModification.getUpdateModifier();
+ serverPayload = EDCServerCollection::getEncryptedFieldInfo(updateModifier);
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+
+ auto pushUpdate = EDCServerCollection::finalizeForUpdate(updateModifier, serverPayload);
+
+ // Step 2 ----
+ newUpdateModification = write_ops::UpdateModification(
+ pushUpdate, write_ops::UpdateModification::ClassicTag(), false);
+ } else {
+ auto replacementDocument = updateModification.getUpdateReplacement();
+ serverPayload = EDCServerCollection::getEncryptedFieldInfo(replacementDocument);
+
+ processFieldsForInsert(queryImpl, edcNss, serverPayload, efc);
+
+ // Step 2 ----
+ auto safeContentReplace =
+ EDCServerCollection::finalizeForInsert(replacementDocument, serverPayload);
+
+ newUpdateModification = write_ops::UpdateModification(
+ safeContentReplace, write_ops::UpdateModification::ClassicTag(), true);
+ }
// Step 3 ----
- newFindAndModifyRequest.setUpdate(mongo::write_ops::UpdateModification(
- pushUpdate, write_ops::UpdateModification::ClassicTag(), false));
+ newFindAndModifyRequest.setUpdate(newUpdateModification);
}
// TODO - use this update for retryable writes
@@ -1078,7 +1168,9 @@ write_ops::FindAndModifyCommandReply processFindAndModify(
pullUpdateOpEntry.setU(mongo::write_ops::UpdateModification(
pullUpdate, write_ops::UpdateModification::ClassicTag(), false));
newUpdateRequest.setUpdates({pullUpdateOpEntry});
- BSONObj finalCorrectDocument = queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
+ auto [finalUpdateReply, finalCorrectDocument] =
+ queryImpl->updateWithPreimage(edcNss, ei, newUpdateRequest);
+ checkWriteErrors(finalUpdateReply);
}
return reply;
diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h
index f49baacbf36..b26e76703ae 100644
--- a/src/mongo/db/fle_crud.h
+++ b/src/mongo/db/fle_crud.h
@@ -142,9 +142,10 @@ public:
* Returns the pre-image of the updated document. If no documents were updated, returns an empty
* BSON object.
*/
- virtual BSONObj updateWithPreimage(const NamespaceString& nss,
- const EncryptionInformation& ei,
- const write_ops::UpdateCommandRequest& updateRequest) = 0;
+ virtual std::pair<write_ops::UpdateCommandReply, BSONObj> updateWithPreimage(
+ const NamespaceString& nss,
+ const EncryptionInformation& ei,
+ const write_ops::UpdateCommandRequest& updateRequest) = 0;
/**
* Do a single findAndModify request.
@@ -182,8 +183,8 @@ uint64_t processDelete(FLEQueryInterface* queryImpl,
*
* Used by unit tests.
*/
-uint64_t processUpdate(FLEQueryInterface* queryImpl,
- const write_ops::UpdateCommandRequest& updateRequest);
+write_ops::UpdateCommandReply processUpdate(FLEQueryInterface* queryImpl,
+ const write_ops::UpdateCommandRequest& updateRequest);
/**
* Callback function to get a TransactionWithRetries with the appropiate Executor
diff --git a/src/mongo/db/fle_crud_test.cpp b/src/mongo/db/fle_crud_test.cpp
index 6477037e99e..743a166cce9 100644
--- a/src/mongo/db/fle_crud_test.cpp
+++ b/src/mongo/db/fle_crud_test.cpp
@@ -93,9 +93,10 @@ public:
const EncryptionInformation& ei,
const write_ops::DeleteCommandRequest& deleteRequest) final;
- BSONObj updateWithPreimage(const NamespaceString& nss,
- const EncryptionInformation& ei,
- const write_ops::UpdateCommandRequest& updateRequest) final;
+ std::pair<write_ops::UpdateCommandReply, BSONObj> updateWithPreimage(
+ const NamespaceString& nss,
+ const EncryptionInformation& ei,
+ const write_ops::UpdateCommandRequest& updateRequest) final;
write_ops::FindAndModifyCommandReply findAndModify(
const NamespaceString& nss,
@@ -154,9 +155,10 @@ BSONObj FLEQueryTestImpl::deleteWithPreimage(const NamespaceString& nss,
return uassertStatusOK(swDoc);
}
-BSONObj FLEQueryTestImpl::updateWithPreimage(const NamespaceString& nss,
- const EncryptionInformation& ei,
- const write_ops::UpdateCommandRequest& updateRequest) {
+std::pair<write_ops::UpdateCommandReply, BSONObj> FLEQueryTestImpl::updateWithPreimage(
+ const NamespaceString& nss,
+ const EncryptionInformation& ei,
+ const write_ops::UpdateCommandRequest& updateRequest) {
// A limit of the API, we can delete by _id and get the pre-image so we limit our unittests to
// this
ASSERT_EQ(updateRequest.getUpdates().size(), 1);
@@ -165,12 +167,20 @@ BSONObj FLEQueryTestImpl::updateWithPreimage(const NamespaceString& nss,
BSONObj preimage = getById(nss, updateOpEntry.getQ().firstElement());
- uassertStatusOK(_storage->upsertById(_opCtx,
- nss,
- updateOpEntry.getQ().firstElement(),
- updateOpEntry.getU().getUpdateModifier()));
+ if (updateOpEntry.getU().type() == write_ops::UpdateModification::Type::kModifier) {
+ uassertStatusOK(_storage->upsertById(_opCtx,
+ nss,
+ updateOpEntry.getQ().firstElement(),
+ updateOpEntry.getU().getUpdateModifier()));
+ } else {
+ uassertStatusOK(_storage->upsertById(_opCtx,
+ nss,
+ updateOpEntry.getQ().firstElement(),
+ updateOpEntry.getU().getUpdateReplacement()));
+ }
+
- return preimage;
+ return {write_ops::UpdateCommandReply(), preimage};
}
write_ops::FindAndModifyCommandReply FLEQueryTestImpl::findAndModify(
@@ -286,6 +296,7 @@ protected:
void doSingleUpdate(int id, BSONElement element);
void doSingleUpdate(int id, BSONObj obj);
void doSingleUpdateWithUpdateDoc(int id, BSONObj update);
+ void doSingleUpdateWithUpdateDoc(int id, const write_ops::UpdateModification& modification);
void doFindAndModify(write_ops::FindAndModifyCommandRequest& request);
@@ -610,6 +621,14 @@ void FleCrudTest::doSingleUpdate(int id, BSONElement element) {
}
void FleCrudTest::doSingleUpdateWithUpdateDoc(int id, BSONObj update) {
+ doSingleUpdateWithUpdateDoc(
+ id,
+ write_ops::UpdateModification(update, write_ops::UpdateModification::ClassicTag{}, false));
+}
+
+void FleCrudTest::doSingleUpdateWithUpdateDoc(int id,
+ const write_ops::UpdateModification& modification) {
+
auto efc = getTestEncryptedFieldConfig();
auto doc = EncryptionInformationHelpers::encryptionInformationSerializeForDelete(
_edcNs, efc, &_keyVault);
@@ -617,8 +636,7 @@ void FleCrudTest::doSingleUpdateWithUpdateDoc(int id, BSONObj update) {
write_ops::UpdateOpEntry entry;
entry.setQ(BSON("_id" << id));
- entry.setU(
- write_ops::UpdateModification(update, write_ops::UpdateModification::ClassicTag{}, false));
+ entry.setU(modification);
write_ops::UpdateCommandRequest updateRequest(_edcNs);
updateRequest.setUpdates({entry});
@@ -950,6 +968,42 @@ TEST_F(FleCrudTest, UpdateOneSameValue) {
<< "secret"));
}
+
+// Update one document with replacement
+TEST_F(FleCrudTest, UpdateOneReplace) {
+
+ doSingleInsert(1,
+ BSON("encrypted"
+ << "secret"));
+
+ assertDocumentCounts(1, 1, 0, 1);
+
+ auto replace = BSON("encrypted"
+ << "top secret");
+
+ auto buf = generateSinglePlaceholder(replace.firstElement());
+
+ auto replaceEP = BSON("plainText"
+ << "fake"
+ << "encrypted"
+ << BSONBinData(buf.data(), buf.size(), BinDataType::Encrypt));
+
+ auto result = FLEClientCrypto::generateInsertOrUpdateFromPlaceholders(replaceEP, &_keyVault);
+
+ doSingleUpdateWithUpdateDoc(
+ 1,
+ write_ops::UpdateModification(result, write_ops::UpdateModification::ClassicTag{}, true));
+
+
+ assertDocumentCounts(1, 2, 1, 3);
+
+ validateDocument(1,
+ BSON("_id" << 1 << "plainText"
+ << "fake"
+ << "encrypted"
+ << "top secret"));
+}
+
// Rename safeContent
TEST_F(FleCrudTest, RenameSafeContent) {