summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorHana Pearlman <hana.pearlman@mongodb.com>2022-04-04 20:50:42 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-04 21:42:31 +0000
commit31c733015e41c04d903e6b807a699fbce84d22bb (patch)
treedd0670eac4661180f39f0b8924f89c1d7b1dfe7d /src/mongo/db
parent006cef15212cf443794d9f62c31a031fc8d87fb5 (diff)
downloadmongo-31c733015e41c04d903e6b807a699fbce84d22bb.tar.gz
SERVER-63312: Implement FLE server-side rewrite for agg on mongod
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp4
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
-rw-r--r--src/mongo/db/commands/run_aggregate.h4
-rw-r--r--src/mongo/db/fle_crud.cpp8
-rw-r--r--src/mongo/db/fle_crud.h19
-rw-r--r--src/mongo/db/fle_crud_mongod.cpp9
-rw-r--r--src/mongo/db/query/fle/server_rewrite.cpp11
-rw-r--r--src/mongo/db/query/fle/server_rewrite.h3
9 files changed, 58 insertions, 15 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 621d7ba3f65..3353b74fd91 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -869,6 +869,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/crypto/encrypted_field_config',
'$BUILD_DIR/mongo/db/ops/write_ops_parsers',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'transaction_api',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index c3772f5af11..eb5f74d4387 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -101,7 +101,7 @@ public:
public:
Invocation(Command* cmd,
const OpMsgRequest& request,
- const AggregateCommandRequest aggregationRequest,
+ AggregateCommandRequest aggregationRequest,
PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
@@ -182,7 +182,7 @@ public:
const OpMsgRequest& _request;
const std::string _dbName;
- const AggregateCommandRequest _aggregationRequest;
+ AggregateCommandRequest _aggregationRequest;
const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 51f8dc4ac77..3f79db4449a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/fle_crud.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
@@ -644,7 +645,7 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createLegacyEx
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
@@ -653,7 +654,7 @@ Status runAggregate(OperationContext* opCtx,
Status runAggregate(OperationContext* opCtx,
const NamespaceString& origNss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
@@ -915,6 +916,15 @@ Status runAggregate(OperationContext* opCtx,
}
}
+ // If the aggregate command supports encrypted collections, do rewrites of the pipeline to
+ // support querying against encrypted fields.
+ if (shouldDoFLERewrite(request)) {
+ // After this rewriting, the encryption info does not need to be kept around.
+ pipeline = processFLEPipelineD(
+ opCtx, nss, request.getEncryptionInformation().get(), std::move(pipeline));
+ request.setEncryptionInformation(boost::none);
+ }
+
pipeline->optimizePipeline();
constexpr bool alreadyOptimized = true;
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index ea7873dade6..bbfbf7da892 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -53,7 +53,7 @@ namespace mongo {
*/
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
@@ -64,7 +64,7 @@ Status runAggregate(OperationContext* opCtx,
*/
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result);
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index 9031c6ced9c..4ae8c2fdbec 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -1248,4 +1248,12 @@ void processFLEFindS(OperationContext* opCtx, FindCommandRequest* findCommand) {
fle::processFindCommand(opCtx, findCommand, &getTransactionWithRetriesForMongoS);
}
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineS(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite) {
+ return fle::processPipeline(
+ opCtx, nss, encryptInfo, std::move(toRewrite), &getTransactionWithRetriesForMongoS);
+}
} // namespace mongo
diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h
index 07732356432..54439d34273 100644
--- a/src/mongo/db/fle_crud.h
+++ b/src/mongo/db/fle_crud.h
@@ -39,6 +39,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/transaction_api.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -126,6 +127,24 @@ void processFLEFindS(OperationContext* opCtx, FindCommandRequest* findCommand);
void processFLEFindD(OperationContext* opCtx, FindCommandRequest* findCommand);
/**
+ * Process a pipeline from mongos.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineS(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite);
+
+/**
+ * Process a pipeline from a replica set.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineD(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite);
+
+/**
* Helper function to determine if an IDL object with encryption information should be rewritten.
*/
template <typename T>
diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp
index 8bc1cf6d7b5..1308c8cb136 100644
--- a/src/mongo/db/fle_crud_mongod.cpp
+++ b/src/mongo/db/fle_crud_mongod.cpp
@@ -39,6 +39,7 @@
#include "mongo/bson/bsontypes.h"
#include "mongo/crypto/encryption_fields_gen.h"
#include "mongo/crypto/fle_crypto.h"
+#include "mongo/db/fle_crud.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_parsers.h"
@@ -236,4 +237,12 @@ void processFLEFindD(OperationContext* opCtx, FindCommandRequest* findCommand) {
fle::processFindCommand(opCtx, findCommand, &getTransactionWithRetriesForMongoD);
}
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineD(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite) {
+ return fle::processPipeline(
+ opCtx, nss, encryptInfo, std::move(toRewrite), &getTransactionWithRetriesForMongoD);
+}
} // namespace mongo
diff --git a/src/mongo/db/query/fle/server_rewrite.cpp b/src/mongo/db/query/fle/server_rewrite.cpp
index 63e5e2fa87e..2c6480b44b5 100644
--- a/src/mongo/db/query/fle/server_rewrite.cpp
+++ b/src/mongo/db/query/fle/server_rewrite.cpp
@@ -224,16 +224,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> processPipeline(
OperationContext* opCtx,
NamespaceString nss,
const EncryptionInformation& encryptInfo,
- std::unique_ptr<Pipeline, PipelineDeleter> toRewrite) {
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite,
+ GetTxnCallback txn) {
auto sharedBlock = std::make_shared<PipelineRewrite>(nss, encryptInfo, std::move(toRewrite));
- doFLERewriteInTxn(opCtx, sharedBlock, [](auto* opCtx) {
- // TODO: SERVER-63312 pass in the right transaction callback for mongod.
- return std::make_shared<txn_api::TransactionWithRetries>(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::makeForLocalHandoff());
- });
+ doFLERewriteInTxn(opCtx, sharedBlock, txn);
return sharedBlock->getPipeline();
}
diff --git a/src/mongo/db/query/fle/server_rewrite.h b/src/mongo/db/query/fle/server_rewrite.h
index 8d1da86009b..9a94e0a0aa8 100644
--- a/src/mongo/db/query/fle/server_rewrite.h
+++ b/src/mongo/db/query/fle/server_rewrite.h
@@ -62,7 +62,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> processPipeline(
OperationContext* opCtx,
NamespaceString nss,
const EncryptionInformation& encryptInfo,
- std::unique_ptr<Pipeline, PipelineDeleter> toRewrite);
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite,
+ GetTxnCallback txn);
/**
* Rewrite a filter MatchExpression with FLE Find Payloads into a disjunction over the tag array