summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_aggregate.cpp
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-10 10:04:41 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-05-29 09:51:51 -0400
commita76082905d63ac8aaaae25e5c76812e6edf9bc07 (patch)
tree5882b480ff3b7378a30f27106cd8fad6e8271407 /src/mongo/s/commands/cluster_aggregate.cpp
parent8150b50f14579b6cbd673f12968726670f6e1b78 (diff)
downloadmongo-a76082905d63ac8aaaae25e5c76812e6edf9bc07.tar.gz
SERVER-32088: ChangeStream resumeAfter does not work on sharded collections if not all shards have chunks for the collection
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp185
1 files changed, 124 insertions, 61 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 46badd74eb5..3c8ecd50278 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -148,20 +148,11 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
return getStatusFromCommandResult(result->asTempObj());
}
-bool mustRunOnAllShards(const NamespaceString& nss,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const LiteParsedPipeline& litePipe) {
- // Non-existent routing table is only valid for $changeStream aggregations which have been
- // opened prior to the creation of the target database.
- invariant(routingInfo || litePipe.hasChangeStream());
+bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) {
// The following aggregations must be routed to all shards:
- // - Any collectionless aggregation such as $currentOp
- // - $changeStream on a non-existent database
- // - $changeStream on a sharded collection
- const bool dbExists = static_cast<bool>(routingInfo);
- const bool nsIsSharded = dbExists && static_cast<bool>(routingInfo->cm());
- return !dbExists || nss.isCollectionlessAggregateNS() ||
- (nsIsSharded && litePipe.hasChangeStream());
+ // - Any collectionless aggregation, such as non-localOps $currentOp.
+ // - Any aggregation which begins with a $changeStream stage.
+ return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
}
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
@@ -205,7 +196,9 @@ BSONObj createCommandForTargetedShards(
const AggregationRequest& request,
const BSONObj originalCmdObj,
const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
+ const BSONObj collationObj,
boost::optional<LogicalTime> atClusterTime) {
+
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
@@ -233,6 +226,10 @@ BSONObj createCommandForTargetedShards(
targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
}
+ if (!collationObj.isEmpty()) {
+ targetedCmd[AggregationRequest::kCollationName] = Value(collationObj);
+ }
+
if (opCtx->getTxnNumber()) {
invariant(!targetedCmd.hasField(OperationSessionInfo::kTxnNumberFieldName));
targetedCmd[OperationSessionInfo::kTxnNumberFieldName] =
@@ -282,7 +279,7 @@ std::vector<RemoteCursor> establishShardCursors(
const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
- const bool mustRunOnAll = mustRunOnAllShards(nss, routingInfo, litePipe);
+ const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
std::set<ShardId> shardIds =
getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation);
std::vector<std::pair<ShardId, BSONObj>> requests;
@@ -363,7 +360,8 @@ DispatchShardPipelineResults dispatchShardPipeline(
BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj) {
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
@@ -398,8 +396,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
: boost::optional<CachedCollectionRoutingInfo>{};
// Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll =
- mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
std::set<ShardId> shardIds = getTargetedShards(
opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
@@ -423,7 +420,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Generate the command object for the targeted shards.
BSONObj targetedCommand = createCommandForTargetedShards(
- opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
+ opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, collationObj, atClusterTime);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
@@ -617,16 +614,31 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
return cursorResponse.obj();
}
-BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
- const NamespaceString& nss) {
+/**
+ * Returns the output of the listCollections command filtered to the namespace 'nss'.
+ */
+BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) {
ScopedDbConnection conn(primaryShard->getConnString());
- BSONObj defaultCollation;
std::list<BSONObj> all =
conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
if (all.empty()) {
- return defaultCollation;
+ // Collection does not exist, return an empty object.
+ return BSONObj();
+ }
+ return all.front();
+}
+
+/**
+ * Returns the collection's default collation, or the simple collation spec if it has no default.
+ * If the collection does not exist ('collectionInfo' is empty), returns an empty BSONObj.
+ */
+BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) {
+ if (collectionInfo.isEmpty()) {
+ // Collection does not exist, return an empty object.
+ return BSONObj();
}
- BSONObj collectionInfo = all.front();
+
+ BSONObj defaultCollation = CollationSpec::kSimpleSpec;
if (collectionInfo["options"].type() == BSONType::Object) {
BSONObj collectionOptions = collectionInfo["options"].Obj();
BSONElement collationElement;
@@ -644,6 +656,60 @@ BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
return defaultCollation;
}
+/**
+ * Returns a {collation, uuid} pair for the aggregation on 'nss', with the following semantics:
+ * - The "collation" is the collation specified in 'request', if it is non-empty. Otherwise it is
+ * the default collation for the collection, or the simple collation if there is no default. If
+ * the collection does not exist, or no routing info is available, this is an empty object.
+ * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections
+ * output for unsharded collections. It is boost::none if no UUID is available.
+ */
+std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const NamespaceString& nss,
+ const AggregationRequest& request) {
+ const bool collectionIsSharded = (routingInfo && routingInfo->cm());
+ const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm());
+
+ // If the collection is unsharded, obtain collInfo from the primary shard.
+ const auto unshardedCollInfo = collectionIsNotSharded
+ ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss)
+ : BSONObj();
+
+ // Return the collection UUID if available, or boost::none otherwise.
+ const auto getUUID = [&]() -> auto {
+ if (collectionIsSharded) {
+ return routingInfo->cm()->getUUID();
+ } else {
+ return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"]
+ ? boost::optional<UUID>{uassertStatusOK(
+ UUID::parse(unshardedCollInfo["info"]["uuid"]))}
+ : boost::optional<UUID>{boost::none};
+ }
+ };
+
+ // If the collection exists, return its default collation, or the simple
+ // collation if no explicit default is present. If the collection does not
+ // exist, return an empty BSONObj.
+ const auto getCollation = [&]() -> auto {
+ if (!collectionIsSharded && !collectionIsNotSharded) {
+ return BSONObj();
+ }
+ if (collectionIsNotSharded) {
+ return getDefaultCollationForUnshardedCollection(unshardedCollInfo);
+ } else {
+ return routingInfo->cm()->getDefaultCollator()
+ ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON()
+ : CollationSpec::kSimpleSpec;
+ }
+ };
+
+ // If the user specified an explicit collation, we always adopt it. Otherwise,
+ // obtain the collection default or simple collation as appropriate, and return
+ // it along with the collection's UUID.
+ return {request.getCollation().isEmpty() ? getCollation() : request.getCollation(), getUUID()};
+}
+
ShardId pickMergingShard(OperationContext* opCtx,
const DispatchShardPipelineResults& dispatchResults,
ShardId primaryShard) {
@@ -666,6 +732,7 @@ ShardId pickMergingShard(OperationContext* opCtx,
// collections are sharded.
StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
OperationContext* opCtx, const LiteParsedPipeline& litePipe) {
+
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
for (auto&& nss : litePipe.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
@@ -678,44 +745,31 @@ StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
}
// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved
-// namespaces are unsharded, obtains the appropriate user-defined or default collator, and creates a
-// MongoProcessInterface for use by the pipeline's stages.
-boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
- OperationContext* opCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo) {
- // Determine the appropriate collation for the ExpressionContext.
- const bool collectionIsSharded = (routingInfo && routingInfo->cm());
- const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm());
- std::unique_ptr<CollatorInterface> collation;
+// namespaces are unsharded, instantiates an appropriate collator, creates a MongoProcessInterface
+// for use by the pipeline's stages, and forwards the caller-supplied collection UUID (if any) to
+// the ExpressionContext.
+boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ BSONObj collationObj,
+ boost::optional<UUID> uuid) {
- if (!request.getCollation().isEmpty()) {
- collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(request.getCollation()));
- } else if (collectionIsSharded) {
- if (routingInfo->cm()->getDefaultCollator()) {
- collation = routingInfo->cm()->getDefaultCollator()->clone();
- }
- } else if (collectionIsNotSharded) {
- // Get collection metadata from primary chunk.
- auto collationObj = getDefaultCollationForUnshardedCollection(
- routingInfo->db().primary().get(), executionNss);
- if (!collationObj.isEmpty()) {
- collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collationObj));
- }
+ std::unique_ptr<CollatorInterface> collation;
+ if (!collationObj.isEmpty()) {
+ // This will be null if attempting to build an interface for the simple collator.
+ collation = uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationObj));
}
- // Create the expression context, and set 'inMongos' to true before returning it. We explicitly
- // do *not* set mergeCtx->tempDir. Note that we resolve the pipeline's involved namespaces here,
- // validating that none of them are sharded.
+ // Create the expression context, and set 'inMongos' to true. We explicitly do *not* set
+ // mergeCtx->tempDir.
auto mergeCtx = new ExpressionContext(opCtx,
request,
std::move(collation),
std::make_shared<PipelineS::MongoSInterface>(),
- resolveInvolvedNamespaces(opCtx, litePipe));
+ resolveInvolvedNamespaces(opCtx, litePipe),
+ uuid);
+
mergeCtx->inMongos = true;
return mergeCtx;
}
@@ -861,7 +915,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// Determine whether this aggregation must be dispatched to all shards in the cluster.
- const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, routingInfo, litePipe);
+ const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, litePipe);
// If we don't have a routing table, then this is a $changeStream which must run on all shards.
invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
@@ -877,11 +931,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
}
+ // Populate the collection UUID and the appropriate collation to use.
+ auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request);
+ BSONObj collationObj = collInfo.first;
+ boost::optional<UUID> uuid = collInfo.second;
+
// Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
// resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
// pipeline's stages.
- auto expCtx =
- makeExpressionContext(opCtx, namespaces.executionNss, request, litePipe, routingInfo);
+ auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid);
// Parse and optimize the full pipeline.
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
@@ -894,8 +952,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = dispatchShardPipeline(
- expCtx, namespaces.executionNss, cmdObj, request, litePipe, std::move(pipeline));
+ auto shardDispatchResults = dispatchShardPipeline(expCtx,
+ namespaces.executionNss,
+ cmdObj,
+ request,
+ litePipe,
+ std::move(pipeline),
+ collationObj);
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
// write the results to the output builder, and return immediately.
@@ -965,8 +1028,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
- cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- createCommandForTargetedShards(opCtx, aggRequest, cmdObj, nullptr, atClusterTime));
+ cmdObj = CommandHelpers::filterCommandRequestForPassthrough(createCommandForTargetedShards(
+ opCtx, aggRequest, cmdObj, nullptr, BSONObj(), atClusterTime));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,