diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_map_reduce_cmd.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index 6d4a4155365..088b8d6d4d1 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -182,7 +182,7 @@ public: mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } - virtual bool run(OperationContext* txn, + virtual bool run(OperationContext* opCtx, const std::string& dbname, BSONObj& cmdObj, int options, @@ -232,7 +232,7 @@ public: } // Ensure the input database exists - auto status = Grid::get(txn)->catalogCache()->getDatabase(txn, dbname); + auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbname); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -242,7 +242,7 @@ public: shared_ptr<DBConfig> confOut; if (customOutDB) { // Create the output database implicitly, since we have a custom output requested - auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(txn, outDB)); + auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, outDB)); confOut = scopedDb.getSharedDbReference(); } else { confOut = confIn; @@ -274,14 +274,14 @@ public: maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong(); if (maxChunkSizeBytes == 0) { maxChunkSizeBytes = - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(); + Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(); } // maxChunkSizeBytes is sent as int BSON field invariant(maxChunkSizeBytes < std::numeric_limits<int>::max()); } - const auto shardRegistry = Grid::get(txn)->shardRegistry(); + const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); // modify command to run on shards with output to tmp collection string badShardedField; @@ -292,7 +292,7 @@ public: LOG(1) << "simple MR, just passthrough"; const auto shard = - uassertStatusOK(shardRegistry->getShard(txn, confIn->getPrimaryId())); + uassertStatusOK(shardRegistry->getShard(opCtx, confIn->getPrimaryId())); ShardConnection conn(shard->getConnString(), ""); @@ -338,7 +338,7 @@ public: try { Strategy::commandOp( - txn, dbname, shardedCommand, 0, nss.ns(), q, collation, &mrCommandResults); + opCtx, dbname, shardedCommand, 0, nss.ns(), q, collation, &mrCommandResults); } catch (DBException& e) { e.addContext(str::stream() << "could not run map command on all shards for ns " << nss.ns() @@ -352,7 +352,7 @@ public: string server; { const auto shard = - uassertStatusOK(shardRegistry->getShard(txn, mrResult.shardTargetId)); + uassertStatusOK(shardRegistry->getShard(opCtx, mrResult.shardTargetId)); server = shard->getConnString().toString(); } servers.insert(server); @@ -413,7 +413,7 @@ public: finalCmd.append("inputDB", dbname); finalCmd.append("shardedOutputCollection", shardResultCollection); finalCmd.append("shards", shardResultsB.done()); - finalCmd.append("writeConcern", txn->getWriteConcern().toBSON()); + finalCmd.append("writeConcern", opCtx->getWriteConcern().toBSON()); BSONObj shardCounts = shardCountsB.done(); finalCmd.append("shardCounts", shardCounts); @@ -446,7 +446,7 @@ public: if (!shardedOutput) { const auto shard = - uassertStatusOK(shardRegistry->getShard(txn, confOut->getPrimaryId())); + uassertStatusOK(shardRegistry->getShard(opCtx, confOut->getPrimaryId())); LOG(1) << "MR with single shard output, NS=" << outputCollNss.ns() << " primary=" << shard->toString(); @@ -472,20 +472,20 @@ public: // Create the sharded collection if needed if (!confOut->isSharded(outputCollNss.ns())) { // Enable sharding on the output db - Status status = Grid::get(txn)->catalogClient(txn)->enableSharding( - txn, outputCollNss.db().toString()); + Status status = Grid::get(opCtx)->catalogClient(opCtx)->enableSharding( + opCtx, outputCollNss.db().toString()); // If the database has sharding already enabled, we can ignore the error if (status.isOK()) { // Invalidate the output database so it gets reloaded on the next fetch attempt - Grid::get(txn)->catalogCache()->invalidate(outputCollNss.db()); + Grid::get(opCtx)->catalogCache()->invalidate(outputCollNss.db()); } else if (status != ErrorCodes::AlreadyInitialized) { uassertStatusOK(status); } confOut.reset(); - confOut = uassertStatusOK(Grid::get(txn)->catalogCache()->getDatabase( - txn, outputCollNss.db().toString())); + confOut = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase( + opCtx, outputCollNss.db().toString())); // Shard collection according to split points vector<BSONObj> sortedSplitPts; @@ -523,24 +523,24 @@ public: BSONObj defaultCollation; uassertStatusOK( - Grid::get(txn)->catalogClient(txn)->shardCollection(txn, - outputCollNss.ns(), - sortKeyPattern, - defaultCollation, - true, - sortedSplitPts, - outShardIds)); + Grid::get(opCtx)->catalogClient(opCtx)->shardCollection(opCtx, + outputCollNss.ns(), + sortKeyPattern, + defaultCollation, + true, + sortedSplitPts, + outShardIds)); // Make sure the cached metadata for the collection knows that we are now sharded - confOut->getChunkManager(txn, outputCollNss.ns(), true /* reload */); + confOut->getChunkManager(opCtx, outputCollNss.ns(), true /* reload */); } auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>(); { // Take distributed lock to prevent split / migration. auto scopedDistLock = - Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lock( - txn, outputCollNss.ns(), "mr-post-process", kNoDistLockTimeout); + Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->lock( + opCtx, outputCollNss.ns(), "mr-post-process", kNoDistLockTimeout); if (!scopedDistLock.isOK()) { return appendCommandStatus(result, scopedDistLock.getStatus()); } @@ -550,7 +550,7 @@ public: try { const BSONObj query; - Strategy::commandOp(txn, + Strategy::commandOp(opCtx, outDB, finalCmdObj, 0, @@ -570,8 +570,9 @@ public: for (const auto& mrResult : mrCommandResults) { string server; { - const auto shard = uassertStatusOK( - Grid::get(txn)->shardRegistry()->getShard(txn, mrResult.shardTargetId)); + const auto shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard( + opCtx, mrResult.shardTargetId)); server = shard->getConnString().toString(); } singleResult = mrResult.result; @@ -609,7 +610,8 @@ public: } // Do the splitting round - shared_ptr<ChunkManager> cm = confOut->getChunkManagerIfExists(txn, outputCollNss.ns()); + shared_ptr<ChunkManager> cm = + confOut->getChunkManagerIfExists(opCtx, outputCollNss.ns()); uassert(34359, str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns() << "; expected that collection to be sharded, but it was not", @@ -626,7 +628,7 @@ public: warning() << "Mongod reported " << size << " bytes inserted for key " << key << " but can't find chunk"; } else { - updateChunkWriteStatsAndSplitIfNeeded(txn, cm.get(), c.get(), size); + updateChunkWriteStatsAndSplitIfNeeded(opCtx, cm.get(), c.get(), size); } } } |