diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-10 10:04:41 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-29 09:51:51 -0400 |
commit | a76082905d63ac8aaaae25e5c76812e6edf9bc07 (patch) | |
tree | 5882b480ff3b7378a30f27106cd8fad6e8271407 /src/mongo/s/commands/cluster_aggregate.cpp | |
parent | 8150b50f14579b6cbd673f12968726670f6e1b78 (diff) | |
download | mongo-a76082905d63ac8aaaae25e5c76812e6edf9bc07.tar.gz |
SERVER-32088: ChangeStream resumeAfter does not work on sharded collections if not all shards have chunks for the collection
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 185 |
1 files changed, 124 insertions, 61 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 46badd74eb5..3c8ecd50278 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -148,20 +148,11 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, return getStatusFromCommandResult(result->asTempObj()); } -bool mustRunOnAllShards(const NamespaceString& nss, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const LiteParsedPipeline& litePipe) { - // Non-existent routing table is only valid for $changeStream aggregations which have been - // opened prior to the creation of the target database. - invariant(routingInfo || litePipe.hasChangeStream()); +bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) { // The following aggregations must be routed to all shards: - // - Any collectionless aggregation such as $currentOp - // - $changeStream on a non-existent database - // - $changeStream on a sharded collection - const bool dbExists = static_cast<bool>(routingInfo); - const bool nsIsSharded = dbExists && static_cast<bool>(routingInfo->cm()); - return !dbExists || nss.isCollectionlessAggregateNS() || - (nsIsSharded && litePipe.hasChangeStream()); + // - Any collectionless aggregation, such as non-localOps $currentOp. + // - Any aggregation which begins with a $changeStream stage. + return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); } StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, @@ -205,7 +196,9 @@ BSONObj createCommandForTargetedShards( const AggregationRequest& request, const BSONObj originalCmdObj, const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards, + const BSONObj collationObj, boost::optional<LogicalTime> atClusterTime) { + // Create the command for the shards. MutableDocument targetedCmd(request.serializeToCommandObj()); targetedCmd[AggregationRequest::kFromMongosName] = Value(true); @@ -233,6 +226,10 @@ BSONObj createCommandForTargetedShards( targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity)); } + if (!collationObj.isEmpty()) { + targetedCmd[AggregationRequest::kCollationName] = Value(collationObj); + } + if (opCtx->getTxnNumber()) { invariant(!targetedCmd.hasField(OperationSessionInfo::kTxnNumberFieldName)); targetedCmd[OperationSessionInfo::kTxnNumberFieldName] = @@ -282,7 +279,7 @@ std::vector<RemoteCursor> establishShardCursors( const BSONObj& collation) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - const bool mustRunOnAll = mustRunOnAllShards(nss, routingInfo, litePipe); + const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe); std::set<ShardId> shardIds = getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation); std::vector<std::pair<ShardId, BSONObj>> requests; @@ -363,7 +360,8 @@ DispatchShardPipelineResults dispatchShardPipeline( BSONObj originalCmdObj, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObj collationObj) { // The process is as follows: // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. @@ -398,8 +396,7 @@ DispatchShardPipelineResults dispatchShardPipeline( : boost::optional<CachedCollectionRoutingInfo>{}; // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = - mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline); + const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline); std::set<ShardId> shardIds = getTargetedShards( opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); @@ -423,7 +420,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // Generate the command object for the targeted shards. BSONObj targetedCommand = createCommandForTargetedShards( - opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime); + opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, collationObj, atClusterTime); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for @@ -617,16 +614,31 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, return cursorResponse.obj(); } -BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard, - const NamespaceString& nss) { +/** + * Returns the output of the listCollections command filtered to the namespace 'nss'. + */ +BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) { ScopedDbConnection conn(primaryShard->getConnString()); - BSONObj defaultCollation; std::list<BSONObj> all = conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); if (all.empty()) { - return defaultCollation; + // Collection does not exist, return an empty object. + return BSONObj(); + } + return all.front(); +} + +/** + * Returns the collection default collation or the simple collator if there is no default. If the + * collection does not exist, then returns an empty BSON Object. + */ +BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) { + if (collectionInfo.isEmpty()) { + // Collection does not exist, return an empty object. + return BSONObj(); } - BSONObj collectionInfo = all.front(); + + BSONObj defaultCollation = CollationSpec::kSimpleSpec; if (collectionInfo["options"].type() == BSONType::Object) { BSONObj collectionOptions = collectionInfo["options"].Obj(); BSONElement collationElement; @@ -644,6 +656,60 @@ BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard, return defaultCollation; } +/** + * Populates the "collation" and "uuid" parameters with the following semantics: + * - The "collation" parameter will be set to the default collation for the collection or the + * simple collation if there is no default. If the collection does not exist, this will be set to an + * empty object. + * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections + * output for unsharded collections. + */ +std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const NamespaceString& nss, + const AggregationRequest& request) { + const bool collectionIsSharded = (routingInfo && routingInfo->cm()); + const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); + + // If the collection is unsharded, obtain collInfo from the primary shard. + const auto unshardedCollInfo = collectionIsNotSharded + ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss) + : BSONObj(); + + // Return the collection UUID if available, or boost::none otherwise. + const auto getUUID = [&]() -> auto { + if (collectionIsSharded) { + return routingInfo->cm()->getUUID(); + } else { + return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"] + ? boost::optional<UUID>{uassertStatusOK( + UUID::parse(unshardedCollInfo["info"]["uuid"]))} + : boost::optional<UUID>{boost::none}; + } + }; + + // If the collection exists, return its default collation, or the simple + // collation if no explicit default is present. If the collection does not + // exist, return an empty BSONObj. + const auto getCollation = [&]() -> auto { + if (!collectionIsSharded && !collectionIsNotSharded) { + return BSONObj(); + } + if (collectionIsNotSharded) { + return getDefaultCollationForUnshardedCollection(unshardedCollInfo); + } else { + return routingInfo->cm()->getDefaultCollator() + ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON() + : CollationSpec::kSimpleSpec; + } + }; + + // If the user specified an explicit collation, we always adopt it. Otherwise, + // obtain the collection default or simple collation as appropriate, and return + // it along with the collection's UUID. + return {request.getCollation().isEmpty() ? getCollation() : request.getCollation(), getUUID()}; +} + ShardId pickMergingShard(OperationContext* opCtx, const DispatchShardPipelineResults& dispatchResults, ShardId primaryShard) { @@ -666,6 +732,7 @@ ShardId pickMergingShard(OperationContext* opCtx, // collections are sharded. StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( OperationContext* opCtx, const LiteParsedPipeline& litePipe) { + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; for (auto&& nss : litePipe.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = @@ -678,44 +745,31 @@ StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( } // Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved -// namespaces are unsharded, obtains the appropriate user-defined or default collator, and creates a -// MongoProcessInterface for use by the pipeline's stages. -boost::intrusive_ptr<ExpressionContext> makeExpressionContext( - OperationContext* opCtx, - const NamespaceString& executionNss, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo) { - // Determine the appropriate collation for the ExpressionContext. - const bool collectionIsSharded = (routingInfo && routingInfo->cm()); - const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); - std::unique_ptr<CollatorInterface> collation; +// namespaces are unsharded, instantiates an appropriate collator, creates a MongoProcessInterface +// for use by the pipeline's stages, and optionally extracts the UUID from the collection info if +// present. +boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx, + const AggregationRequest& request, + const LiteParsedPipeline& litePipe, + BSONObj collationObj, + boost::optional<UUID> uuid) { - if (!request.getCollation().isEmpty()) { - collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(request.getCollation())); - } else if (collectionIsSharded) { - if (routingInfo->cm()->getDefaultCollator()) { - collation = routingInfo->cm()->getDefaultCollator()->clone(); - } - } else if (collectionIsNotSharded) { - // Get collection metadata from primary chunk. - auto collationObj = getDefaultCollationForUnshardedCollection( - routingInfo->db().primary().get(), executionNss); - if (!collationObj.isEmpty()) { - collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(collationObj)); - } + std::unique_ptr<CollatorInterface> collation; + if (!collationObj.isEmpty()) { + // This will be null if attempting to build an interface for the simple collator. + collation = uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationObj)); } - // Create the expression context, and set 'inMongos' to true before returning it. We explicitly - // do *not* set mergeCtx->tempDir. Note that we resolve the pipeline's involved namespaces here, - // validating that none of them are sharded. + // Create the expression context, and set 'inMongos' to true. We explicitly do *not* set + // mergeCtx->tempDir. auto mergeCtx = new ExpressionContext(opCtx, request, std::move(collation), std::make_shared<PipelineS::MongoSInterface>(), - resolveInvolvedNamespaces(opCtx, litePipe)); + resolveInvolvedNamespaces(opCtx, litePipe), + uuid); + mergeCtx->inMongos = true; return mergeCtx; } @@ -861,7 +915,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // Determine whether this aggregation must be dispatched to all shards in the cluster. - const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, routingInfo, litePipe); + const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, litePipe); // If we don't have a routing table, then this is a $changeStream which must run on all shards. invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream())); @@ -877,11 +931,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result); } + // Populate the collection UUID and the appropriate collation to use. + auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request); + BSONObj collationObj = collInfo.first; + boost::optional<UUID> uuid = collInfo.second; + // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the // pipeline's stages. - auto expCtx = - makeExpressionContext(opCtx, namespaces.executionNss, request, litePipe, routingInfo); + auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid); // Parse and optimize the full pipeline. auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); @@ -894,8 +952,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If not, split the pipeline as necessary and dispatch to the relevant shards. - auto shardDispatchResults = dispatchShardPipeline( - expCtx, namespaces.executionNss, cmdObj, request, litePipe, std::move(pipeline)); + auto shardDispatchResults = dispatchShardPipeline(expCtx, + namespaces.executionNss, + cmdObj, + request, + litePipe, + std::move(pipeline), + collationObj); // If the operation is an explain, then we verify that it succeeded on all targeted shards, // write the results to the output builder, and return immediately. @@ -965,8 +1028,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. - cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - createCommandForTargetedShards(opCtx, aggRequest, cmdObj, nullptr, atClusterTime)); + cmdObj = CommandHelpers::filterCommandRequestForPassthrough(createCommandForTargetedShards( + opCtx, aggRequest, cmdObj, nullptr, BSONObj(), atClusterTime)); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, |