diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-03-12 17:27:43 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-03-12 17:28:36 -0400 |
commit | 39e06c9ef8c797ad626956b564ac9ebe295cbaf3 (patch) | |
tree | bfa2742fe1a814980def015b29dc8d5bfaf4bad3 /src/mongo/db/commands/mr.cpp | |
parent | 8125c55a251805899552d0af4776930216223703 (diff) | |
download | mongo-39e06c9ef8c797ad626956b564ac9ebe295cbaf3.tar.gz |
SERVER-22611 Sharding catalog cache refactor
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index d7b8816891f..af0c740fc70 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -68,14 +68,11 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/mutex.h" @@ -1796,7 +1793,6 @@ public: state.prepTempCollection(); ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections); - BSONList values; if (!config.outputOptions.outDB.empty()) { BSONObjBuilder loc; if (!config.outputOptions.outDB.empty()) @@ -1804,33 +1800,29 @@ public: if (!config.outputOptions.collectionName.empty()) loc.append("collection", config.outputOptions.collectionName); result.append("result", loc.obj()); - } else { - if (!config.outputOptions.collectionName.empty()) - result.append("result", config.outputOptions.collectionName); - } - - auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); - if (!scopedDbStatus.isOK()) { - return appendCommandStatus(result, scopedDbStatus.getStatus()); + } else if (!config.outputOptions.collectionName.empty()) { + result.append("result", config.outputOptions.collectionName); } - auto confOut = scopedDbStatus.getValue().db(); + std::vector<std::shared_ptr<Chunk>> chunks; - vector<shared_ptr<Chunk>> chunks; - - if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) { - shared_ptr<ChunkManager> cm = - confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns()); + if (config.outputOptions.outType != Config::OutputType::INMEMORY) { + auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + opCtx, config.outputOptions.finalNamespace); + if (!outRoutingInfoStatus.isOK()) { + return appendCommandStatus(result, outRoutingInfoStatus.getStatus()); + } - // Fetch result from other shards 1 chunk at a time. It would be better to do just one - // big $or query, but then the sorting would not be efficient. - const string shardName = ShardingState::get(opCtx)->getShardName(); - const ChunkMap& chunkMap = cm->getChunkMap(); + if (auto cm = outRoutingInfoStatus.getValue().cm()) { + // Fetch result from other shards 1 chunk at a time. It would be better to do just + // one big $or query, but then the sorting would not be efficient. + const string shardName = ShardingState::get(opCtx)->getShardName(); - for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) { - shared_ptr<Chunk> chunk = it->second; - if (chunk->getShardId() == shardName) { - chunks.push_back(chunk); + for (const auto& chunkEntry : cm->chunkMap()) { + const auto& chunk = chunkEntry.second; + if (chunk->getShardId() == shardName) { + chunks.push_back(chunk); + } } } } @@ -1839,6 +1831,8 @@ public: unsigned int index = 0; BSONObj query; BSONArrayBuilder chunkSizes; + BSONList values; + while (true) { shared_ptr<Chunk> chunk; if (chunks.size() > 0) { @@ -1855,6 +1849,7 @@ public: ParallelSortClusteredCursor cursor( servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout); cursor.init(opCtx); + int chunkSize = 0; while (cursor.more() || !values.empty()) { @@ -1880,7 +1875,9 @@ public: state.insert(config.tempNamespace, res); else state.emit(res); + values.clear(); + if (!t.isEmpty()) values.push_back(t); } @@ -1889,6 +1886,7 @@ public: chunkSizes.append(chunk->getMin()); chunkSizes.append(chunkSize); } + if (++index >= chunks.size()) break; } @@ -1907,8 +1905,10 @@ public: countsB.append("output", outputCount); result.append("counts", countsB.obj()); - return 1; + return true; } + } mapReduceFinishCommand; -} -} + +} // namespace +} // namespace mongo |