diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-12-27 13:08:06 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-12-28 09:01:04 -0500 |
commit | 5d24dd0ade8b30d9e591eda0f8a008f2f7bde881 (patch) | |
tree | f2cd55f2a7f81e9331032b1593d638f6ccd73e46 | |
parent | 11ece9372e50454a7256ecce2a793b178f43c4fe (diff) | |
download | mongo-5d24dd0ade8b30d9e591eda0f8a008f2f7bde881.tar.gz |
SERVER-27527 Remove explicit shardingEnabled check for databases in M/R and aggregation
The sharded property should be read from the collection itself and not the
database.
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 43 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.h | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 3 |
3 files changed, 25 insertions, 26 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 1b687de25dd..9f62f50613f 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -47,7 +47,6 @@ #include "mongo/db/views/view.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" @@ -56,6 +55,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/store_possible_cursor.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -66,24 +66,20 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, BSONObj cmdObj, int options, BSONObjBuilder* result) { - auto dbname = namespaces.executionNss.db().toString(); - auto status = grid.catalogCache()->getDatabase(txn, dbname); - if (!status.isOK()) { - appendEmptyResultSet(*result, status.getStatus(), namespaces.requestedNss.ns()); + auto scopedShardDbStatus = ScopedShardDatabase::getExisting(txn, namespaces.executionNss.db()); + if (!scopedShardDbStatus.isOK()) { + appendEmptyResultSet( + *result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns()); return Status::OK(); } - std::shared_ptr<DBConfig> conf = status.getValue(); - - if (!conf->isShardingEnabled()) { - return aggPassthrough(txn, namespaces, conf, cmdObj, result, options); - } - auto request = AggregationRequest::parseFromBSON(namespaces.executionNss, cmdObj); if (!request.isOK()) { return request.getStatus(); } + const auto conf = scopedShardDbStatus.getValue().db(); + // Determine the appropriate collation and 'resolve' involved namespaces to make the // ExpressionContext. @@ -102,6 +98,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, if (!conf->isSharded(namespaces.executionNss.ns())) { return aggPassthrough(txn, namespaces, conf, cmdObj, result, options); } + auto chunkMgr = conf->getChunkManager(txn, namespaces.executionNss.ns()); std::unique_ptr<CollatorInterface> collation; @@ -177,7 +174,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, // TODO need to make sure cursors are killed if a retry is needed std::vector<Strategy::CommandResult> shardResults; Strategy::commandOp(txn, - dbname, + namespaces.executionNss.db().toString(), shardedCommand, options, namespaces.executionNss.ns(), @@ -210,13 +207,13 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, if (!needSplit) { invariant(shardResults.size() == 1); invariant(shardResults[0].target.getServers().size() == 1); - auto executorPool = grid.getExecutorPool(); + auto executorPool = Grid::get(txn)->getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor(shardResults[0].target.getServers()[0], shardResults[0].result, namespaces.requestedNss, executorPool->getArbitraryExecutor(), - grid.getCursorManager())); + Grid::get(txn)->getCursorManager())); result->appendElements(reply); return getStatusFromCommandResult(reply); } @@ -261,11 +258,12 @@ Status ClusterAggregate::runAggregate(OperationContext* txn, const auto& mergingShardId = (needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard) ? conf->getPrimaryId() : shardResults[prng.nextInt32(shardResults.size())].shardTargetId; - const auto mergingShard = uassertStatusOK(grid.shardRegistry()->getShard(txn, mergingShardId)); + const auto mergingShard = + uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, mergingShardId)); ShardConnection conn(mergingShard->getConnString(), outputNsOrEmpty); BSONObj mergedResults = - aggRunCommand(conn.get(), namespaces, mergeCmd.freeze().toBson(), options); + aggRunCommand(txn, conn.get(), namespaces, mergeCmd.freeze().toBson(), options); conn.done(); if (auto wcErrorElem = mergedResults["writeConcernError"]) { @@ -382,7 +380,8 @@ void ClusterAggregate::killAllCursors(const std::vector<Strategy::CommandResult> } } -BSONObj ClusterAggregate::aggRunCommand(DBClientBase* conn, +BSONObj ClusterAggregate::aggRunCommand(OperationContext* txn, + DBClientBase* conn, const Namespaces& namespaces, BSONObj cmd, int queryOptions) { @@ -409,29 +408,29 @@ BSONObj ClusterAggregate::aggRunCommand(DBClientBase* conn, throw RecvStaleConfigException("command failed because of stale config", result); } - auto executorPool = grid.getExecutorPool(); + auto executorPool = Grid::get(txn)->getExecutorPool(); result = uassertStatusOK(storePossibleCursor(HostAndPort(cursor->originalHost()), result, namespaces.requestedNss, executorPool->getArbitraryExecutor(), - grid.getCursorManager())); + Grid::get(txn)->getCursorManager())); return result; } Status ClusterAggregate::aggPassthrough(OperationContext* txn, const Namespaces& namespaces, - std::shared_ptr<DBConfig> conf, + DBConfig* conf, BSONObj cmdObj, BSONObjBuilder* out, int queryOptions) { // Temporary hack. See comment on declaration for details. - auto shardStatus = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); + auto shardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, conf->getPrimaryId()); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } ShardConnection conn(shardStatus.getValue()->getConnString(), ""); - BSONObj result = aggRunCommand(conn.get(), namespaces, cmdObj, queryOptions); + BSONObj result = aggRunCommand(txn, conn.get(), namespaces, cmdObj, queryOptions); conn.done(); // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h index 969f62ec907..da8e7bb46bc 100644 --- a/src/mongo/s/commands/cluster_aggregate.h +++ b/src/mongo/s/commands/cluster_aggregate.h @@ -82,14 +82,15 @@ private: // could be different from conn->getServerAddress() for connections that map to // multiple servers such as for replica sets. These also take care of registering // returned cursors. - static BSONObj aggRunCommand(DBClientBase* conn, + static BSONObj aggRunCommand(OperationContext* txn, + DBClientBase* conn, const Namespaces& namespaces, BSONObj cmd, int queryOptions); static Status aggPassthrough(OperationContext* txn, const Namespaces& namespaces, - std::shared_ptr<DBConfig> conf, + DBConfig* conf, BSONObj cmd, BSONObjBuilder* result, int queryOptions); diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index 8d3a121c5a7..be4b489ff39 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -256,8 +256,7 @@ public: << " which lives on config servers")); } - const bool shardedInput = - confIn && confIn->isShardingEnabled() && confIn->isSharded(nss.ns()); + const bool shardedInput = confIn && confIn->isSharded(nss.ns()); if (!shardedOutput) { uassert(15920, |