From 8125c55a251805899552d0af4776930216223703 Mon Sep 17 00:00:00 2001 From: Kaloian Manassiev Date: Sun, 12 Mar 2017 17:27:34 -0400 Subject: Revert "SERVER-22611 Sharding catalog cache refactor" This reverts commit ae2518adace4ba7ed6a16eba6943bff6ea4ade10. --- src/mongo/db/commands/mr.cpp | 58 ++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 29 deletions(-) (limited to 'src/mongo/db/commands/mr.cpp') diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index af0c740fc70..d7b8816891f 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -68,11 +68,14 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/mutex.h" @@ -1793,6 +1796,7 @@ public: state.prepTempCollection(); ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections); + BSONList values; if (!config.outputOptions.outDB.empty()) { BSONObjBuilder loc; if (!config.outputOptions.outDB.empty()) @@ -1800,29 +1804,33 @@ public: if (!config.outputOptions.collectionName.empty()) loc.append("collection", config.outputOptions.collectionName); result.append("result", loc.obj()); - } else if (!config.outputOptions.collectionName.empty()) { - result.append("result", config.outputOptions.collectionName); + } else { + if (!config.outputOptions.collectionName.empty()) + result.append("result", config.outputOptions.collectionName); } - std::vector> chunks; + auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); + if (!scopedDbStatus.isOK()) { + return appendCommandStatus(result, scopedDbStatus.getStatus()); + } - if (config.outputOptions.outType != Config::OutputType::INMEMORY) { - auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( - opCtx, config.outputOptions.finalNamespace); - if (!outRoutingInfoStatus.isOK()) { - return appendCommandStatus(result, outRoutingInfoStatus.getStatus()); - } + auto confOut = scopedDbStatus.getValue().db(); - if (auto cm = outRoutingInfoStatus.getValue().cm()) { - // Fetch result from other shards 1 chunk at a time. It would be better to do just - // one big $or query, but then the sorting would not be efficient. - const string shardName = ShardingState::get(opCtx)->getShardName(); + vector> chunks; - for (const auto& chunkEntry : cm->chunkMap()) { - const auto& chunk = chunkEntry.second; - if (chunk->getShardId() == shardName) { - chunks.push_back(chunk); - } + if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) { + shared_ptr cm = + confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns()); + + // Fetch result from other shards 1 chunk at a time. It would be better to do just one + // big $or query, but then the sorting would not be efficient. + const string shardName = ShardingState::get(opCtx)->getShardName(); + const ChunkMap& chunkMap = cm->getChunkMap(); + + for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) { + shared_ptr chunk = it->second; + if (chunk->getShardId() == shardName) { + chunks.push_back(chunk); } } } @@ -1831,8 +1839,6 @@ public: unsigned int index = 0; BSONObj query; BSONArrayBuilder chunkSizes; - BSONList values; - while (true) { shared_ptr chunk; if (chunks.size() > 0) { @@ -1849,7 +1855,6 @@ public: ParallelSortClusteredCursor cursor( servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout); cursor.init(opCtx); - int chunkSize = 0; while (cursor.more() || !values.empty()) { @@ -1875,9 +1880,7 @@ public: state.insert(config.tempNamespace, res); else state.emit(res); - values.clear(); - if (!t.isEmpty()) values.push_back(t); } @@ -1886,7 +1889,6 @@ public: chunkSizes.append(chunk->getMin()); chunkSizes.append(chunkSize); } - if (++index >= chunks.size()) break; } @@ -1905,10 +1907,8 @@ public: countsB.append("output", outputCount); result.append("counts", countsB.obj()); - return true; + return 1; } - } mapReduceFinishCommand; - -} // namespace -} // namespace mongo +} +} -- cgit v1.2.1