diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-05-09 20:03:43 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-05-17 16:22:28 +0100 |
commit | c7f86d2fce5ad0145b57007f83584cf96d02d9d4 (patch) | |
tree | ae865183a5c1094c2aafbe65df9985ba8534181a /src | |
parent | 99585f23be46f97d2e8a962d00fef6ca6758390c (diff) | |
download | mongo-c7f86d2fce5ad0145b57007f83584cf96d02d9d4.tar.gz |
SERVER-40406 Add support for $$NOW and $$CLUSTER_TIME in the update command
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/ops/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/ops/parsed_update.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/ops/update_request.h | 16 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops.idl | 6 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context_for_test.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/variables.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/pipeline/variables.h | 9 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/write_ops/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_request.h | 15 |
16 files changed, 112 insertions, 24 deletions
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 9f6813be8bc..610f5bb7b2c 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -363,6 +363,8 @@ private: UpdateRequest updateRequest(_batch.getNamespace()); updateRequest.setQuery(_batch.getUpdates()[0].getQ()); updateRequest.setUpdateModification(_batch.getUpdates()[0].getU()); + updateRequest.setRuntimeConstants( + _batch.getRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx))); updateRequest.setCollation(write_ops::collationOf(_batch.getUpdates()[0])); updateRequest.setArrayFilters(write_ops::arrayFiltersOf(_batch.getUpdates()[0])); updateRequest.setMulti(_batch.getUpdates()[0].getMulti()); diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index 8f1df866c99..dff8b81dd47 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -36,6 +36,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/dbmessage', + '$BUILD_DIR/mongo/db/pipeline/runtime_constants_idl', '$BUILD_DIR/mongo/idl/idl_parser', ], ) diff --git a/src/mongo/db/ops/parsed_update.cpp b/src/mongo/db/ops/parsed_update.cpp index acedfc48d8a..d47d973c94b 100644 --- a/src/mongo/db/ops/parsed_update.cpp +++ b/src/mongo/db/ops/parsed_update.cpp @@ -44,7 +44,7 @@ ParsedUpdate::ParsedUpdate(OperationContext* opCtx, const ExtensionsCallback& extensionsCallback) : _opCtx(opCtx), _request(request), - _driver(new ExpressionContext(opCtx, nullptr)), + _driver(new ExpressionContext(opCtx, nullptr, _request->getRuntimeConstants())), _canonicalQuery(), _extensionsCallback(extensionsCallback) {} @@ -123,7 +123,8 @@ Status ParsedUpdate::parseQueryToCQ() { allowedMatcherFeatures &= ~MatchExpressionParser::AllowedFeatures::kExpr; } - boost::intrusive_ptr<ExpressionContext> expCtx; + auto expCtx = + make_intrusive<ExpressionContext>(_opCtx, _collator.get(), _request->getRuntimeConstants()); auto statusWithCQ = CanonicalQuery::canonicalize( _opCtx, std::move(qr), std::move(expCtx), _extensionsCallback, allowedMatcherFeatures); if (statusWithCQ.isOK()) { diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index 587fdb1b718..2e84c575f11 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -34,6 +34,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/db/pipeline/runtime_constants_gen.h" #include "mongo/db/query/explain.h" #include "mongo/util/str.h" @@ -110,6 +111,14 @@ public: return _updateMod; } + inline void setRuntimeConstants(const RuntimeConstants& runtimeConstants) { + _runtimeConstants = runtimeConstants; + } + + inline const boost::optional<RuntimeConstants>& getRuntimeConstants() const { + return _runtimeConstants; + } + inline void setArrayFilters(const std::vector<BSONObj>& arrayFilters) { _arrayFilters = arrayFilters; } @@ -210,6 +219,10 @@ public: builder << " updateModification: " << _updateMod.toString(); builder << " stmtId: " << _stmtId; + if (_runtimeConstants) { + builder << "runtimeConstants: " << _runtimeConstants->toBSON().toString(); + } + builder << " arrayFilters: ["; bool first = true; for (auto arrayFilter : _arrayFilters) { @@ -248,6 +261,9 @@ private: // Contains the modifiers to apply to matched objects, or a replacement document. write_ops::UpdateModification _updateMod; + // Contains any constant values which may be required by the query or update operation. + boost::optional<RuntimeConstants> _runtimeConstants; + // Filters to specify which array elements should be updated. std::vector<BSONObj> _arrayFilters; diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index c570b799b36..95ef7f5e71d 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -33,6 +33,7 @@ global: imports: - "mongo/db/logical_session_id.idl" + - "mongo/db/pipeline/runtime_constants.idl" types: multi_delete_bool: @@ -163,6 +164,11 @@ commands: description: "An array of one or more update statements to perform." type: array<UpdateOpEntry> supports_doc_sequence: true + runtimeConstants: + description: "A collection of values that do not change once computed. These are + used by pipeline-style update operations." + type: RuntimeConstants + optional: true delete: description: "Parser for the 'delete' command." diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 3bab66af9ae..acd90695b4f 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -685,7 +685,8 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext* opCtx, const NamespaceString& ns, StmtId stmtId, - const write_ops::UpdateOpEntry& op) { + const write_ops::UpdateOpEntry& op, + RuntimeConstants runtimeConstants) { globalOpCounters.gotUpdate(); ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate(opCtx->getWriteConcern()); auto& curOp = *CurOp::get(opCtx); @@ -707,6 +708,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext* UpdateRequest request(ns); request.setQuery(op.getQ()); request.setUpdateModification(op.getU()); + request.setRuntimeConstants(std::move(runtimeConstants)); request.setCollation(write_ops::collationOf(op)); request.setStmtId(stmtId); request.setArrayFilters(write_ops::arrayFiltersOf(op)); @@ -770,6 +772,11 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who WriteResult out; out.results.reserve(wholeOp.getUpdates().size()); + // If the update command specified runtime constants, we adopt them. Otherwise, we set them to + // the current local and cluster time. These constants are applied to each update in the batch. + const auto& runtimeConstants = + wholeOp.getRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx)); + for (auto&& singleOp : wholeOp.getUpdates()) { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { @@ -796,7 +803,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who try { lastOpFixer.startingOp(); out.results.emplace_back(performSingleUpdateOpWithDupKeyRetry( - opCtx, wholeOp.getNamespace(), stmtId, singleOp)); + opCtx, wholeOp.getNamespace(), stmtId, singleOp, runtimeConstants)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 3644881ca0d..24e70b27ada 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -594,6 +594,16 @@ env.CppUnitTest( ) env.Library( + target='runtime_constants_idl', + source=[ + env.Idlc('runtime_constants.idl')[0] + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/idl/idl_parser', + ], +) + +env.Library( target='document_sources_idl', source=[ env.Idlc('document_source_change_stream.idl')[0], @@ -603,7 +613,6 @@ env.Library( env.Idlc('document_source_out.idl')[0], env.Idlc('document_source_replace_root.idl')[0], env.Idlc('exchange_spec.idl')[0], - env.Idlc('runtime_constants.idl')[0], env.Idlc('value.idl')[0], 'document_source_merge_spec.cpp', 'resume_token.cpp' @@ -614,6 +623,7 @@ env.Library( '$BUILD_DIR/mongo/idl/idl_parser', '$BUILD_DIR/mongo/s/common_s', 'document_value', + 'runtime_constants_idl', ], ) diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 21651f6f2b8..0054e0de98c 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -65,11 +65,13 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, if (request.getRuntimeConstants()) { variables.setRuntimeConstants(request.getRuntimeConstants().get()); } else { - variables.generateRuntimeConstants(opCtx); + variables.setDefaultRuntimeConstants(opCtx); } } -ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInterface* collator) +ExpressionContext::ExpressionContext(OperationContext* opCtx, + const CollatorInterface* collator, + const boost::optional<RuntimeConstants>& runtimeConstants) : opCtx(opCtx), mongoProcessInterface(std::make_shared<StubMongoProcessInterface>()), timeZoneDatabase(opCtx && opCtx->getServiceContext() @@ -78,7 +80,11 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInte variablesParseState(variables.useIdGenerator()), _collator(collator), _documentComparator(_collator), - _valueComparator(_collator) {} + _valueComparator(_collator) { + if (runtimeConstants) { + variables.setRuntimeConstants(*runtimeConstants); + } +} ExpressionContext::ExpressionContext(NamespaceString nss, std::shared_ptr<MongoProcessInterface> processInterface, diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index c02310887c7..4ed86cf5efb 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -108,10 +108,12 @@ public: boost::optional<UUID> collUUID); /** - * Constructs an ExpressionContext to be used for MatchExpression parsing outside of the context - * of aggregation. + * Constructs an ExpressionContext suitable for use outside of the aggregation system, including + * for MatchExpression parsing and executing pipeline-style operations in the Update system. */ - ExpressionContext(OperationContext* opCtx, const CollatorInterface* collator); + ExpressionContext(OperationContext* opCtx, + const CollatorInterface* collator, + const boost::optional<RuntimeConstants>& runtimeConstants = boost::none); /** * Used by a pipeline to check for interrupts so that killOp() works. Throws a UserAssertion if diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index 5438937133e..d402f304bb3 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -60,7 +60,7 @@ public: // initialize with a nullptr and set post-construction. timeZoneDatabase = TimeZoneDatabase::get(_serviceContext.getServiceContext()); opCtx = _testOpCtx.get(); - RuntimeConstants constants(Date_t::now(), Timestamp()); + RuntimeConstants constants(Date_t::now(), Timestamp(1, 0)); variables.setRuntimeConstants(constants); } diff --git a/src/mongo/db/pipeline/variables.cpp b/src/mongo/db/pipeline/variables.cpp index d8a17fe72d2..9e7e2c9256e 100644 --- a/src/mongo/db/pipeline/variables.cpp +++ b/src/mongo/db/pipeline/variables.cpp @@ -190,23 +190,31 @@ RuntimeConstants Variables::getRuntimeConstants() const { void Variables::setRuntimeConstants(const RuntimeConstants& constants) { _runtimeConstants[kNowId] = Value(constants.getLocalNow()); - _runtimeConstants[kClusterTimeId] = Value(constants.getClusterTime()); + // We use a null Timestamp to indicate that the clusterTime is not available; this can happen if + // the logical clock is not running. We do not use boost::optional because this would allow the + // IDL to serialize a RuntimConstants without clusterTime, which should always be an error. + if (!constants.getClusterTime().isNull()) { + _runtimeConstants[kClusterTimeId] = Value(constants.getClusterTime()); + } } -void Variables::generateRuntimeConstants(OperationContext* opCtx) { - _runtimeConstants[kNowId] = Value(Date_t::now()); +void Variables::setDefaultRuntimeConstants(OperationContext* opCtx) { + setRuntimeConstants(Variables::generateRuntimeConstants(opCtx)); +} +RuntimeConstants Variables::generateRuntimeConstants(OperationContext* opCtx) { + // On a standalone, the clock may not be running and $$CLUSTER_TIME is unavailable. If the + // logical clock is available, set the clusterTime in the runtime constants. Otherwise, the + // clusterTime is set to the null Timestamp. if (opCtx->getClient()) { if (auto logicalClock = LogicalClock::get(opCtx); logicalClock) { auto clusterTime = logicalClock->getClusterTime(); - - // On a standalone mongod the logical clock may not be running and $$CLUSTER_TIME is not - // available. if (clusterTime != LogicalTime::kUninitialized) { - _runtimeConstants[kClusterTimeId] = Value(clusterTime.asTimestamp()); + return {Date_t::now(), clusterTime.asTimestamp()}; } } } + return {Date_t::now(), Timestamp()}; } Variables::Id VariablesParseState::defineVariable(StringData name) { diff --git a/src/mongo/db/pipeline/variables.h b/src/mongo/db/pipeline/variables.h index 22a0e88526f..9627659b25b 100644 --- a/src/mongo/db/pipeline/variables.h +++ b/src/mongo/db/pipeline/variables.h @@ -48,6 +48,11 @@ public: using Id = int64_t; /** + * Generate runtime constants using the current local and cluster times. + */ + static RuntimeConstants generateRuntimeConstants(OperationContext* opCtx); + + /** * Generates Variables::Id and keeps track of the number of Ids handed out. Each successive Id * generated by an instance of this class must be greater than all preceding Ids. */ @@ -134,9 +139,9 @@ public: void setRuntimeConstants(const RuntimeConstants& constants); /** - * Generate values that must be constant during the execution. + * Set the runtime constants using the current local and cluster times. */ - void generateRuntimeConstants(OperationContext* opCtx); + void setDefaultRuntimeConstants(OperationContext* opCtx); private: struct ValueAndState { diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 42d692d601b..23e8e92b48c 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -39,6 +39,7 @@ #include "mongo/db/commands/write_commands/write_commands_common.h" #include "mongo/db/curop.h" #include "mongo/db/lasterror.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/executor/task_executor_pool.h" @@ -621,8 +622,12 @@ private: std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, const OpMsgRequest& request) final { - return stdx::make_unique<Invocation>( - this, request, BatchedCommandRequest::parseUpdate(request)); + auto parsedRequest = BatchedCommandRequest::parseUpdate(request); + uassert(51195, + "Cannot specify runtime constants option to a mongos", + !parsedRequest.hasRuntimeConstants()); + parsedRequest.setRuntimeConstants(Variables::generateRuntimeConstants(opCtx)); + return stdx::make_unique<Invocation>(this, request, std::move(parsedRequest)); } std::string help() const override { diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript index 43d38aecc7e..d1205ab42b4 100644 --- a/src/mongo/s/write_ops/SConscript +++ b/src/mongo/s/write_ops/SConscript @@ -15,6 +15,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/logical_clock', '$BUILD_DIR/mongo/db/ops/write_ops_parsers', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/db/commands', diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 29cc054c951..74e75276f7c 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -481,12 +481,15 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( insertOp.setDocuments(std::move(*insertDocs)); return insertOp; }()); - case BatchedCommandRequest::BatchType_Update: + case BatchedCommandRequest::BatchType_Update: { return BatchedCommandRequest([&] { write_ops::Update updateOp(_clientRequest.getNS()); updateOp.setUpdates(std::move(*updates)); + // Each child batch inherits its runtime constants from the parent batch. + updateOp.setRuntimeConstants(_clientRequest.getRuntimeConstants()); return updateOp; }()); + } case BatchedCommandRequest::BatchType_Delete: return BatchedCommandRequest([&] { write_ops::Delete deleteOp(_clientRequest.getNS()); diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index 786fa26d43c..1c454adaecc 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -115,6 +115,21 @@ public: return *_shardVersion; } + void setRuntimeConstants(const RuntimeConstants& runtimeConstants) { + invariant(_updateReq); + _updateReq->setRuntimeConstants(runtimeConstants); + } + + bool hasRuntimeConstants() const { + invariant(_updateReq); + return _updateReq->getRuntimeConstants().has_value(); + } + + const boost::optional<RuntimeConstants>& getRuntimeConstants() const { + invariant(_updateReq); + return _updateReq->getRuntimeConstants(); + } + const write_ops::WriteCommandBase& getWriteCommandBase() const; void setWriteCommandBase(write_ops::WriteCommandBase writeCommandBase); |