diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-12-02 21:55:49 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-02 21:55:49 +0000 |
commit | 70ef84cd14388602b3d763b12c6f8448ba62a9d8 (patch) | |
tree | 104b86a7dfe90c4fc21ab246b74e61fb17bbacd2 /src | |
parent | cb125fa6284810a767a6e2ec50f940eb1eca7fad (diff) | |
download | mongo-70ef84cd14388602b3d763b12c6f8448ba62a9d8.tar.gz |
SERVER-43860 Upgrade/downgrade mechanism for new $merge upsert behaviour
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.cpp | 5 |
8 files changed, 56 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 5964e3ef76a..f7e9710dac9 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -212,6 +212,16 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( } request.setUse44SortKeys(elem.boolean()); + } else if (fieldName == kUseNewUpsert) { + // TODO SERVER-44884: After branching for 4.5, we will continue to accept this option + // for upgrade purposes but will ignore it, as any supported version will be capable of + // using the new upsert mechanism. In 4.7 we will completely remove this parameter. + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kUseNewUpsert << " must be a boolean, not a " + << typeName(elem.type())}; + } + request.setUseNewUpsert(elem.boolean()); } else if (!isGenericArgument(fieldName)) { return {ErrorCodes::FailedToParse, str::stream() << "unrecognized field '" << elem.fieldName() << "'"}; @@ -311,6 +321,7 @@ Document AggregationRequest::serializeToCommandObj() const { // Only serialize runtime constants if any were specified. {kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()}, {kUse44SortKeys, _use44SortKeys ? Value(true) : Value()}, + {kUseNewUpsert, _useNewUpsert ? Value(true) : Value()}, }; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 223aa172c3d..19ad92859b1 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -64,6 +64,7 @@ public: static constexpr StringData kExchangeName = "exchange"_sd; static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd; static constexpr StringData kUse44SortKeys = "use44SortKeys"_sd; + static constexpr StringData kUseNewUpsert = "useNewUpsert"_sd; static constexpr long long kDefaultBatchSize = 101; @@ -221,6 +222,10 @@ public: return _use44SortKeys; } + bool getUseNewUpsert() const { + return _useNewUpsert; + } + // // Setters for optional fields. // @@ -289,6 +294,10 @@ public: _use44SortKeys = use44SortKeys; } + void setUseNewUpsert(bool useNewUpsert) { + _useNewUpsert = useNewUpsert; + } + private: // Required fields. const NamespaceString _nss; @@ -342,5 +351,9 @@ private: // All aggregation requests from mongos-4.4 set this flag, indicating that shard results should // use the updated sort key format when returning change stream results. bool _use44SortKeys = false; + + // Indicates whether the aggregation may use the new 'upsertSupplied' mechanism when running + // $merge stages. All 4.4 mongoS and some versions of 4.2 set this flag. + bool _useNewUpsert = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 1ecd751eb69..1a9eb04e4ef 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -62,7 +62,12 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, std::move(collator), std::move(processInterface), std::move(resolvedNamespaces), - std::move(collUUID)) {} + std::move(collUUID)) { + // Any request which did not originate from a mongoS, or which did originate from a mongoS but + // has the 'useNewUpsert' flag set, can use the new upsertSupplied mechanism for $merge. + // TODO SERVER-44884: Remove this flag after we branch for 4.5. + useNewUpsert = request.getUseNewUpsert() || !request.isFromMongos(); +} ExpressionContext::ExpressionContext( OperationContext* opCtx, @@ -100,6 +105,12 @@ ExpressionContext::ExpressionContext( variables.setRuntimeConstants(*runtimeConstants); else variables.setDefaultRuntimeConstants(opCtx); + + // Any request which did not originate from a mongoS can use the new upsertSupplied mechanism. + // This is used to set 'useNewUpsert' when constructing a MR context on mongoS or mongoD. The MR + // on mongoS will be issued as an aggregation to the shards and will use the other constructor. + // TODO SERVER-44884: Remove this flag after we branch for 4.5. + useNewUpsert = !fromMongos; } ExpressionContext::ExpressionContext(OperationContext* opCtx, @@ -191,6 +202,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( expCtx->maxFeatureCompatibilityVersion = maxFeatureCompatibilityVersion; expCtx->subPipelineDepth = subPipelineDepth; expCtx->tempDir = tempDir; + expCtx->useNewUpsert = useNewUpsert; // ExpressionContext is used both universally in Agg and in Find within a $expr. In the case // that this context is for use in $expr, the collator will be unowned and we will pass nullptr diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 689c6120385..75cb77fb0ba 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -117,7 +117,7 @@ public: ExpressionContext(OperationContext* opCtx, const boost::optional<ExplainOptions::Verbosity>& explain, bool fromMongos, - bool needsmerge, + bool needsMerge, bool allowDiskUse, bool bypassDocumentValidation, const NamespaceString& ns, @@ -294,6 +294,11 @@ public: // "$sortKey" using the 4.2 format. bool use42ChangeStreamSortKeys = false; + // True if this context is associated with a pipeline which is permitted to use the new + // upsertSupplied mechanism for applicable $merge modes. + // TODO SERVER-44884: remove this when we branch for 4.5. + bool useNewUpsert = false; + // True if this ExpressionContext is used to parse a view definition pipeline. bool isParsingViewDefinition = false; diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 9e70d9cec0a..548add75062 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -202,8 +202,9 @@ Update MongoInterfaceStandalone::buildUpdateOp( entry.setU(std::move(u)); entry.setC(std::move(c)); entry.setUpsert(upsert != UpsertType::kNone); - entry.setUpsertSupplied( - {{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}}); + // TODO SERVER-44884: after branching for 4.5, remove the 'useNewUpsert' flag. + entry.setUpsertSupplied({{entry.getUpsert() && expCtx->useNewUpsert, + upsert == UpsertType::kInsertSuppliedDoc}}); entry.setMulti(multi); return entry; }()); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 9c963533c14..b3a10e92d16 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -184,6 +184,10 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, // there will only be one sort key format for changes streams, so there will be no need to // set this flag anymore. This flag has no effect on pipelines without a change stream. cmdForShards[AggregationRequest::kUse44SortKeys] = Value(true); + // TODO SERVER-44884: We set this flag to indicate that the shards should always use the new + // upsert mechanism when executing relevant $merge modes. After branching for 4.5, supported + // upgrade versions will all use the new mechanism, and we can remove this flag. + cmdForShards[AggregationRequest::kUseNewUpsert] = Value(true); } return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), false); diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp index a6ac9d6b4d9..632be798d6a 100644 --- a/src/mongo/db/views/resolved_view.cpp +++ b/src/mongo/db/views/resolved_view.cpp @@ -111,6 +111,7 @@ AggregationRequest ResolvedView::asExpandedViewAggregation( expandedRequest.setUnwrappedReadPref(request.getUnwrappedReadPref()); expandedRequest.setBypassDocumentValidation(request.shouldBypassDocumentValidation()); expandedRequest.setAllowDiskUse(request.shouldAllowDiskUse()); + expandedRequest.setUseNewUpsert(request.getUseNewUpsert()); // Operations on a view must always use the default collation of the view. We must have already // checked that if the user's request specifies a collation, it matches the collation of the diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index c917d8628d9..66755ea7d16 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -113,6 +113,11 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe translatedCmd[AggregationRequest::kRuntimeConstants] = Value(pipeline->getContext()->getRuntimeConstants().toBSON()); + // TODO SERVER-44884: We set this flag to indicate that the shards should always use the new + // upsert mechanism when executing relevant $merge modes. After branching for 4.5, supported + // upgrade versions will all use the new mechanism, and we can remove this flag. + translatedCmd[AggregationRequest::kUseNewUpsert] = Value(true); + // Append generic command options. for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) { translatedCmd[elem.fieldNameStringData()] = Value(elem); |