summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-04-28 16:41:40 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-05-05 10:58:37 -0400
commiteb2636bcc2d6aed84d07dae2af34743a7863571d (patch)
tree8eaddd996cc6d60ba25dc4497f20b67d58ccc0bd /src/mongo/s
parent359f97d52340e356324e2bf3b724fad6a2383ea1 (diff)
downloadmongo-eb2636bcc2d6aed84d07dae2af34743a7863571d.tar.gz
SERVER-22672 Make moveChunk and shardCollection use the balancer
The moveChunk and shardCollection commands should not be calling directly into the shards when they need to move chunks and instead they should use the balancer. That way when the balancer moves to the CSRS primary, they can just start making calls into it.
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/balancer/balancer.cpp166
-rw-r--r--src/mongo/s/balancer/balancer.h23
-rw-r--r--src/mongo/s/balancer/balancer_chunk_selection_policy.h9
-rw-r--r--src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp32
-rw-r--r--src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h4
-rw-r--r--src/mongo/s/balancer/balancer_policy.cpp36
-rw-r--r--src/mongo/s/balancer/balancer_policy.h12
-rw-r--r--src/mongo/s/chunk.cpp62
-rw-r--r--src/mongo/s/chunk.h24
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp44
-rw-r--r--src/mongo/s/commands/cluster_shard_collection_cmd.cpp118
11 files changed, 292 insertions, 238 deletions
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index e71c0ac0e21..95215846a5f 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -33,11 +33,13 @@
#include "mongo/s/balancer/balancer.h"
#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/client.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/db/server_options.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
@@ -52,6 +54,7 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/sharding_raii.h"
#include "mongo/stdx/memory.h"
@@ -75,6 +78,8 @@ namespace {
const Seconds kBalanceRoundDefaultInterval(10);
const Seconds kShortBalanceRoundInterval(1);
+const char kChunkTooBig[] = "chunkTooBig";
+
const auto getBalancer = ServiceContext::declareDecoration<Balancer>();
/**
@@ -148,6 +153,82 @@ void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& cluste
warning() << sb.str();
}
+/**
+ * Blocking method, which requests a single chunk migration to run.
+ */
+Status executeSingleMigration(OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ const NamespaceString nss(migrateInfo.ns);
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ ChunkManager* const cm = scopedCMStatus.getValue().cm();
+
+ shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
+
+ BSONObjBuilder builder;
+ MoveChunkRequest::appendAsCommand(
+ &builder,
+ nss,
+ cm->getVersion(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+ migrateInfo.from,
+ migrateInfo.to,
+ c->getMin(),
+ c->getMax(),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
+ builder.append(LiteParsedQuery::cmdOptionMaxTimeMS,
+ durationCount<Milliseconds>(Microseconds(txn->getRemainingMaxTimeMicros())));
+
+ BSONObj cmdObj = builder.obj();
+
+ Status status{ErrorCodes::NotYetInitialized, "Uninitialized"};
+
+ auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
+ if (!shard) {
+ status = {ErrorCodes::ShardNotFound,
+ str::stream() << "shard " << migrateInfo.from << " not found"};
+ } else {
+ auto cmdStatus = shard->runCommand(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ cmdObj,
+ Shard::RetryPolicy::kNotIdempotent);
+ if (!cmdStatus.isOK()) {
+ status = std::move(cmdStatus.getStatus());
+ } else {
+ status = std::move(cmdStatus.getValue().commandStatus);
+ BSONObj cmdResponse = std::move(cmdStatus.getValue().response);
+
+ // For backwards compatibility with 3.2 and earlier, where the move chunk command
+ // instead of returning a ChunkTooBig status includes an extra field in the response
+ bool chunkTooBig = false;
+ bsonExtractBooleanFieldWithDefault(cmdResponse, kChunkTooBig, false, &chunkTooBig);
+ if (chunkTooBig) {
+ invariant(!status.isOK());
+ status = {ErrorCodes::ChunkTooBig, status.reason()};
+ }
+ }
+ }
+
+ if (!status.isOK()) {
+ log() << "Move chunk " << cmdObj << " failed" << causedBy(status);
+ return {status.code(), str::stream() << "move failed due to " << status.toString()};
+ }
+
+ cm->reload(txn);
+
+ return Status::OK();
+}
+
MONGO_FP_DECLARE(skipBalanceRound);
MONGO_FP_DECLARE(balancerRoundIntervalSetting);
@@ -183,15 +264,34 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch
return refreshStatus;
}
- try {
- _moveChunks(txn,
- {*migrateInfo},
- balancerConfig->getSecondaryThrottle(),
- balancerConfig->waitForDelete());
- return Status::OK();
- } catch (const DBException& e) {
- return e.toStatus();
+ _moveChunks(txn,
+ {*migrateInfo},
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+
+ return Status::OK();
+}
+
+Status Balancer::moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId);
+ if (!moveAllowedStatus.isOK()) {
+ return moveAllowedStatus;
}
+
+ return executeSingleMigration(txn,
+ MigrateInfo(chunk.getNS(), newShardId, chunk),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
+}
+
+std::string Balancer::name() const {
+ return "Balancer";
}
void Balancer::run() {
@@ -468,8 +568,10 @@ int Balancer::_moveChunks(OperationContext* txn,
int movedCount = 0;
for (const auto& migrateInfo : candidateChunks) {
+ auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+
// If the balancer was disabled since we started this round, don't start new chunk moves
- if (!Grid::get(txn)->getBalancerConfiguration()->isBalancerActive()) {
+ if (!balancerConfig->isBalancerActive()) {
LOG(1) << "Stopping balancing round early as balancing was disabled";
return movedCount;
}
@@ -487,47 +589,33 @@ int Balancer::_moveChunks(OperationContext* txn,
const NamespaceString nss(migrateInfo.ns);
try {
- auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
- ChunkManager* const cm = scopedCM.cm();
-
- shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
-
- if (c->getMin().woCompare(migrateInfo.minKey) ||
- c->getMax().woCompare(migrateInfo.maxKey)) {
- log() << "Migration " << migrateInfo
- << " will be skipped this round due to chunk metadata mismatch.";
- scopedCM.db()->getChunkManager(txn, nss.ns(), true);
- continue;
- }
-
- BSONObj res;
- if (c->moveAndCommit(txn,
- migrateInfo.to,
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- secondaryThrottle,
- waitForDelete,
- 0, /* maxTimeMS */
- res)) {
+ Status status = executeSingleMigration(txn,
+ migrateInfo,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+ if (status.isOK()) {
movedCount++;
- continue;
- }
+ } else if (status == ErrorCodes::ChunkTooBig) {
+ log() << "Performing a split because migrate failed for size reasons"
+ << causedBy(status);
- log() << "balancer move failed: " << res << ", migrate: " << migrateInfo;
+ auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
+ ChunkManager* const cm = scopedCM.cm();
- Status moveStatus = getStatusFromCommandResult(res);
+ shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
- if (moveStatus == ErrorCodes::ChunkTooBig || res["chunkTooBig"].trueValue()) {
- log() << "Performing a split because migrate failed for size reasons";
-
- auto splitStatus = c->split(txn, Chunk::normal, NULL);
+ auto splitStatus = c->split(txn, Chunk::normal, nullptr);
if (!splitStatus.isOK()) {
- log() << "marking chunk as jumbo: " << c->toString();
+ log() << "Marking chunk " << c->toString() << " as jumbo.";
c->markAsJumbo(txn);
// We increment moveCount so we do another round right away
movedCount++;
}
+ } else {
+ log() << "Balancer move failed" << causedBy(status);
}
} catch (const DBException& ex) {
log() << "balancer move " << migrateInfo << " failed" << causedBy(ex);
diff --git a/src/mongo/s/balancer/balancer.h b/src/mongo/s/balancer/balancer.h
index 63159beceee..ccac10d92db 100644
--- a/src/mongo/s/balancer/balancer.h
+++ b/src/mongo/s/balancer/balancer.h
@@ -72,14 +72,25 @@ public:
*/
Status rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk);
-private:
- // BackgroundJob methods implementation
+ /**
+ * Blocking call, which requests the balancer to move a single chunk to the specified location
+ * in accordance with the active balancer policy. An error will be returned if the attempt to
+ * move fails for any reason.
+ *
+ * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
+ * move regardless. If should be used only for user-initiated moves.
+ */
+ Status moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
- void run() override;
+private:
+ std::string name() const final;
- std::string name() const override {
- return "Balancer";
- }
+ void run() final;
/**
* Checks that the balancer can connect to all servers it needs to do its job.
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/s/balancer/balancer_chunk_selection_policy.h
index 7bdf049c331..1329f3f9194 100644
--- a/src/mongo/s/balancer/balancer_chunk_selection_policy.h
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.h
@@ -103,6 +103,15 @@ public:
virtual StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
OperationContext* txn, const ChunkType& chunk) = 0;
+ /**
+ * Asks the chunk selection policy to validate that the specified chunk migration is allowed
+ * given the current rules. Returns OK if the migration won't violate any rules or any other
+ * failed status otherwise.
+ */
+ virtual Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) = 0;
+
protected:
BalancerChunkSelectionPolicy();
};
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 9993bd0d6f8..a685dee7317 100644
--- a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -217,6 +217,38 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* tx
return boost::optional<MigrateInfo>{MigrateInfo(nss.ns(), newShardId, chunk)};
}
+Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) {
+ auto tagForChunkStatus =
+ Grid::get(txn)->catalogManager(txn)->getTagForChunk(txn, chunk.getNS(), chunk);
+ if (!tagForChunkStatus.isOK()) {
+ return tagForChunkStatus.getStatus();
+ }
+
+ auto shardStatsStatus = _clusterStats->getStats(txn);
+ if (!shardStatsStatus.isOK()) {
+ return shardStatsStatus.getStatus();
+ }
+
+ const auto& shardStats = shardStatsStatus.getValue();
+
+ auto newShardIterator =
+ std::find_if(shardStats.begin(),
+ shardStats.end(),
+ [&newShardId](const ClusterStatistics::ShardStatistics& stat) {
+ return stat.shardId == newShardId;
+ });
+ if (newShardIterator == shardStats.end()) {
+ return {ErrorCodes::ShardNotFound,
+ str::stream() << "Unable to find constraints information for shard " << newShardId
+ << ". Move to this shard will be disallowed."};
+ }
+
+ return DistributionStatus::isShardSuitableReceiver(*newShardIterator,
+ tagForChunkStatus.getValue());
+}
+
StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
OperationContext* txn, const NamespaceString& nss) {
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
index ab6230243d9..0b1cb9582a6 100644
--- a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
@@ -47,6 +47,10 @@ public:
StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
OperationContext* txn, const ChunkType& chunk) override;
+ Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) override;
+
private:
/**
* Synchronous method, which iterates the collection's chunks and uses the tags information to
diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp
index 17cd2136979..cc7b47693e5 100644
--- a/src/mongo/s/balancer/balancer_policy.cpp
+++ b/src/mongo/s/balancer/balancer_policy.cpp
@@ -97,23 +97,35 @@ unsigned DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId
return total;
}
+Status DistributionStatus::isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat,
+ const string& chunkTag) {
+ if (stat.isSizeMaxed()) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId
+ << " has already reached the maximum total chunk size."};
+ }
+
+ if (stat.isDraining) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId << " is currently draining."};
+ }
+
+ if (!chunkTag.empty() && !stat.shardTags.count(chunkTag)) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId << " doesn't have right tag"};
+ }
+
+ return Status::OK();
+}
+
string DistributionStatus::getBestReceieverShard(const string& tag) const {
string best;
unsigned minChunks = numeric_limits<unsigned>::max();
for (const auto& stat : _shardInfo) {
- if (stat.isSizeMaxed()) {
- LOG(1) << stat.shardId << " has already reached the maximum total chunk size.";
- continue;
- }
-
- if (stat.isDraining) {
- LOG(1) << stat.shardId << " is currently draining.";
- continue;
- }
-
- if (!tag.empty() && !stat.shardTags.count(tag)) {
- LOG(1) << stat.shardId << " doesn't have right tag";
+ auto status = isShardSuitableReceiver(stat, tag);
+ if (!status.isOK()) {
+ LOG(1) << status.codeString();
continue;
}
diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h
index 45fb6409294..428349098a1 100644
--- a/src/mongo/s/balancer/balancer_policy.h
+++ b/src/mongo/s/balancer/balancer_policy.h
@@ -78,20 +78,24 @@ class DistributionStatus {
public:
DistributionStatus(ShardStatisticsVector shardInfo, const ShardToChunksMap& shardToChunksMap);
- // only used when building
-
/**
* @return if range is valid
*/
bool addTagRange(const TagRange& range);
- // ---- these methods might be better suiting in BalancerPolicy
+ /**
+ * Determines whether a shard with the specified utilization statistics would be able to accept
+ * a chunk with the specified tag. According to the policy a shard cannot accept chunks if its
+ * size is maxed out and if the chunk's tag conflicts with the tag of the shard.
+ */
+ static Status isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat,
+ const std::string& chunkTag);
/**
* @param forTag "" if you don't care, or a tag
* @return shard best suited to receive a chunk
*/
- std::string getBestReceieverShard(const std::string& forTag) const;
+ std::string getBestReceieverShard(const std::string& tag) const;
/**
* @return the shard with the most chunks
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 44ebb40b5b3..b6096e3051d 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -33,11 +33,9 @@
#include "mongo/s/chunk.h"
#include "mongo/client/connpool.h"
-#include "mongo/client/read_preference.h"
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
#include "mongo/platform/random.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_manager.h"
@@ -46,7 +44,6 @@
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
-#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_util.h"
#include "mongo/util/log.h"
@@ -297,65 +294,6 @@ StatusWith<boost::optional<std::pair<BSONObj, BSONObj>>> Chunk::split(
return splitStatus.getValue();
}
-bool Chunk::moveAndCommit(OperationContext* txn,
- const ShardId& toShardId,
- long long chunkSize /* bytes */,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete,
- int maxTimeMS,
- BSONObj& res) const {
- uassert(10167, "can't move shard to its current location!", getShardId() != toShardId);
-
- BSONObjBuilder builder;
- MoveChunkRequest::appendAsCommand(&builder,
- NamespaceString(_manager->getns()),
- _manager->getVersion(),
- grid.shardRegistry()->getConfigServerConnectionString(),
- _shardId,
- toShardId,
- _min,
- _max,
- chunkSize,
- secondaryThrottle,
- waitForDelete);
- builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, maxTimeMS);
-
- BSONObj cmdObj = builder.obj();
- log() << "Moving chunk with the following arguments: " << cmdObj;
-
- Status status{ErrorCodes::NotYetInitialized, "Uninitialized"};
-
- auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, _shardId);
- if (!shard) {
- status = Status(ErrorCodes::ShardNotFound,
- str::stream() << "shard " << _shardId << " not found");
- } else {
- auto response = shard->runCommand(txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- cmdObj,
- Shard::RetryPolicy::kNotIdempotent);
- if (!response.isOK()) {
- status = std::move(response.getStatus());
- } else {
- status = std::move(response.getValue().commandStatus);
- res = std::move(response.getValue().response);
- }
- }
-
- if (status.isOK()) {
- LOG(1) << "moveChunk result: " << res;
- } else {
- warning() << "Move chunk failed" << causedBy(status);
- }
-
- // If succeeded we needs to reload the chunk manager in order to pick up the new location. If
- // failed, mongos may be stale.
- _manager->reload(txn);
-
- return status.isOK();
-}
-
bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) {
LastError::Disabled d(&LastError::get(cc()));
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index d9cf1886a6b..28bb1e519cc 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -37,7 +37,6 @@ namespace mongo {
class ChunkManager;
class ChunkType;
-class MigrationSecondaryThrottleOptions;
class OperationContext;
/**
@@ -132,29 +131,6 @@ public:
SplitPointMode mode,
size_t* resultingSplits) const;
- //
- // migration support
- //
-
- /**
- * Issues a migrate request for this chunk
- *
- * @param to shard to move this chunk to
- * @param chunSize maximum number of bytes beyond which the migrate should no go trhough
- * @param writeConcern detailed write concern. NULL means the default write concern.
- * @param waitForDelete whether chunk move should wait for cleanup or return immediately
- * @param maxTimeMS max time for the migrate request
- * @param res the object containing details about the migrate execution
- * @return true if move was successful
- */
- bool moveAndCommit(OperationContext* txn,
- const ShardId& to,
- long long chunkSize,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete,
- int maxTimeMS,
- BSONObj& res) const;
-
/**
* marks this chunk as a jumbo chunk
* that means the chunk will be inelligble for migrates
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index ae156c39753..d853f68a244 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/client_basic.h"
#include "mongo/db/commands.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/chunk_manager.h"
@@ -213,39 +214,30 @@ public:
}
}
- LOG(0) << "CMD: movechunk: " << cmdObj;
-
- StatusWith<int> maxTimeMS =
- LiteParsedQuery::parseMaxTimeMS(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
-
- if (!maxTimeMS.isOK()) {
- errmsg = maxTimeMS.getStatus().reason();
- return false;
- }
-
const auto secondaryThrottle =
uassertStatusOK(MigrationSecondaryThrottleOptions::createFromCommand(cmdObj));
- BSONObj res;
- if (!chunk->moveAndCommit(txn,
- to->getId(),
- maxChunkSizeBytes,
- secondaryThrottle,
- cmdObj["_waitForDelete"].trueValue(),
- maxTimeMS.getValue(),
- res)) {
- errmsg = "move failed";
- result.append("cause", res);
-
- if (!res["code"].eoo()) {
- result.append(res["code"]);
- }
+ log() << "CMD: movechunk: " << cmdObj;
- return false;
+ {
+ ChunkType chunkType;
+ chunkType.setNS(nss.ns());
+ chunkType.setName(ChunkType::genID(nss.ns(), chunk->getMin()));
+ chunkType.setMin(chunk->getMin());
+ chunkType.setMax(chunk->getMax());
+ chunkType.setShard(chunk->getShardId());
+ chunkType.setVersion(info->getVersion());
+
+ uassertStatusOK(
+ Balancer::get(txn)->moveSingleChunk(txn,
+ chunkType,
+ to->getId(),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ cmdObj["_waitForDelete"].trueValue()));
}
result.append("millis", t.millis());
-
return true;
}
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index 7bebe1ac3c7..6eddee0f17c 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/hasher.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
@@ -110,33 +111,17 @@ public:
int options,
std::string& errmsg,
BSONObjBuilder& result) {
- const string ns = parseNs(dbname, cmdObj);
- if (ns.size() == 0) {
- errmsg = "no ns";
- return false;
- }
+ const NamespaceString nss(parseNs(dbname, cmdObj));
+ uassert(ErrorCodes::InvalidNamespace, "Invalid namespace", nss.isValid());
- const NamespaceString nsStr(ns);
- if (!nsStr.isValid()) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::InvalidNamespace, "invalid collection namespace [" + ns + "]"));
- }
-
- auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString()));
- if (!config->isShardingEnabled()) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::IllegalOperation,
- str::stream() << "sharding not enabled for db " << nsStr.db()));
- }
+ auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString()));
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "sharding not enabled for db " << nss.db(),
+ config->isShardingEnabled());
- if (config->isSharded(ns)) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::IllegalOperation,
- str::stream() << "sharding already enabled for collection " << ns));
- }
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "sharding already enabled for collection " << nss.ns(),
+ !config->isSharded(nss.ns()));
// NOTE: We *must* take ownership of the key here - otherwise the shared BSONObj
// becomes corrupt as soon as the command ends.
@@ -166,10 +151,7 @@ public:
return false;
}
- if (ns.find(".system.") != string::npos) {
- errmsg = "can't shard system namespaces";
- return false;
- }
+ uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem());
vector<ShardId> shardIds;
grid.shardRegistry()->getAllShardIds(&shardIds);
@@ -200,8 +182,8 @@ public:
// check that collection is not capped
BSONObj res;
{
- list<BSONObj> all = conn->getCollectionInfos(
- config->name(), BSON("name" << nsToCollectionSubstring(ns)));
+ list<BSONObj> all =
+ conn->getCollectionInfos(config->name(), BSON("name" << nss.coll()));
if (!all.empty()) {
res = all.front().getOwned();
}
@@ -241,7 +223,7 @@ public:
// 5. If the collection is empty, and it's still possible to create an index
// on the proposed key, we go ahead and do so.
- list<BSONObj> indexes = conn->getIndexSpecs(ns);
+ list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
// 1. Verify consistency with existing unique indexes
ShardKeyPattern proposedShardKey(proposedKey);
@@ -251,7 +233,7 @@ public:
bool isUnique = idx["unique"].trueValue();
if (isUnique && !proposedShardKey.isUniqueIndexCompatible(currentKey)) {
- errmsg = str::stream() << "can't shard collection '" << ns << "' "
+ errmsg = str::stream() << "can't shard collection '" << nss.ns() << "' "
<< "with unique index on " << currentKey << " "
<< "and proposed shard key " << proposedKey << ". "
<< "Uniqueness can't be maintained unless "
@@ -276,7 +258,7 @@ public:
// per field per collection.
if (isHashedShardKey && !idx["seed"].eoo() &&
idx["seed"].numberInt() != BSONElementHasher::DEFAULT_HASH_SEED) {
- errmsg = str::stream() << "can't shard collection " << ns
+ errmsg = str::stream() << "can't shard collection " << nss.ns()
<< " with hashed shard key " << proposedKey
<< " because the hashed index uses a non-default"
<< " seed of " << idx["seed"].numberInt();
@@ -291,7 +273,7 @@ public:
// 3. If proposed key is required to be unique, additionally check for exact match.
bool careAboutUnique = cmdObj["unique"].trueValue();
if (hasUsefulIndexForKey && careAboutUnique) {
- BSONObj eqQuery = BSON("ns" << ns << "key" << proposedKey);
+ BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey);
BSONObj eqQueryResult;
for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) {
@@ -311,8 +293,8 @@ public:
bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id");
if (!isExplicitlyUnique && !isCurrentID) {
- errmsg = str::stream() << "can't shard collection " << ns << ", " << proposedKey
- << " index not unique, "
+ errmsg = str::stream() << "can't shard collection " << nss.ns() << ", "
+ << proposedKey << " index not unique, "
<< "and unique index explicitly specified";
conn.done();
return false;
@@ -324,7 +306,7 @@ public:
// Check 2.iii and 2.iv. Make sure no null entries in the sharding index
// and that there is a useful, non-multikey index available
BSONObjBuilder checkShardingIndexCmd;
- checkShardingIndexCmd.append("checkShardingIndex", ns);
+ checkShardingIndexCmd.append("checkShardingIndex", nss.ns());
checkShardingIndexCmd.append("keyPattern", proposedKey);
if (!conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res)) {
@@ -332,7 +314,7 @@ public:
conn.done();
return false;
}
- } else if (conn->count(ns) != 0) {
+ } else if (conn->count(nss.ns()) != 0) {
// 4. if no useful index, and collection is non-empty, fail
errmsg = str::stream() << "please create an index that starts with the "
<< "shard key before sharding.";
@@ -344,7 +326,7 @@ public:
// 5. If no useful index exists, and collection empty, create one on proposedKey.
// Only need to call ensureIndex on primary shard, since indexes get copied to
// receiving shard whenever a migrate occurs.
- Status status = clusterCreateIndex(txn, ns, proposedKey, careAboutUnique);
+ Status status = clusterCreateIndex(txn, nss.ns(), proposedKey, careAboutUnique);
if (!status.isOK()) {
errmsg = str::stream() << "ensureIndex failed to create index on "
<< "primary shard: " << status.reason();
@@ -353,7 +335,7 @@ public:
}
}
- bool isEmpty = (conn->count(ns) == 0);
+ bool isEmpty = (conn->count(nss.ns()) == 0);
conn.done();
@@ -419,25 +401,26 @@ public:
LOG(0) << "CMD: shardcollection: " << cmdObj;
- audit::logShardCollection(ClientBasic::getCurrent(), ns, proposedKey, careAboutUnique);
+ audit::logShardCollection(
+ ClientBasic::getCurrent(), nss.ns(), proposedKey, careAboutUnique);
Status status = grid.catalogManager(txn)->shardCollection(
- txn, ns, proposedShardKey, careAboutUnique, initSplits, std::set<ShardId>{});
+ txn, nss.ns(), proposedShardKey, careAboutUnique, initSplits, {});
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
// Make sure the cached metadata for the collection knows that we are now sharded
- config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString()));
- config->getChunkManager(txn, nsStr.ns(), true /* force */);
+ config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString()));
+ config->getChunkManager(txn, nss.ns(), true /* force */);
- result << "collectionsharded" << ns;
+ result << "collectionsharded" << nss.ns();
// Only initially move chunks when using a hashed shard key
if (isHashedShardKey && isEmpty) {
// Reload the new config info. If we created more than one initial chunk, then
// we need to move them around to balance.
- shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, ns, true);
+ shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, nss.ns(), true);
ChunkMap chunkMap = chunkManager->getChunkMap();
// 2. Move and commit each "big chunk" to a different shard.
@@ -451,25 +434,30 @@ public:
shared_ptr<Chunk> chunk = c->second;
- // can't move chunk to shard it's already on
+ // Can't move chunk to shard it's already on
if (to->getId() == chunk->getShardId()) {
continue;
}
- BSONObj moveResult;
- WriteConcernOptions noThrottle;
- if (!chunk->moveAndCommit(
- txn,
- to->getId(),
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- MigrationSecondaryThrottleOptions::create(
- MigrationSecondaryThrottleOptions::kOff),
- true,
- 0,
- moveResult)) {
+ ChunkType chunkType;
+ chunkType.setNS(nss.ns());
+ chunkType.setName(ChunkType::genID(nss.ns(), chunk->getMin()));
+ chunkType.setMin(chunk->getMin());
+ chunkType.setMax(chunk->getMax());
+ chunkType.setShard(chunk->getShardId());
+ chunkType.setVersion(chunkManager->getVersion());
+
+ Status moveStatus = Balancer::get(txn)->moveSingleChunk(
+ txn,
+ chunkType,
+ to->getId(),
+ Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
+ MigrationSecondaryThrottleOptions::create(
+ MigrationSecondaryThrottleOptions::kOff),
+ true);
+ if (!moveStatus.isOK()) {
warning() << "couldn't move chunk " << chunk->toString() << " to shard " << *to
- << " while sharding collection " << ns << "."
- << " Reason: " << moveResult;
+ << " while sharding collection " << nss.ns() << causedBy(moveStatus);
}
}
@@ -478,7 +466,7 @@ public:
}
// Reload the config info, after all the migrations
- chunkManager = config->getChunkManager(txn, ns, true);
+ chunkManager = config->getChunkManager(txn, nss.ns(), true);
// 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
// that we haven't already split by.
@@ -491,7 +479,7 @@ public:
auto splitStatus = shardutil::splitChunkAtMultiplePoints(
txn,
currentChunk->getShardId(),
- NamespaceString(ns),
+ nss,
chunkManager->getShardKeyPattern(),
chunkManager->getVersion(),
currentChunk->getMin(),
@@ -499,7 +487,7 @@ public:
subSplits);
if (!splitStatus.isOK()) {
warning() << "couldn't split chunk " << currentChunk->toString()
- << " while sharding collection " << ns
+ << " while sharding collection " << nss.ns()
<< causedBy(splitStatus.getStatus());
}
@@ -523,7 +511,7 @@ public:
// Proactively refresh the chunk manager. Not really necessary, but this way it's
// immediately up-to-date the next time it's used.
- config->getChunkManager(txn, ns, true);
+ config->getChunkManager(txn, nss.ns(), true);
}
return true;