summary refs log tree commit diff
path: root/src/mongo/s/commands/cluster_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp  46
1 files changed, 24 insertions, 22 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 07bf5dcb5c9..a6887ea0498 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -61,12 +61,13 @@
namespace mongo {
-Status ClusterAggregate::runAggregate(OperationContext* txn,
+Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
BSONObj cmdObj,
int options,
BSONObjBuilder* result) {
- auto scopedShardDbStatus = ScopedShardDatabase::getExisting(txn, namespaces.executionNss.db());
+ auto scopedShardDbStatus =
+ ScopedShardDatabase::getExisting(opCtx, namespaces.executionNss.db());
if (!scopedShardDbStatus.isOK()) {
appendEmptyResultSet(
*result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns());
@@ -96,21 +97,21 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
}
if (!conf->isSharded(namespaces.executionNss.ns())) {
- return aggPassthrough(txn, namespaces, conf, cmdObj, result, options);
+ return aggPassthrough(opCtx, namespaces, conf, cmdObj, result, options);
}
- auto chunkMgr = conf->getChunkManager(txn, namespaces.executionNss.ns());
+ auto chunkMgr = conf->getChunkManager(opCtx, namespaces.executionNss.ns());
std::unique_ptr<CollatorInterface> collation;
if (!request.getValue().getCollation().isEmpty()) {
- collation = uassertStatusOK(CollatorFactoryInterface::get(txn->getServiceContext())
+ collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getValue().getCollation()));
} else if (chunkMgr->getDefaultCollator()) {
collation = chunkMgr->getDefaultCollator()->clone();
}
boost::intrusive_ptr<ExpressionContext> mergeCtx = new ExpressionContext(
- txn, request.getValue(), std::move(collation), std::move(resolvedNamespaces));
+ opCtx, request.getValue(), std::move(collation), std::move(resolvedNamespaces));
mergeCtx->inRouter = true;
// explicitly *not* setting mergeCtx->tempDir
@@ -127,7 +128,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
const bool singleShard = [&]() {
BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery();
BSONObj shardKeyMatches = uassertStatusOK(
- chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(txn, firstMatchQuery));
+ chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery));
if (shardKeyMatches.isEmpty()) {
return false;
@@ -176,7 +177,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
// Run the command on the shards
// TODO need to make sure cursors are killed if a retry is needed
std::vector<Strategy::CommandResult> shardResults;
- Strategy::commandOp(txn,
+ Strategy::commandOp(opCtx,
namespaces.executionNss.db().toString(),
shardedCommand,
options,
@@ -210,14 +211,14 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
if (!needSplit) {
invariant(shardResults.size() == 1);
invariant(shardResults[0].target.getServers().size() == 1);
- auto executorPool = Grid::get(txn)->getExecutorPool();
+ auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply =
- uassertStatusOK(storePossibleCursor(txn,
+ uassertStatusOK(storePossibleCursor(opCtx,
shardResults[0].target.getServers()[0],
shardResults[0].result,
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
- Grid::get(txn)->getCursorManager()));
+ Grid::get(opCtx)->getCursorManager()));
result->appendElements(reply);
return getStatusFromCommandResult(reply);
}
@@ -258,17 +259,17 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
// Run merging command on random shard, unless a stage needs the primary shard. Need to use
// ShardConnection so that the merging mongod is sent the config servers on connection init.
- auto& prng = txn->getClient()->getPrng();
+ auto& prng = opCtx->getClient()->getPrng();
const auto& mergingShardId =
(needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard.load())
? conf->getPrimaryId()
: shardResults[prng.nextInt32(shardResults.size())].shardTargetId;
const auto mergingShard =
- uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, mergingShardId));
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
ShardConnection conn(mergingShard->getConnString(), outputNsOrEmpty);
BSONObj mergedResults =
- aggRunCommand(txn, conn.get(), namespaces, mergeCmd.freeze().toBson(), options);
+ aggRunCommand(opCtx, conn.get(), namespaces, mergeCmd.freeze().toBson(), options);
conn.done();
if (auto wcErrorElem = mergedResults["writeConcernError"]) {
@@ -385,7 +386,7 @@ void ClusterAggregate::killAllCursors(const std::vector<Strategy::CommandResult>
}
}
-BSONObj ClusterAggregate::aggRunCommand(OperationContext* txn,
+BSONObj ClusterAggregate::aggRunCommand(OperationContext* opCtx,
DBClientBase* conn,
const Namespaces& namespaces,
BSONObj cmd,
@@ -413,30 +414,30 @@ BSONObj ClusterAggregate::aggRunCommand(OperationContext* txn,
throw RecvStaleConfigException("command failed because of stale config", result);
}
- auto executorPool = Grid::get(txn)->getExecutorPool();
- result = uassertStatusOK(storePossibleCursor(txn,
+ auto executorPool = Grid::get(opCtx)->getExecutorPool();
+ result = uassertStatusOK(storePossibleCursor(opCtx,
HostAndPort(cursor->originalHost()),
result,
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
- Grid::get(txn)->getCursorManager()));
+ Grid::get(opCtx)->getCursorManager()));
return result;
}
-Status ClusterAggregate::aggPassthrough(OperationContext* txn,
+Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
DBConfig* conf,
BSONObj cmdObj,
BSONObjBuilder* out,
int queryOptions) {
// Temporary hack. See comment on declaration for details.
- auto shardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, conf->getPrimaryId());
+ auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
ShardConnection conn(shardStatus.getValue()->getConnString(), "");
- BSONObj result = aggRunCommand(txn, conn.get(), namespaces, cmdObj, queryOptions);
+ BSONObj result = aggRunCommand(opCtx, conn.get(), namespaces, cmdObj, queryOptions);
conn.done();
// First append the properly constructed writeConcernError. It will then be skipped
@@ -472,7 +473,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* txn,
Namespaces nsStruct;
nsStruct.requestedNss = namespaces.requestedNss;
nsStruct.executionNss = resolvedView.getNamespace();
- return ClusterAggregate::runAggregate(txn, nsStruct, aggCmd.getValue(), queryOptions, out);
+ return ClusterAggregate::runAggregate(
+ opCtx, nsStruct, aggCmd.getValue(), queryOptions, out);
}
return getStatusFromCommandResult(result);