diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 46 |
1 files changed, 24 insertions, 22 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 07bf5dcb5c9..a6887ea0498 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -61,12 +61,13 @@ namespace mongo { -Status ClusterAggregate::runAggregate(OperationContext* txn, +Status ClusterAggregate::runAggregate(OperationContext* opCtx, const Namespaces& namespaces, BSONObj cmdObj, int options, BSONObjBuilder* result) { - auto scopedShardDbStatus = ScopedShardDatabase::getExisting(txn, namespaces.executionNss.db()); + auto scopedShardDbStatus = + ScopedShardDatabase::getExisting(opCtx, namespaces.executionNss.db()); if (!scopedShardDbStatus.isOK()) { appendEmptyResultSet( *result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns()); @@ -96,21 +97,21 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, } if (!conf->isSharded(namespaces.executionNss.ns())) { - return aggPassthrough(txn, namespaces, conf, cmdObj, result, options); + return aggPassthrough(opCtx, namespaces, conf, cmdObj, result, options); } - auto chunkMgr = conf->getChunkManager(txn, namespaces.executionNss.ns()); + auto chunkMgr = conf->getChunkManager(opCtx, namespaces.executionNss.ns()); std::unique_ptr<CollatorInterface> collation; if (!request.getValue().getCollation().isEmpty()) { - collation = uassertStatusOK(CollatorFactoryInterface::get(txn->getServiceContext()) + collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getValue().getCollation())); } else if (chunkMgr->getDefaultCollator()) { collation = chunkMgr->getDefaultCollator()->clone(); } boost::intrusive_ptr<ExpressionContext> mergeCtx = new ExpressionContext( - txn, request.getValue(), std::move(collation), std::move(resolvedNamespaces)); + opCtx, request.getValue(), std::move(collation), std::move(resolvedNamespaces)); mergeCtx->inRouter = true; // explicitly *not* setting mergeCtx->tempDir @@ -127,7 +128,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, const bool singleShard = [&]() { BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery(); BSONObj shardKeyMatches = uassertStatusOK( - chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(txn, firstMatchQuery)); + chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery)); if (shardKeyMatches.isEmpty()) { return false; @@ -176,7 +177,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, // Run the command on the shards // TODO need to make sure cursors are killed if a retry is needed std::vector<Strategy::CommandResult> shardResults; - Strategy::commandOp(txn, + Strategy::commandOp(opCtx, namespaces.executionNss.db().toString(), shardedCommand, options, @@ -210,14 +211,14 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, if (!needSplit) { invariant(shardResults.size() == 1); invariant(shardResults[0].target.getServers().size() == 1); - auto executorPool = Grid::get(txn)->getExecutorPool(); + auto executorPool = Grid::get(opCtx)->getExecutorPool(); const BSONObj reply = - uassertStatusOK(storePossibleCursor(txn, + uassertStatusOK(storePossibleCursor(opCtx, shardResults[0].target.getServers()[0], shardResults[0].result, namespaces.requestedNss, executorPool->getArbitraryExecutor(), - Grid::get(txn)->getCursorManager())); + Grid::get(opCtx)->getCursorManager())); result->appendElements(reply); return getStatusFromCommandResult(reply); } @@ -258,17 +259,17 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, // Run merging command on random shard, unless a stage needs the primary shard. Need to use // ShardConnection so that the merging mongod is sent the config servers on connection init. - auto& prng = txn->getClient()->getPrng(); + auto& prng = opCtx->getClient()->getPrng(); const auto& mergingShardId = (needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard.load()) ? conf->getPrimaryId() : shardResults[prng.nextInt32(shardResults.size())].shardTargetId; const auto mergingShard = - uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, mergingShardId)); + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); ShardConnection conn(mergingShard->getConnString(), outputNsOrEmpty); BSONObj mergedResults = - aggRunCommand(txn, conn.get(), namespaces, mergeCmd.freeze().toBson(), options); + aggRunCommand(opCtx, conn.get(), namespaces, mergeCmd.freeze().toBson(), options); conn.done(); if (auto wcErrorElem = mergedResults["writeConcernError"]) { @@ -385,7 +386,7 @@ void ClusterAggregate::killAllCursors(const std::vector<Strategy::CommandResult> } } -BSONObj ClusterAggregate::aggRunCommand(OperationContext* txn, +BSONObj ClusterAggregate::aggRunCommand(OperationContext* opCtx, DBClientBase* conn, const Namespaces& namespaces, BSONObj cmd, @@ -413,30 +414,30 @@ BSONObj ClusterAggregate::aggRunCommand(OperationContext* txn, throw RecvStaleConfigException("command failed because of stale config", result); } - auto executorPool = Grid::get(txn)->getExecutorPool(); - result = uassertStatusOK(storePossibleCursor(txn, + auto executorPool = Grid::get(opCtx)->getExecutorPool(); + result = uassertStatusOK(storePossibleCursor(opCtx, HostAndPort(cursor->originalHost()), result, namespaces.requestedNss, executorPool->getArbitraryExecutor(), - Grid::get(txn)->getCursorManager())); + Grid::get(opCtx)->getCursorManager())); return result; } -Status ClusterAggregate::aggPassthrough(OperationContext* txn, +Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, DBConfig* conf, BSONObj cmdObj, BSONObjBuilder* out, int queryOptions) { // Temporary hack. See comment on declaration for details. - auto shardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, conf->getPrimaryId()); + auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } ShardConnection conn(shardStatus.getValue()->getConnString(), ""); - BSONObj result = aggRunCommand(txn, conn.get(), namespaces, cmdObj, queryOptions); + BSONObj result = aggRunCommand(opCtx, conn.get(), namespaces, cmdObj, queryOptions); conn.done(); // First append the properly constructed writeConcernError. It will then be skipped @@ -472,7 +473,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* txn, Namespaces nsStruct; nsStruct.requestedNss = namespaces.requestedNss; nsStruct.executionNss = resolvedView.getNamespace(); - return ClusterAggregate::runAggregate(txn, nsStruct, aggCmd.getValue(), queryOptions, out); + return ClusterAggregate::runAggregate( + opCtx, nsStruct, aggCmd.getValue(), queryOptions, out); } return getStatusFromCommandResult(result); |