summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHana Pearlman <hana.pearlman@mongodb.com>2022-04-04 20:50:42 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-04 21:42:31 +0000
commit31c733015e41c04d903e6b807a699fbce84d22bb (patch)
treedd0670eac4661180f39f0b8924f89c1d7b1dfe7d
parent006cef15212cf443794d9f62c31a031fc8d87fb5 (diff)
downloadmongo-31c733015e41c04d903e6b807a699fbce84d22bb.tar.gz
SERVER-63312: Implement FLE server-side rewrite for agg on mongod
-rw-r--r--buildscripts/resmokeconfig/suites/fle2.yml2
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp4
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
-rw-r--r--src/mongo/db/commands/run_aggregate.h4
-rw-r--r--src/mongo/db/fle_crud.cpp8
-rw-r--r--src/mongo/db/fle_crud.h19
-rw-r--r--src/mongo/db/fle_crud_mongod.cpp9
-rw-r--r--src/mongo/db/query/fle/server_rewrite.cpp11
-rw-r--r--src/mongo/db/query/fle/server_rewrite.h3
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp8
11 files changed, 62 insertions, 21 deletions
diff --git a/buildscripts/resmokeconfig/suites/fle2.yml b/buildscripts/resmokeconfig/suites/fle2.yml
index afce3dd15f9..6847e0121d2 100644
--- a/buildscripts/resmokeconfig/suites/fle2.yml
+++ b/buildscripts/resmokeconfig/suites/fle2.yml
@@ -3,8 +3,6 @@ selector:
roots:
- jstests/fle2/**/*.js
- src/mongo/db/modules/*/jstests/fle2/**/*.js
- exclude_files:
- - src/mongo/db/modules/*/jstests/fle2/query/aggregate*.js # TODO: SERVER-63312
executor:
archive:
hooks:
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 621d7ba3f65..3353b74fd91 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -869,6 +869,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/crypto/encrypted_field_config',
'$BUILD_DIR/mongo/db/ops/write_ops_parsers',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'transaction_api',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index c3772f5af11..eb5f74d4387 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -101,7 +101,7 @@ public:
public:
Invocation(Command* cmd,
const OpMsgRequest& request,
- const AggregateCommandRequest aggregationRequest,
+ AggregateCommandRequest aggregationRequest,
PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
@@ -182,7 +182,7 @@ public:
const OpMsgRequest& _request;
const std::string _dbName;
- const AggregateCommandRequest _aggregationRequest;
+ AggregateCommandRequest _aggregationRequest;
const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 51f8dc4ac77..3f79db4449a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/fle_crud.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
@@ -644,7 +645,7 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createLegacyEx
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
@@ -653,7 +654,7 @@ Status runAggregate(OperationContext* opCtx,
Status runAggregate(OperationContext* opCtx,
const NamespaceString& origNss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
@@ -915,6 +916,15 @@ Status runAggregate(OperationContext* opCtx,
}
}
+ // If the aggregate command supports encrypted collections, do rewrites of the pipeline to
+ // support querying against encrypted fields.
+ if (shouldDoFLERewrite(request)) {
+ // After this rewriting, the encryption info does not need to be kept around.
+ pipeline = processFLEPipelineD(
+ opCtx, nss, request.getEncryptionInformation().get(), std::move(pipeline));
+ request.setEncryptionInformation(boost::none);
+ }
+
pipeline->optimizePipeline();
constexpr bool alreadyOptimized = true;
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index ea7873dade6..bbfbf7da892 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -53,7 +53,7 @@ namespace mongo {
*/
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
@@ -64,7 +64,7 @@ Status runAggregate(OperationContext* opCtx,
*/
Status runAggregate(OperationContext* opCtx,
const NamespaceString& nss,
- const AggregateCommandRequest& request,
+ AggregateCommandRequest& request,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result);
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index 9031c6ced9c..4ae8c2fdbec 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -1248,4 +1248,12 @@ void processFLEFindS(OperationContext* opCtx, FindCommandRequest* findCommand) {
fle::processFindCommand(opCtx, findCommand, &getTransactionWithRetriesForMongoS);
}
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineS(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite) {
+ return fle::processPipeline(
+ opCtx, nss, encryptInfo, std::move(toRewrite), &getTransactionWithRetriesForMongoS);
+}
} // namespace mongo
diff --git a/src/mongo/db/fle_crud.h b/src/mongo/db/fle_crud.h
index 07732356432..54439d34273 100644
--- a/src/mongo/db/fle_crud.h
+++ b/src/mongo/db/fle_crud.h
@@ -39,6 +39,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/transaction_api.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -126,6 +127,24 @@ void processFLEFindS(OperationContext* opCtx, FindCommandRequest* findCommand);
void processFLEFindD(OperationContext* opCtx, FindCommandRequest* findCommand);
/**
+ * Process a pipeline from mongos.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineS(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite);
+
+/**
+ * Process a pipeline from a replica set.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineD(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite);
+
+/**
* Helper function to determine if an IDL object with encryption information should be rewritten.
*/
template <typename T>
diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp
index 8bc1cf6d7b5..1308c8cb136 100644
--- a/src/mongo/db/fle_crud_mongod.cpp
+++ b/src/mongo/db/fle_crud_mongod.cpp
@@ -39,6 +39,7 @@
#include "mongo/bson/bsontypes.h"
#include "mongo/crypto/encryption_fields_gen.h"
#include "mongo/crypto/fle_crypto.h"
+#include "mongo/db/fle_crud.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_parsers.h"
@@ -236,4 +237,12 @@ void processFLEFindD(OperationContext* opCtx, FindCommandRequest* findCommand) {
fle::processFindCommand(opCtx, findCommand, &getTransactionWithRetriesForMongoD);
}
+std::unique_ptr<Pipeline, PipelineDeleter> processFLEPipelineD(
+ OperationContext* opCtx,
+ NamespaceString nss,
+ const EncryptionInformation& encryptInfo,
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite) {
+ return fle::processPipeline(
+ opCtx, nss, encryptInfo, std::move(toRewrite), &getTransactionWithRetriesForMongoD);
+}
} // namespace mongo
diff --git a/src/mongo/db/query/fle/server_rewrite.cpp b/src/mongo/db/query/fle/server_rewrite.cpp
index 63e5e2fa87e..2c6480b44b5 100644
--- a/src/mongo/db/query/fle/server_rewrite.cpp
+++ b/src/mongo/db/query/fle/server_rewrite.cpp
@@ -224,16 +224,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> processPipeline(
OperationContext* opCtx,
NamespaceString nss,
const EncryptionInformation& encryptInfo,
- std::unique_ptr<Pipeline, PipelineDeleter> toRewrite) {
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite,
+ GetTxnCallback txn) {
auto sharedBlock = std::make_shared<PipelineRewrite>(nss, encryptInfo, std::move(toRewrite));
- doFLERewriteInTxn(opCtx, sharedBlock, [](auto* opCtx) {
- // TODO: SERVER-63312 pass in the right transaction callback for mongod.
- return std::make_shared<txn_api::TransactionWithRetries>(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::makeForLocalHandoff());
- });
+ doFLERewriteInTxn(opCtx, sharedBlock, txn);
return sharedBlock->getPipeline();
}
diff --git a/src/mongo/db/query/fle/server_rewrite.h b/src/mongo/db/query/fle/server_rewrite.h
index 8d1da86009b..9a94e0a0aa8 100644
--- a/src/mongo/db/query/fle/server_rewrite.h
+++ b/src/mongo/db/query/fle/server_rewrite.h
@@ -62,7 +62,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> processPipeline(
OperationContext* opCtx,
NamespaceString nss,
const EncryptionInformation& encryptInfo,
- std::unique_ptr<Pipeline, PipelineDeleter> toRewrite);
+ std::unique_ptr<Pipeline, PipelineDeleter> toRewrite,
+ GetTxnCallback txn);
/**
* Rewrite a filter MatchExpression with FLE Find Payloads into a disjunction over the tag array
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 2034489bb49..4bcd150206b 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -365,10 +365,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// support querying against encrypted fields.
if (shouldDoFLERewrite) {
// After this rewriting, the encryption info does not need to be kept around.
- pipeline = fle::processPipeline(opCtx,
- namespaces.executionNss,
- request.getEncryptionInformation().get(),
- std::move(pipeline));
+ pipeline = processFLEPipelineS(opCtx,
+ namespaces.executionNss,
+ request.getEncryptionInformation().get(),
+ std::move(pipeline));
request.setEncryptionInformation(boost::none);
}