diff options
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context_for_test.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_test.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/pipeline/runtime_constants.idl | 50 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/variables.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/pipeline/variables.h | 29 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 19 |
13 files changed, 243 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index d8cd96c02bc..e9183efb0eb 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -595,6 +595,7 @@ env.Library( env.Idlc('document_source_list_sessions.idl')[0], env.Idlc('document_source_out.idl')[0], env.Idlc('exchange_spec.idl')[0], + env.Idlc('runtime_constants.idl')[0], 'resume_token.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index ce2dc95da88..2357693d4f0 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -239,6 +239,13 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( WriteConcernOptions writeConcern; uassertStatusOK(writeConcern.parse(elem.embeddedObject())); request.setWriteConcern(writeConcern); + } else if (kRuntimeConstants == fieldName) { + try { + IDLParserErrorContext ctx("internalRuntimeConstants"); + request.setRuntimeConstants(RuntimeConstants::parse(ctx, elem.Obj())); + } catch (const DBException& ex) { + return ex.toStatus(); + } } else if (!isGenericArgument(fieldName)) { return {ErrorCodes::FailedToParse, str::stream() << "unrecognized field '" << elem.fieldName() << "'"}; @@ -342,6 +349,8 @@ Document AggregationRequest::serializeToCommandObj() const { {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()}, {WriteConcernOptions::kWriteConcernField, _writeConcern ? Value(_writeConcern->toBSON()) : Value()}, + // Only serialize runtime constants if any were specified. + {kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()}, }; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index d58bf8d2b09..fc146ef4324 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -36,6 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/exchange_spec_gen.h" +#include "mongo/db/pipeline/runtime_constants_gen.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/write_concern_options.h" @@ -63,6 +64,7 @@ public: static constexpr StringData kHintName = "hint"_sd; static constexpr StringData kCommentName = "comment"_sd; static constexpr StringData kExchangeName = "exchange"_sd; + static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd; static constexpr long long kDefaultBatchSize = 101; @@ -216,6 +218,10 @@ public: return _writeConcern; } + const auto& getRuntimeConstants() const { + return _runtimeConstants; + } + // // Setters for optional fields. // @@ -284,6 +290,10 @@ public: _writeConcern = writeConcern; } + void setRuntimeConstants(RuntimeConstants runtimeConstants) { + _runtimeConstants = std::move(runtimeConstants); + } + private: // Required fields. const NamespaceString _nss; @@ -333,5 +343,9 @@ private: // The explicit writeConcern for the operation or boost::none if the user did not specifiy one. boost::optional<WriteConcernOptions> _writeConcern; + + // A document containing runtime constants; i.e. values that do not change once computed (e.g. + // $$NOW). + boost::optional<RuntimeConstants> _runtimeConstants; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 508f51788c2..21651f6f2b8 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -62,6 +62,11 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, _ownedCollator = std::move(collator); _resolvedNamespaces = std::move(resolvedNamespaces); uuid = std::move(collUUID); + if (request.getRuntimeConstants()) { + variables.setRuntimeConstants(request.getRuntimeConstants().get()); + } else { + variables.generateRuntimeConstants(opCtx); + } } ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInterface* collator) @@ -170,6 +175,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( expCtx->_resolvedNamespaces = _resolvedNamespaces; + expCtx->variables = variables; + expCtx->variablesParseState = variablesParseState.copyWith(expCtx->variables.useIdGenerator()); + // Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is // intended to be used for executing a separate aggregation pipeline. diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 156d58854d1..e3e9e76b2f3 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -196,6 +196,9 @@ public: _resolvedNamespaces[nss.coll()] = std::move(resolvedNamespace); } + auto getRuntimeConstants() const { + return variables.getRuntimeConstants(); + } // The explain verbosity requested by the user, or boost::none if no explain was requested. boost::optional<ExplainOptions::Verbosity> explain; diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index 713e6cfe512..5438937133e 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -60,6 +60,8 @@ public: // initialize with a nullptr and set post-construction. timeZoneDatabase = TimeZoneDatabase::get(_serviceContext.getServiceContext()); opCtx = _testOpCtx.get(); + RuntimeConstants constants(Date_t::now(), Timestamp()); + variables.setRuntimeConstants(constants); } ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request) diff --git a/src/mongo/db/pipeline/expression_test.cpp b/src/mongo/db/pipeline/expression_test.cpp index a5077b0b952..d0ee16fc2fa 100644 --- a/src/mongo/db/pipeline/expression_test.cpp +++ b/src/mongo/db/pipeline/expression_test.cpp @@ -6268,4 +6268,42 @@ public: SuiteInstance<All> myall; +namespace NowAndClusterTime { +TEST(NowAndClusterTime, BasicTest) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + + // $$NOW is the Date type. + { + auto expression = ExpressionFieldPath::parse(expCtx, "$$NOW", expCtx->variablesParseState); + Value result = expression->evaluate(Document()); + ASSERT_EQ(result.getType(), Date); + } + // $$CLUSTER_TIME is the timestamp type. + { + auto expression = + ExpressionFieldPath::parse(expCtx, "$$CLUSTER_TIME", expCtx->variablesParseState); + Value result = expression->evaluate(Document()); + ASSERT_EQ(result.getType(), bsonTimestamp); + } + + // Multiple references to $$NOW must return the same value. + { + auto expression = Expression::parseExpression( + expCtx, fromjson("{$eq: [\"$$NOW\", \"$$NOW\"]}"), expCtx->variablesParseState); + Value result = expression->evaluate(Document()); + + ASSERT_VALUE_EQ(result, Value{true}); + } + // Same is true for the $$CLUSTER_TIME. + { + auto expression = + Expression::parseExpression(expCtx, + fromjson("{$eq: [\"$$CLUSTER_TIME\", \"$$CLUSTER_TIME\"]}"), + expCtx->variablesParseState); + Value result = expression->evaluate(Document()); + + ASSERT_VALUE_EQ(result, Value{true}); + } +} +} } // namespace ExpressionTests diff --git a/src/mongo/db/pipeline/runtime_constants.idl b/src/mongo/db/pipeline/runtime_constants.idl new file mode 100644 index 00000000000..4a68db64c75 --- /dev/null +++ b/src/mongo/db/pipeline/runtime_constants.idl @@ -0,0 +1,50 @@ +# Copyright (C) 2018-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Runtime constants IDL file + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + RuntimeConstants: + description: A collection of values that do not change once computed. It is shipped as part + of an aggregation request so all shards participating in a query see the same + values. + fields: + localNow: + cpp_name: localNow + type: date + description: A value of the $$NOW variable. + clusterTime: + cpp_name: clusterTime + type: timestamp + description: A value of the $$CLUSTER_TIME variable. diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 3f87f138e78..9c84317c8d9 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -75,7 +75,7 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, - const boost::optional<ShardId>& shardId, + const boost::optional<RuntimeConstants>& constants, Pipeline* pipeline, BSONObj collationObj) { // Create the command for the shards. @@ -84,14 +84,19 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx, targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); } - return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj); + return genericTransformForShards( + std::move(targetedCmd), opCtx, request, constants, collationObj); } BSONObj genericTransformForShards(MutableDocument&& cmdForShards, OperationContext* opCtx, - const boost::optional<ShardId>& shardId, const AggregationRequest& request, + const boost::optional<RuntimeConstants>& constants, BSONObj collationObj) { + if (constants) { + cmdForShards[AggregationRequest::kRuntimeConstants] = Value(constants.get().toBSON()); + } + cmdForShards[AggregationRequest::kFromMongosName] = Value(true); // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. @@ -158,8 +163,8 @@ BSONObj createCommandForTargetedShards( const cluster_aggregation_planner::SplitPipeline& splitPipeline, const BSONObj collationObj, const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + const boost::optional<RuntimeConstants>& constants, bool needsMerge) { - // Create the command for the shards. MutableDocument targetedCmd(request.serializeToCommandObj()); // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it @@ -196,7 +201,7 @@ BSONObj createCommandForTargetedShards( exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); return genericTransformForShards( - std::move(targetedCmd), opCtx, boost::none, request, collationObj); + std::move(targetedCmd), opCtx, request, constants, collationObj); } /** @@ -270,10 +275,16 @@ DispatchShardPipelineResults dispatchShardPipeline( // Generate the command object for the targeted shards. BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards( - opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true) + ? createCommandForTargetedShards(opCtx, + aggRequest, + litePipe, + *splitPipeline, + collationObj, + exchangeSpec, + expCtx->getRuntimeConstants(), + true) : createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, pipeline.get(), collationObj); + opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index f1dc2c2d91e..861e1acb84c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -110,18 +110,19 @@ BSONObj createCommandForTargetedShards( const cluster_aggregation_planner::SplitPipeline& splitPipeline, const BSONObj collationObj, const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + const boost::optional<RuntimeConstants>& constants, bool needsMerge); BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, - const boost::optional<ShardId>& shardId, + const boost::optional<RuntimeConstants>& constants, Pipeline* pipeline, BSONObj collationObj); BSONObj genericTransformForShards(MutableDocument&& cmdForShards, OperationContext* opCtx, - const boost::optional<ShardId>& shardId, const AggregationRequest& request, + const boost::optional<RuntimeConstants>& constants, BSONObj collationObj); /** diff --git a/src/mongo/db/pipeline/variables.cpp b/src/mongo/db/pipeline/variables.cpp index 13f2344e311..82a62224784 100644 --- a/src/mongo/db/pipeline/variables.cpp +++ b/src/mongo/db/pipeline/variables.cpp @@ -27,18 +27,21 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/variables.h" +#include "mongo/db/client.h" +#include "mongo/db/logical_clock.h" +#include "mongo/platform/basic.h" +#include "mongo/platform/random.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/time_support.h" namespace mongo { constexpr Variables::Id Variables::kRootId; constexpr Variables::Id Variables::kRemoveId; -const StringMap<Variables::Id> Variables::kBuiltinVarNameToId = {{"ROOT", kRootId}, - {"REMOVE", kRemoveId}}; +const StringMap<Variables::Id> Variables::kBuiltinVarNameToId = { + {"ROOT", kRootId}, {"REMOVE", kRemoveId}, {"NOW", kNowId}, {"CLUSTER_TIME", kClusterTimeId}}; void Variables::uassertValidNameForUserWrite(StringData varName) { // System variables users allowed to write to (currently just one) @@ -141,6 +144,16 @@ Value Variables::getValue(Id id, const Document& root) const { return Value(root); case Variables::kRemoveId: return Value(); + case Variables::kNowId: + case Variables::kClusterTimeId: + if (auto it = _runtimeConstants.find(id); it != _runtimeConstants.end()) { + return it->second; + } + + uasserted(51144, + str::stream() << "Buildin variable $$" << getBuiltinVariableName(id) + << " is not available."); + MONGO_UNREACHABLE; default: MONGO_UNREACHABLE; } @@ -162,6 +175,40 @@ Document Variables::getDocument(Id id, const Document& root) const { return Document(); } +RuntimeConstants Variables::getRuntimeConstants() const { + RuntimeConstants constants; + + if (auto it = _runtimeConstants.find(kNowId); it != _runtimeConstants.end()) { + constants.setLocalNow(it->second.getDate()); + } + if (auto it = _runtimeConstants.find(kClusterTimeId); it != _runtimeConstants.end()) { + constants.setClusterTime(it->second.getTimestamp()); + } + + return constants; +} + +void Variables::setRuntimeConstants(const RuntimeConstants& constants) { + _runtimeConstants[kNowId] = Value(constants.getLocalNow()); + _runtimeConstants[kClusterTimeId] = Value(constants.getClusterTime()); +} + +void Variables::generateRuntimeConstants(OperationContext* opCtx) { + _runtimeConstants[kNowId] = Value(Date_t::now()); + + if (opCtx->getClient()) { + if (auto logicalClock = LogicalClock::get(opCtx); logicalClock) { + auto clusterTime = logicalClock->getClusterTime(); + + // On a standalone mongod the logical clock may not be running and $$CLUSTER_TIME is not + // available. + if (clusterTime != LogicalTime::kUninitialized) { + _runtimeConstants[kClusterTimeId] = Value(clusterTime.asTimestamp()); + } + } + } +} + Variables::Id VariablesParseState::defineVariable(StringData name) { // Caller should have validated before hand by using Variables::uassertValidNameForUserWrite. massert(17275, diff --git a/src/mongo/db/pipeline/variables.h b/src/mongo/db/pipeline/variables.h index c681e9ace14..22a0e88526f 100644 --- a/src/mongo/db/pipeline/variables.h +++ b/src/mongo/db/pipeline/variables.h @@ -29,7 +29,9 @@ #pragma once +#include "mongo/db/operation_context.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/runtime_constants_gen.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/string_map.h" @@ -73,6 +75,8 @@ public: // Ids for builtin variables. static constexpr Variables::Id kRootId = Id(-1); static constexpr Variables::Id kRemoveId = Id(-2); + static constexpr Variables::Id kNowId = Id(-3); + static constexpr Variables::Id kClusterTimeId = Id(-4); // Map from builtin var name to reserved id number. static const StringMap<Id> kBuiltinVarNameToId; @@ -119,6 +123,21 @@ public: return &_idGenerator; } + /** + * Serializes runtime constants. This is used to send the constants to shards. + */ + RuntimeConstants getRuntimeConstants() const; + + /** + * Deserialize runtime constants. + */ + void setRuntimeConstants(const RuntimeConstants& constants); + + /** + * Generate values that must be constant during the execution. + */ + void generateRuntimeConstants(OperationContext* opCtx); + private: struct ValueAndState { ValueAndState() = default; @@ -131,8 +150,18 @@ private: void setValue(Id id, const Value& value, bool isConstant); + static auto getBuiltinVariableName(Variables::Id variable) { + for (auto & [ name, id ] : kBuiltinVarNameToId) { + if (variable == id) { + return name; + } + } + return std::string(); + } + IdGenerator _idGenerator; std::vector<ValueAndState> _valueList; + stdx::unordered_map<Id, Value> _runtimeConstants; }; /** diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index e683ae162de..9149ae33530 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -113,6 +113,9 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); mergeCmd[AggregationRequest::kFromMongosName] = Value(true); + mergeCmd[AggregationRequest::kRuntimeConstants] = + Value(mergeCtx->getRuntimeConstants().toBSON()); + // If the user didn't specify a collation already, make sure there's a collation attached to // the merge command, since the merging shard may not have the collection metadata. if (mergeCmd.peek()["collation"].missing()) { @@ -178,8 +181,15 @@ sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeli consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards( - opCtx, request, litePipe, consumerPipelines.back(), collationObj, boost::none, false); + auto consumerCmdObj = + sharded_agg_helpers::createCommandForTargetedShards(opCtx, + request, + litePipe, + consumerPipelines.back(), + collationObj, + boost::none, + expCtx->getRuntimeConstants(), + false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], consumerCmdObj); @@ -704,6 +714,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, const PrivilegeVector& privileges, BSONObjBuilder* result) { uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec()); + uassert(51143, + "Cannot specify runtime constants option to a mongos", + !request.getRuntimeConstants()); uassert(51089, str::stream() << "Internal parameter(s) [" << AggregationRequest::kNeedsMergeName << ", " @@ -870,7 +883,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // explain if necessary, and rewrites the result into a format safe to forward to shards. BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough( sharded_agg_helpers::createPassthroughCommandForShard( - opCtx, aggRequest, shardId, nullptr, BSONObj())); + opCtx, aggRequest, boost::none, nullptr, BSONObj())); MultiStatementTransactionRequestsSender ars( opCtx, |