Diffstat (limited to 'src/mongo/s/commands/cluster_map_reduce_cmd.cpp')
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_cmd.cpp  62
1 file changed, 32 insertions(+), 30 deletions(-)
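
This diff renames the OperationContext parameter from txn to opCtx throughout the cluster mapReduce command and reflows the call sites affected by the longer name. It is a purely mechanical rename; no behavior changes.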
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 6d4a4155365..088b8d6d4d1 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -182,7 +182,7 @@ public:
mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
}
- virtual bool run(OperationContext* txn,
+ virtual bool run(OperationContext* opCtx,
const std::string& dbname,
BSONObj& cmdObj,
int options,
@@ -232,7 +232,7 @@ public:
}
// Ensure the input database exists
- auto status = Grid::get(txn)->catalogCache()->getDatabase(txn, dbname);
+ auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbname);
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -242,7 +242,7 @@ public:
shared_ptr<DBConfig> confOut;
if (customOutDB) {
// Create the output database implicitly, since we have a custom output requested
- auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(txn, outDB));
+ auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, outDB));
confOut = scopedDb.getSharedDbReference();
} else {
confOut = confIn;
@@ -274,14 +274,14 @@ public:
maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong();
if (maxChunkSizeBytes == 0) {
maxChunkSizeBytes =
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes();
+ Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes();
}
// maxChunkSizeBytes is sent as int BSON field
invariant(maxChunkSizeBytes < std::numeric_limits<int>::max());
}
- const auto shardRegistry = Grid::get(txn)->shardRegistry();
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
// modify command to run on shards with output to tmp collection
string badShardedField;
@@ -292,7 +292,7 @@ public:
LOG(1) << "simple MR, just passthrough";
const auto shard =
- uassertStatusOK(shardRegistry->getShard(txn, confIn->getPrimaryId()));
+ uassertStatusOK(shardRegistry->getShard(opCtx, confIn->getPrimaryId()));
ShardConnection conn(shard->getConnString(), "");
@@ -338,7 +338,7 @@ public:
try {
Strategy::commandOp(
- txn, dbname, shardedCommand, 0, nss.ns(), q, collation, &mrCommandResults);
+ opCtx, dbname, shardedCommand, 0, nss.ns(), q, collation, &mrCommandResults);
} catch (DBException& e) {
e.addContext(str::stream() << "could not run map command on all shards for ns "
<< nss.ns()
@@ -352,7 +352,7 @@ public:
string server;
{
const auto shard =
- uassertStatusOK(shardRegistry->getShard(txn, mrResult.shardTargetId));
+ uassertStatusOK(shardRegistry->getShard(opCtx, mrResult.shardTargetId));
server = shard->getConnString().toString();
}
servers.insert(server);
@@ -413,7 +413,7 @@ public:
finalCmd.append("inputDB", dbname);
finalCmd.append("shardedOutputCollection", shardResultCollection);
finalCmd.append("shards", shardResultsB.done());
- finalCmd.append("writeConcern", txn->getWriteConcern().toBSON());
+ finalCmd.append("writeConcern", opCtx->getWriteConcern().toBSON());
BSONObj shardCounts = shardCountsB.done();
finalCmd.append("shardCounts", shardCounts);
@@ -446,7 +446,7 @@ public:
if (!shardedOutput) {
const auto shard =
- uassertStatusOK(shardRegistry->getShard(txn, confOut->getPrimaryId()));
+ uassertStatusOK(shardRegistry->getShard(opCtx, confOut->getPrimaryId()));
LOG(1) << "MR with single shard output, NS=" << outputCollNss.ns()
<< " primary=" << shard->toString();
@@ -472,20 +472,20 @@ public:
// Create the sharded collection if needed
if (!confOut->isSharded(outputCollNss.ns())) {
// Enable sharding on the output db
- Status status = Grid::get(txn)->catalogClient(txn)->enableSharding(
- txn, outputCollNss.db().toString());
+ Status status = Grid::get(opCtx)->catalogClient(opCtx)->enableSharding(
+ opCtx, outputCollNss.db().toString());
// If the database has sharding already enabled, we can ignore the error
if (status.isOK()) {
// Invalidate the output database so it gets reloaded on the next fetch attempt
- Grid::get(txn)->catalogCache()->invalidate(outputCollNss.db());
+ Grid::get(opCtx)->catalogCache()->invalidate(outputCollNss.db());
} else if (status != ErrorCodes::AlreadyInitialized) {
uassertStatusOK(status);
}
confOut.reset();
- confOut = uassertStatusOK(Grid::get(txn)->catalogCache()->getDatabase(
- txn, outputCollNss.db().toString()));
+ confOut = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(
+ opCtx, outputCollNss.db().toString()));
// Shard collection according to split points
vector<BSONObj> sortedSplitPts;
@@ -523,24 +523,24 @@ public:
BSONObj defaultCollation;
uassertStatusOK(
- Grid::get(txn)->catalogClient(txn)->shardCollection(txn,
- outputCollNss.ns(),
- sortKeyPattern,
- defaultCollation,
- true,
- sortedSplitPts,
- outShardIds));
+ Grid::get(opCtx)->catalogClient(opCtx)->shardCollection(opCtx,
+ outputCollNss.ns(),
+ sortKeyPattern,
+ defaultCollation,
+ true,
+ sortedSplitPts,
+ outShardIds));
// Make sure the cached metadata for the collection knows that we are now sharded
- confOut->getChunkManager(txn, outputCollNss.ns(), true /* reload */);
+ confOut->getChunkManager(opCtx, outputCollNss.ns(), true /* reload */);
}
auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>();
{
// Take distributed lock to prevent split / migration.
auto scopedDistLock =
- Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lock(
- txn, outputCollNss.ns(), "mr-post-process", kNoDistLockTimeout);
+ Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->lock(
+ opCtx, outputCollNss.ns(), "mr-post-process", kNoDistLockTimeout);
if (!scopedDistLock.isOK()) {
return appendCommandStatus(result, scopedDistLock.getStatus());
}
@@ -550,7 +550,7 @@ public:
try {
const BSONObj query;
- Strategy::commandOp(txn,
+ Strategy::commandOp(opCtx,
outDB,
finalCmdObj,
0,
@@ -570,8 +570,9 @@ public:
for (const auto& mrResult : mrCommandResults) {
string server;
{
- const auto shard = uassertStatusOK(
- Grid::get(txn)->shardRegistry()->getShard(txn, mrResult.shardTargetId));
+ const auto shard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(
+ opCtx, mrResult.shardTargetId));
server = shard->getConnString().toString();
}
singleResult = mrResult.result;
@@ -609,7 +610,8 @@ public:
}
// Do the splitting round
- shared_ptr<ChunkManager> cm = confOut->getChunkManagerIfExists(txn, outputCollNss.ns());
+ shared_ptr<ChunkManager> cm =
+ confOut->getChunkManagerIfExists(opCtx, outputCollNss.ns());
uassert(34359,
str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns()
<< "; expected that collection to be sharded, but it was not",
@@ -626,7 +628,7 @@ public:
warning() << "Mongod reported " << size << " bytes inserted for key " << key
<< " but can't find chunk";
} else {
- updateChunkWriteStatsAndSplitIfNeeded(txn, cm.get(), c.get(), size);
+ updateChunkWriteStatsAndSplitIfNeeded(opCtx, cm.get(), c.get(), size);
}
}
}
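
A minimal, self-contained C++ sketch of the convention this rename enforces: the OperationContext pointer is named opCtx and is passed explicitly through every service-layer call, as in Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbname). The types below are simplified stand-ins, not MongoDB's actual headers; only the naming and threading pattern comes from the diff, and the fallback logic mirrors the maxChunkSizeBytes hunk above.

#include <cassert>
#include <iostream>
#include <limits>

// Hypothetical stand-ins for mongo::OperationContext and mongo::Grid; the
// real classes are far richer. Only the naming convention is illustrated.
struct OperationContext {};

class Grid {
public:
    // Mirrors the Grid::get(opCtx) accessor pattern seen in the diff.
    static Grid* get(OperationContext* opCtx) {
        static Grid instance;
        (void)opCtx;  // the real accessor resolves the grid via the op's service context
        return &instance;
    }

    long long getMaxChunkSizeBytes() const {
        return 64LL * 1024 * 1024;  // hypothetical 64 MB default
    }
};

// Mirrors the maxChunkSizeBytes fallback in the @@ -274 hunk: a zero request
// falls back to the balancer's configured maximum, and the result must fit in
// a 32-bit BSON int (the original code enforces this with invariant()).
long long resolveMaxChunkSize(OperationContext* opCtx, long long requested) {
    long long maxChunkSizeBytes = requested;
    if (maxChunkSizeBytes == 0) {
        maxChunkSizeBytes = Grid::get(opCtx)->getMaxChunkSizeBytes();
    }
    assert(maxChunkSizeBytes < std::numeric_limits<int>::max());
    return maxChunkSizeBytes;
}

int main() {
    OperationContext opCtx;
    std::cout << resolveMaxChunkSize(&opCtx, 0) << "\n";  // prints 67108864
    return 0;
}

Passing opCtx as the first argument to every helper, rather than stashing it in a member or a global, is what makes renames like this one safe to do mechanically across the whole call chain.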