summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/mr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r--src/mongo/db/commands/mr.cpp58
1 files changed, 29 insertions, 29 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index d7b8816891f..af0c740fc70 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -68,14 +68,11 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/parallel.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/mutex.h"
@@ -1796,7 +1793,6 @@ public:
state.prepTempCollection();
ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);
- BSONList values;
if (!config.outputOptions.outDB.empty()) {
BSONObjBuilder loc;
if (!config.outputOptions.outDB.empty())
@@ -1804,33 +1800,29 @@ public:
if (!config.outputOptions.collectionName.empty())
loc.append("collection", config.outputOptions.collectionName);
result.append("result", loc.obj());
- } else {
- if (!config.outputOptions.collectionName.empty())
- result.append("result", config.outputOptions.collectionName);
- }
-
- auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname);
- if (!scopedDbStatus.isOK()) {
- return appendCommandStatus(result, scopedDbStatus.getStatus());
+ } else if (!config.outputOptions.collectionName.empty()) {
+ result.append("result", config.outputOptions.collectionName);
}
- auto confOut = scopedDbStatus.getValue().db();
+ std::vector<std::shared_ptr<Chunk>> chunks;
- vector<shared_ptr<Chunk>> chunks;
-
- if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) {
- shared_ptr<ChunkManager> cm =
- confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns());
+ if (config.outputOptions.outType != Config::OutputType::INMEMORY) {
+ auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, config.outputOptions.finalNamespace);
+ if (!outRoutingInfoStatus.isOK()) {
+ return appendCommandStatus(result, outRoutingInfoStatus.getStatus());
+ }
- // Fetch result from other shards 1 chunk at a time. It would be better to do just one
- // big $or query, but then the sorting would not be efficient.
- const string shardName = ShardingState::get(opCtx)->getShardName();
- const ChunkMap& chunkMap = cm->getChunkMap();
+ if (auto cm = outRoutingInfoStatus.getValue().cm()) {
+ // Fetch result from other shards 1 chunk at a time. It would be better to do just
+ // one big $or query, but then the sorting would not be efficient.
+ const string shardName = ShardingState::get(opCtx)->getShardName();
- for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
- shared_ptr<Chunk> chunk = it->second;
- if (chunk->getShardId() == shardName) {
- chunks.push_back(chunk);
+ for (const auto& chunkEntry : cm->chunkMap()) {
+ const auto& chunk = chunkEntry.second;
+ if (chunk->getShardId() == shardName) {
+ chunks.push_back(chunk);
+ }
}
}
}
@@ -1839,6 +1831,8 @@ public:
unsigned int index = 0;
BSONObj query;
BSONArrayBuilder chunkSizes;
+ BSONList values;
+
while (true) {
shared_ptr<Chunk> chunk;
if (chunks.size() > 0) {
@@ -1855,6 +1849,7 @@ public:
ParallelSortClusteredCursor cursor(
servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
cursor.init(opCtx);
+
int chunkSize = 0;
while (cursor.more() || !values.empty()) {
@@ -1880,7 +1875,9 @@ public:
state.insert(config.tempNamespace, res);
else
state.emit(res);
+
values.clear();
+
if (!t.isEmpty())
values.push_back(t);
}
@@ -1889,6 +1886,7 @@ public:
chunkSizes.append(chunk->getMin());
chunkSizes.append(chunkSize);
}
+
if (++index >= chunks.size())
break;
}
@@ -1907,8 +1905,10 @@ public:
countsB.append("output", outputCount);
result.append("counts", countsB.obj());
- return 1;
+ return true;
}
+
} mapReduceFinishCommand;
-}
-}
+
+} // namespace
+} // namespace mongo