author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2016-12-27 13:08:06 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2016-12-28 09:01:04 -0500
commit     5d24dd0ade8b30d9e591eda0f8a008f2f7bde881 (patch)
tree       f2cd55f2a7f81e9331032b1593d638f6ccd73e46
parent     11ece9372e50454a7256ecce2a793b178f43c4fe (diff)
download   mongo-5d24dd0ade8b30d9e591eda0f8a008f2f7bde881.tar.gz
SERVER-27527 Remove explicit shardingEnabled check for databases in M/R and aggregation
The sharded property should be read from the collection itself, not from the database.
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp      | 43
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.h        |  5
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_cmd.cpp |  3
3 files changed, 25 insertions, 26 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 1b687de25dd..9f62f50613f 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/views/view.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
@@ -56,6 +55,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/store_possible_cursor.h"
+#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -66,24 +66,20 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
BSONObj cmdObj,
int options,
BSONObjBuilder* result) {
- auto dbname = namespaces.executionNss.db().toString();
- auto status = grid.catalogCache()->getDatabase(txn, dbname);
- if (!status.isOK()) {
- appendEmptyResultSet(*result, status.getStatus(), namespaces.requestedNss.ns());
+ auto scopedShardDbStatus = ScopedShardDatabase::getExisting(txn, namespaces.executionNss.db());
+ if (!scopedShardDbStatus.isOK()) {
+ appendEmptyResultSet(
+ *result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns());
return Status::OK();
}
- std::shared_ptr<DBConfig> conf = status.getValue();
-
- if (!conf->isShardingEnabled()) {
- return aggPassthrough(txn, namespaces, conf, cmdObj, result, options);
- }
-
auto request = AggregationRequest::parseFromBSON(namespaces.executionNss, cmdObj);
if (!request.isOK()) {
return request.getStatus();
}
+ const auto conf = scopedShardDbStatus.getValue().db();
+
// Determine the appropriate collation and 'resolve' involved namespaces to make the
// ExpressionContext.
@@ -102,6 +98,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
if (!conf->isSharded(namespaces.executionNss.ns())) {
return aggPassthrough(txn, namespaces, conf, cmdObj, result, options);
}
+
auto chunkMgr = conf->getChunkManager(txn, namespaces.executionNss.ns());
std::unique_ptr<CollatorInterface> collation;
@@ -177,7 +174,7 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
// TODO need to make sure cursors are killed if a retry is needed
std::vector<Strategy::CommandResult> shardResults;
Strategy::commandOp(txn,
- dbname,
+ namespaces.executionNss.db().toString(),
shardedCommand,
options,
namespaces.executionNss.ns(),
@@ -210,13 +207,13 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
if (!needSplit) {
invariant(shardResults.size() == 1);
invariant(shardResults[0].target.getServers().size() == 1);
- auto executorPool = grid.getExecutorPool();
+ auto executorPool = Grid::get(txn)->getExecutorPool();
const BSONObj reply =
uassertStatusOK(storePossibleCursor(shardResults[0].target.getServers()[0],
shardResults[0].result,
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
- grid.getCursorManager()));
+ Grid::get(txn)->getCursorManager()));
result->appendElements(reply);
return getStatusFromCommandResult(reply);
}
@@ -261,11 +258,12 @@ Status ClusterAggregate::runAggregate(OperationContext* txn,
const auto& mergingShardId = (needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard)
? conf->getPrimaryId()
: shardResults[prng.nextInt32(shardResults.size())].shardTargetId;
- const auto mergingShard = uassertStatusOK(grid.shardRegistry()->getShard(txn, mergingShardId));
+ const auto mergingShard =
+ uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, mergingShardId));
ShardConnection conn(mergingShard->getConnString(), outputNsOrEmpty);
BSONObj mergedResults =
- aggRunCommand(conn.get(), namespaces, mergeCmd.freeze().toBson(), options);
+ aggRunCommand(txn, conn.get(), namespaces, mergeCmd.freeze().toBson(), options);
conn.done();
if (auto wcErrorElem = mergedResults["writeConcernError"]) {
@@ -382,7 +380,8 @@ void ClusterAggregate::killAllCursors(const std::vector<Strategy::CommandResult>
}
}
-BSONObj ClusterAggregate::aggRunCommand(DBClientBase* conn,
+BSONObj ClusterAggregate::aggRunCommand(OperationContext* txn,
+ DBClientBase* conn,
const Namespaces& namespaces,
BSONObj cmd,
int queryOptions) {
@@ -409,29 +408,29 @@ BSONObj ClusterAggregate::aggRunCommand(DBClientBase* conn,
throw RecvStaleConfigException("command failed because of stale config", result);
}
- auto executorPool = grid.getExecutorPool();
+ auto executorPool = Grid::get(txn)->getExecutorPool();
result = uassertStatusOK(storePossibleCursor(HostAndPort(cursor->originalHost()),
result,
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
- grid.getCursorManager()));
+ Grid::get(txn)->getCursorManager()));
return result;
}
Status ClusterAggregate::aggPassthrough(OperationContext* txn,
const Namespaces& namespaces,
- std::shared_ptr<DBConfig> conf,
+ DBConfig* conf,
BSONObj cmdObj,
BSONObjBuilder* out,
int queryOptions) {
// Temporary hack. See comment on declaration for details.
- auto shardStatus = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
+ auto shardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, conf->getPrimaryId());
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
ShardConnection conn(shardStatus.getValue()->getConnString(), "");
- BSONObj result = aggRunCommand(conn.get(), namespaces, cmdObj, queryOptions);
+ BSONObj result = aggRunCommand(txn, conn.get(), namespaces, cmdObj, queryOptions);
conn.done();
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h
index 969f62ec907..da8e7bb46bc 100644
--- a/src/mongo/s/commands/cluster_aggregate.h
+++ b/src/mongo/s/commands/cluster_aggregate.h
@@ -82,14 +82,15 @@ private:
// could be different from conn->getServerAddress() for connections that map to
// multiple servers such as for replica sets. These also take care of registering
// returned cursors.
- static BSONObj aggRunCommand(DBClientBase* conn,
+ static BSONObj aggRunCommand(OperationContext* txn,
+ DBClientBase* conn,
const Namespaces& namespaces,
BSONObj cmd,
int queryOptions);
static Status aggPassthrough(OperationContext* txn,
const Namespaces& namespaces,
- std::shared_ptr<DBConfig> conf,
+ DBConfig* conf,
BSONObj cmd,
BSONObjBuilder* result,
int queryOptions);
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 8d3a121c5a7..be4b489ff39 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -256,8 +256,7 @@ public:
<< " which lives on config servers"));
}
- const bool shardedInput =
- confIn && confIn->isShardingEnabled() && confIn->isSharded(nss.ns());
+ const bool shardedInput = confIn && confIn->isSharded(nss.ns());
if (!shardedOutput) {
uassert(15920,