-rw-r--r--  src/mongo/s/balancer/balancer.cpp                              166
-rw-r--r--  src/mongo/s/balancer/balancer.h                                 23
-rw-r--r--  src/mongo/s/balancer/balancer_chunk_selection_policy.h           9
-rw-r--r--  src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp   32
-rw-r--r--  src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h      4
-rw-r--r--  src/mongo/s/balancer/balancer_policy.cpp                        36
-rw-r--r--  src/mongo/s/balancer/balancer_policy.h                          12
-rw-r--r--  src/mongo/s/chunk.cpp                                           62
-rw-r--r--  src/mongo/s/chunk.h                                             24
-rw-r--r--  src/mongo/s/commands/cluster_move_chunk_cmd.cpp                 44
-rw-r--r--  src/mongo/s/commands/cluster_shard_collection_cmd.cpp          118
11 files changed, 292 insertions, 238 deletions
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index e71c0ac0e21..95215846a5f 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -33,11 +33,13 @@
#include "mongo/s/balancer/balancer.h"
#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/client.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/db/server_options.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
@@ -52,6 +54,7 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/sharding_raii.h"
#include "mongo/stdx/memory.h"
@@ -75,6 +78,8 @@ namespace {
const Seconds kBalanceRoundDefaultInterval(10);
const Seconds kShortBalanceRoundInterval(1);
+const char kChunkTooBig[] = "chunkTooBig";
+
const auto getBalancer = ServiceContext::declareDecoration<Balancer>();
/**
@@ -148,6 +153,82 @@ void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& cluste
warning() << sb.str();
}
+/**
+ * Blocking method, which requests a single chunk migration to run.
+ */
+Status executeSingleMigration(OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ const NamespaceString nss(migrateInfo.ns);
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ ChunkManager* const cm = scopedCMStatus.getValue().cm();
+
+ shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
+
+ BSONObjBuilder builder;
+ MoveChunkRequest::appendAsCommand(
+ &builder,
+ nss,
+ cm->getVersion(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+ migrateInfo.from,
+ migrateInfo.to,
+ c->getMin(),
+ c->getMax(),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
+ builder.append(LiteParsedQuery::cmdOptionMaxTimeMS,
+ durationCount<Milliseconds>(Microseconds(txn->getRemainingMaxTimeMicros())));
+
+ BSONObj cmdObj = builder.obj();
+
+ Status status{ErrorCodes::NotYetInitialized, "Uninitialized"};
+
+ auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
+ if (!shard) {
+ status = {ErrorCodes::ShardNotFound,
+ str::stream() << "shard " << migrateInfo.from << " not found"};
+ } else {
+ auto cmdStatus = shard->runCommand(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ cmdObj,
+ Shard::RetryPolicy::kNotIdempotent);
+ if (!cmdStatus.isOK()) {
+ status = std::move(cmdStatus.getStatus());
+ } else {
+ status = std::move(cmdStatus.getValue().commandStatus);
+ BSONObj cmdResponse = std::move(cmdStatus.getValue().response);
+
+ // For backwards compatibility with 3.2 and earlier, where the move chunk command,
+ // instead of returning a ChunkTooBig status, includes an extra field in the response
+ bool chunkTooBig = false;
+ bsonExtractBooleanFieldWithDefault(cmdResponse, kChunkTooBig, false, &chunkTooBig);
+ if (chunkTooBig) {
+ invariant(!status.isOK());
+ status = {ErrorCodes::ChunkTooBig, status.reason()};
+ }
+ }
+ }
+
+ if (!status.isOK()) {
+ log() << "Move chunk " << cmdObj << " failed" << causedBy(status);
+ return {status.code(), str::stream() << "move failed due to " << status.toString()};
+ }
+
+ cm->reload(txn);
+
+ return Status::OK();
+}
+
MONGO_FP_DECLARE(skipBalanceRound);
MONGO_FP_DECLARE(balancerRoundIntervalSetting);
@@ -183,15 +264,34 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch
return refreshStatus;
}
- try {
- _moveChunks(txn,
- {*migrateInfo},
- balancerConfig->getSecondaryThrottle(),
- balancerConfig->waitForDelete());
- return Status::OK();
- } catch (const DBException& e) {
- return e.toStatus();
+ _moveChunks(txn,
+ {*migrateInfo},
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+
+ return Status::OK();
+}
+
+Status Balancer::moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId);
+ if (!moveAllowedStatus.isOK()) {
+ return moveAllowedStatus;
}
+
+ return executeSingleMigration(txn,
+ MigrateInfo(chunk.getNS(), newShardId, chunk),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
+}
+
+std::string Balancer::name() const {
+ return "Balancer";
}
void Balancer::run() {
@@ -468,8 +568,10 @@ int Balancer::_moveChunks(OperationContext* txn,
int movedCount = 0;
for (const auto& migrateInfo : candidateChunks) {
+ auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+
// If the balancer was disabled since we started this round, don't start new chunk moves
- if (!Grid::get(txn)->getBalancerConfiguration()->isBalancerActive()) {
+ if (!balancerConfig->isBalancerActive()) {
LOG(1) << "Stopping balancing round early as balancing was disabled";
return movedCount;
}
@@ -487,47 +589,33 @@ int Balancer::_moveChunks(OperationContext* txn,
const NamespaceString nss(migrateInfo.ns);
try {
- auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
- ChunkManager* const cm = scopedCM.cm();
-
- shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
-
- if (c->getMin().woCompare(migrateInfo.minKey) ||
- c->getMax().woCompare(migrateInfo.maxKey)) {
- log() << "Migration " << migrateInfo
- << " will be skipped this round due to chunk metadata mismatch.";
- scopedCM.db()->getChunkManager(txn, nss.ns(), true);
- continue;
- }
-
- BSONObj res;
- if (c->moveAndCommit(txn,
- migrateInfo.to,
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- secondaryThrottle,
- waitForDelete,
- 0, /* maxTimeMS */
- res)) {
+ Status status = executeSingleMigration(txn,
+ migrateInfo,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+ if (status.isOK()) {
movedCount++;
- continue;
- }
+ } else if (status == ErrorCodes::ChunkTooBig) {
+ log() << "Performing a split because migrate failed for size reasons"
+ << causedBy(status);
- log() << "balancer move failed: " << res << ", migrate: " << migrateInfo;
+ auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
+ ChunkManager* const cm = scopedCM.cm();
- Status moveStatus = getStatusFromCommandResult(res);
+ shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
- if (moveStatus == ErrorCodes::ChunkTooBig || res["chunkTooBig"].trueValue()) {
- log() << "Performing a split because migrate failed for size reasons";
-
- auto splitStatus = c->split(txn, Chunk::normal, NULL);
+ auto splitStatus = c->split(txn, Chunk::normal, nullptr);
if (!splitStatus.isOK()) {
- log() << "marking chunk as jumbo: " << c->toString();
+ log() << "Marking chunk " << c->toString() << " as jumbo.";
c->markAsJumbo(txn);
// We increment moveCount so we do another round right away
movedCount++;
}
+ } else {
+ log() << "Balancer move failed" << causedBy(status);
}
} catch (const DBException& ex) {
log() << "balancer move " << migrateInfo << " failed" << causedBy(ex);
diff --git a/src/mongo/s/balancer/balancer.h b/src/mongo/s/balancer/balancer.h
index 63159beceee..ccac10d92db 100644
--- a/src/mongo/s/balancer/balancer.h
+++ b/src/mongo/s/balancer/balancer.h
@@ -72,14 +72,25 @@ public:
*/
Status rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk);
-private:
- // BackgroundJob methods implementation
+ /**
+ * Blocking call, which requests the balancer to move a single chunk to the specified location
+ * in accordance with the active balancer policy. An error will be returned if the attempt to
+ * move fails for any reason.
+ *
+ * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
+ * move regardless. It should be used only for user-initiated moves.
+ */
+ Status moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
- void run() override;
+private:
+ std::string name() const final;
- std::string name() const override {
- return "Balancer";
- }
+ void run() final;
/**
* Checks that the balancer can connect to all servers it needs to do its job.
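As the header comment notes, moveSingleChunk still runs the requested move through the chunk selection policy's constraint checks but ignores the balancer's enabled/disabled setting. The cluster_move_chunk_cmd.cpp hunk further down shows the intended call pattern; roughly (the variable names below are placeholders, only the signature comes from this change):

// Sketch of a caller, condensed from the moveChunk command change below.
// chunkType describes the chunk to move; destShardId, maxChunkSizeBytes,
// secondaryThrottle and waitForDelete come from the user's command.
Status status = Balancer::get(txn)->moveSingleChunk(txn,
                                                    chunkType,
                                                    destShardId,
                                                    maxChunkSizeBytes,
                                                    secondaryThrottle,
                                                    waitForDelete);
if (!status.isOK()) {
    // Either the policy rejected the destination (e.g. draining or tag mismatch),
    // or the migration itself failed on the donor shard.
    return status;
}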
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/s/balancer/balancer_chunk_selection_policy.h
index 7bdf049c331..1329f3f9194 100644
--- a/src/mongo/s/balancer/balancer_chunk_selection_policy.h
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.h
@@ -103,6 +103,15 @@ public:
virtual StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
OperationContext* txn, const ChunkType& chunk) = 0;
+ /**
+ * Asks the chunk selection policy to validate that the specified chunk migration is allowed
+ * given the current rules. Returns OK if the migration would not violate any rules, or a
+ * failed status otherwise.
+ */
+ virtual Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) = 0;
+
protected:
BalancerChunkSelectionPolicy();
};
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 9993bd0d6f8..a685dee7317 100644
--- a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -217,6 +217,38 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* tx
return boost::optional<MigrateInfo>{MigrateInfo(nss.ns(), newShardId, chunk)};
}
+Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) {
+ auto tagForChunkStatus =
+ Grid::get(txn)->catalogManager(txn)->getTagForChunk(txn, chunk.getNS(), chunk);
+ if (!tagForChunkStatus.isOK()) {
+ return tagForChunkStatus.getStatus();
+ }
+
+ auto shardStatsStatus = _clusterStats->getStats(txn);
+ if (!shardStatsStatus.isOK()) {
+ return shardStatsStatus.getStatus();
+ }
+
+ const auto& shardStats = shardStatsStatus.getValue();
+
+ auto newShardIterator =
+ std::find_if(shardStats.begin(),
+ shardStats.end(),
+ [&newShardId](const ClusterStatistics::ShardStatistics& stat) {
+ return stat.shardId == newShardId;
+ });
+ if (newShardIterator == shardStats.end()) {
+ return {ErrorCodes::ShardNotFound,
+ str::stream() << "Unable to find constraints information for shard " << newShardId
+ << ". Move to this shard will be disallowed."};
+ }
+
+ return DistributionStatus::isShardSuitableReceiver(*newShardIterator,
+ tagForChunkStatus.getValue());
+}
+
StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
OperationContext* txn, const NamespaceString& nss) {
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
index ab6230243d9..0b1cb9582a6 100644
--- a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
@@ -47,6 +47,10 @@ public:
StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
OperationContext* txn, const ChunkType& chunk) override;
+ Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) override;
+
private:
/**
* Synchronous method, which iterates the collection's chunks and uses the tags information to
diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp
index 17cd2136979..cc7b47693e5 100644
--- a/src/mongo/s/balancer/balancer_policy.cpp
+++ b/src/mongo/s/balancer/balancer_policy.cpp
@@ -97,23 +97,35 @@ unsigned DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId
return total;
}
+Status DistributionStatus::isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat,
+ const string& chunkTag) {
+ if (stat.isSizeMaxed()) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId
+ << " has already reached the maximum total chunk size."};
+ }
+
+ if (stat.isDraining) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId << " is currently draining."};
+ }
+
+ if (!chunkTag.empty() && !stat.shardTags.count(chunkTag)) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId << " doesn't have right tag"};
+ }
+
+ return Status::OK();
+}
+
string DistributionStatus::getBestReceieverShard(const string& tag) const {
string best;
unsigned minChunks = numeric_limits<unsigned>::max();
for (const auto& stat : _shardInfo) {
- if (stat.isSizeMaxed()) {
- LOG(1) << stat.shardId << " has already reached the maximum total chunk size.";
- continue;
- }
-
- if (stat.isDraining) {
- LOG(1) << stat.shardId << " is currently draining.";
- continue;
- }
-
- if (!tag.empty() && !stat.shardTags.count(tag)) {
- LOG(1) << stat.shardId << " doesn't have right tag";
+ auto status = isShardSuitableReceiver(stat, tag);
+ if (!status.isOK()) {
+ LOG(1) << status.codeString();
continue;
}
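The refactor above hoists the three receiver checks (size maxed out, draining, tag mismatch) out of getBestReceieverShard into a reusable, Status-returning predicate so that checkMoveAllowed can share them. A small standalone illustration of the same pattern with stand-in types (none of these names are the real MongoDB classes):

#include <iostream>
#include <optional>
#include <set>
#include <string>
#include <vector>

struct ShardStat {
    std::string shardId;
    bool sizeMaxed = false;
    bool draining = false;
    std::set<std::string> tags;
};

// Returns an error description if the shard cannot receive a chunk with the
// given tag, or std::nullopt if it is an acceptable destination.
std::optional<std::string> whyUnsuitableReceiver(const ShardStat& stat, const std::string& tag) {
    if (stat.sizeMaxed)
        return stat.shardId + " has reached its maximum total chunk size";
    if (stat.draining)
        return stat.shardId + " is currently draining";
    if (!tag.empty() && !stat.tags.count(tag))
        return stat.shardId + " does not carry tag " + tag;
    return std::nullopt;
}

// The balancer-side consumer of the predicate: skip unsuitable shards, keep the rest.
std::vector<std::string> candidateReceivers(const std::vector<ShardStat>& shards,
                                            const std::string& tag) {
    std::vector<std::string> out;
    for (const auto& stat : shards) {
        if (auto reason = whyUnsuitableReceiver(stat, tag)) {
            std::cout << "skipping: " << *reason << "\n";
            continue;
        }
        out.push_back(stat.shardId);
    }
    return out;
}

int main() {
    std::vector<ShardStat> shards{{"shard0", false, false, {"us-east"}},
                                  {"shard1", false, true, {"us-east"}},
                                  {"shard2", false, false, {"eu-west"}}};
    for (const auto& id : candidateReceivers(shards, "us-east"))
        std::cout << "candidate: " << id << "\n";
}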
diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h
index 45fb6409294..428349098a1 100644
--- a/src/mongo/s/balancer/balancer_policy.h
+++ b/src/mongo/s/balancer/balancer_policy.h
@@ -78,20 +78,24 @@ class DistributionStatus {
public:
DistributionStatus(ShardStatisticsVector shardInfo, const ShardToChunksMap& shardToChunksMap);
- // only used when building
-
/**
* @return if range is valid
*/
bool addTagRange(const TagRange& range);
- // ---- these methods might be better suiting in BalancerPolicy
+ /**
+ * Determines whether a shard with the specified utilization statistics would be able to accept
+ * a chunk with the specified tag. According to the policy, a shard cannot accept chunks if its
+ * size is maxed out, if it is draining, or if the chunk's tag conflicts with the tag of the shard.
+ */
+ static Status isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat,
+ const std::string& chunkTag);
/**
* @param forTag "" if you don't care, or a tag
* @return shard best suited to receive a chunk
*/
- std::string getBestReceieverShard(const std::string& forTag) const;
+ std::string getBestReceieverShard(const std::string& tag) const;
/**
* @return the shard with the most chunks
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 44ebb40b5b3..b6096e3051d 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -33,11 +33,9 @@
#include "mongo/s/chunk.h"
#include "mongo/client/connpool.h"
-#include "mongo/client/read_preference.h"
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
#include "mongo/platform/random.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_manager.h"
@@ -46,7 +44,6 @@
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
-#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_util.h"
#include "mongo/util/log.h"
@@ -297,65 +294,6 @@ StatusWith<boost::optional<std::pair<BSONObj, BSONObj>>> Chunk::split(
return splitStatus.getValue();
}
-bool Chunk::moveAndCommit(OperationContext* txn,
- const ShardId& toShardId,
- long long chunkSize /* bytes */,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete,
- int maxTimeMS,
- BSONObj& res) const {
- uassert(10167, "can't move shard to its current location!", getShardId() != toShardId);
-
- BSONObjBuilder builder;
- MoveChunkRequest::appendAsCommand(&builder,
- NamespaceString(_manager->getns()),
- _manager->getVersion(),
- grid.shardRegistry()->getConfigServerConnectionString(),
- _shardId,
- toShardId,
- _min,
- _max,
- chunkSize,
- secondaryThrottle,
- waitForDelete);
- builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, maxTimeMS);
-
- BSONObj cmdObj = builder.obj();
- log() << "Moving chunk with the following arguments: " << cmdObj;
-
- Status status{ErrorCodes::NotYetInitialized, "Uninitialized"};
-
- auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, _shardId);
- if (!shard) {
- status = Status(ErrorCodes::ShardNotFound,
- str::stream() << "shard " << _shardId << " not found");
- } else {
- auto response = shard->runCommand(txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- cmdObj,
- Shard::RetryPolicy::kNotIdempotent);
- if (!response.isOK()) {
- status = std::move(response.getStatus());
- } else {
- status = std::move(response.getValue().commandStatus);
- res = std::move(response.getValue().response);
- }
- }
-
- if (status.isOK()) {
- LOG(1) << "moveChunk result: " << res;
- } else {
- warning() << "Move chunk failed" << causedBy(status);
- }
-
- // If succeeded we needs to reload the chunk manager in order to pick up the new location. If
- // failed, mongos may be stale.
- _manager->reload(txn);
-
- return status.isOK();
-}
-
bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) {
LastError::Disabled d(&LastError::get(cc()));
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index d9cf1886a6b..28bb1e519cc 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -37,7 +37,6 @@ namespace mongo {
class ChunkManager;
class ChunkType;
-class MigrationSecondaryThrottleOptions;
class OperationContext;
/**
@@ -132,29 +131,6 @@ public:
SplitPointMode mode,
size_t* resultingSplits) const;
- //
- // migration support
- //
-
- /**
- * Issues a migrate request for this chunk
- *
- * @param to shard to move this chunk to
- * @param chunSize maximum number of bytes beyond which the migrate should no go trhough
- * @param writeConcern detailed write concern. NULL means the default write concern.
- * @param waitForDelete whether chunk move should wait for cleanup or return immediately
- * @param maxTimeMS max time for the migrate request
- * @param res the object containing details about the migrate execution
- * @return true if move was successful
- */
- bool moveAndCommit(OperationContext* txn,
- const ShardId& to,
- long long chunkSize,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete,
- int maxTimeMS,
- BSONObj& res) const;
-
/**
* marks this chunk as a jumbo chunk
* that means the chunk will be inelligble for migrates
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index ae156c39753..d853f68a244 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/client_basic.h"
#include "mongo/db/commands.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/chunk_manager.h"
@@ -213,39 +214,30 @@ public:
}
}
- LOG(0) << "CMD: movechunk: " << cmdObj;
-
- StatusWith<int> maxTimeMS =
- LiteParsedQuery::parseMaxTimeMS(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
-
- if (!maxTimeMS.isOK()) {
- errmsg = maxTimeMS.getStatus().reason();
- return false;
- }
-
const auto secondaryThrottle =
uassertStatusOK(MigrationSecondaryThrottleOptions::createFromCommand(cmdObj));
- BSONObj res;
- if (!chunk->moveAndCommit(txn,
- to->getId(),
- maxChunkSizeBytes,
- secondaryThrottle,
- cmdObj["_waitForDelete"].trueValue(),
- maxTimeMS.getValue(),
- res)) {
- errmsg = "move failed";
- result.append("cause", res);
-
- if (!res["code"].eoo()) {
- result.append(res["code"]);
- }
+ log() << "CMD: movechunk: " << cmdObj;
- return false;
+ {
+ ChunkType chunkType;
+ chunkType.setNS(nss.ns());
+ chunkType.setName(ChunkType::genID(nss.ns(), chunk->getMin()));
+ chunkType.setMin(chunk->getMin());
+ chunkType.setMax(chunk->getMax());
+ chunkType.setShard(chunk->getShardId());
+ chunkType.setVersion(info->getVersion());
+
+ uassertStatusOK(
+ Balancer::get(txn)->moveSingleChunk(txn,
+ chunkType,
+ to->getId(),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ cmdObj["_waitForDelete"].trueValue()));
}
result.append("millis", t.millis());
-
return true;
}
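The block above converts the cached Chunk into a ChunkType before handing it to the balancer, and the same six-setter sequence is repeated in cluster_shard_collection_cmd.cpp below. A hypothetical helper, not part of this change and sketched only from the two call sites in this diff, could factor that out (headers and namespace qualifiers omitted):

// Hypothetical convenience helper built from the setters used at both call sites.
ChunkType makeChunkType(const NamespaceString& nss,
                        const Chunk& chunk,
                        const ChunkVersion& collectionVersion) {
    ChunkType chunkType;
    chunkType.setNS(nss.ns());
    chunkType.setName(ChunkType::genID(nss.ns(), chunk.getMin()));
    chunkType.setMin(chunk.getMin());
    chunkType.setMax(chunk.getMax());
    chunkType.setShard(chunk.getShardId());
    chunkType.setVersion(collectionVersion);
    return chunkType;
}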
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index 7bebe1ac3c7..6eddee0f17c 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/hasher.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
@@ -110,33 +111,17 @@ public:
int options,
std::string& errmsg,
BSONObjBuilder& result) {
- const string ns = parseNs(dbname, cmdObj);
- if (ns.size() == 0) {
- errmsg = "no ns";
- return false;
- }
+ const NamespaceString nss(parseNs(dbname, cmdObj));
+ uassert(ErrorCodes::InvalidNamespace, "Invalid namespace", nss.isValid());
- const NamespaceString nsStr(ns);
- if (!nsStr.isValid()) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::InvalidNamespace, "invalid collection namespace [" + ns + "]"));
- }
-
- auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString()));
- if (!config->isShardingEnabled()) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::IllegalOperation,
- str::stream() << "sharding not enabled for db " << nsStr.db()));
- }
+ auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString()));
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "sharding not enabled for db " << nss.db(),
+ config->isShardingEnabled());
- if (config->isSharded(ns)) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::IllegalOperation,
- str::stream() << "sharding already enabled for collection " << ns));
- }
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "sharding already enabled for collection " << nss.ns(),
+ !config->isSharded(nss.ns()));
// NOTE: We *must* take ownership of the key here - otherwise the shared BSONObj
// becomes corrupt as soon as the command ends.
@@ -166,10 +151,7 @@ public:
return false;
}
- if (ns.find(".system.") != string::npos) {
- errmsg = "can't shard system namespaces";
- return false;
- }
+ uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem());
vector<ShardId> shardIds;
grid.shardRegistry()->getAllShardIds(&shardIds);
@@ -200,8 +182,8 @@ public:
// check that collection is not capped
BSONObj res;
{
- list<BSONObj> all = conn->getCollectionInfos(
- config->name(), BSON("name" << nsToCollectionSubstring(ns)));
+ list<BSONObj> all =
+ conn->getCollectionInfos(config->name(), BSON("name" << nss.coll()));
if (!all.empty()) {
res = all.front().getOwned();
}
@@ -241,7 +223,7 @@ public:
// 5. If the collection is empty, and it's still possible to create an index
// on the proposed key, we go ahead and do so.
- list<BSONObj> indexes = conn->getIndexSpecs(ns);
+ list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
// 1. Verify consistency with existing unique indexes
ShardKeyPattern proposedShardKey(proposedKey);
@@ -251,7 +233,7 @@ public:
bool isUnique = idx["unique"].trueValue();
if (isUnique && !proposedShardKey.isUniqueIndexCompatible(currentKey)) {
- errmsg = str::stream() << "can't shard collection '" << ns << "' "
+ errmsg = str::stream() << "can't shard collection '" << nss.ns() << "' "
<< "with unique index on " << currentKey << " "
<< "and proposed shard key " << proposedKey << ". "
<< "Uniqueness can't be maintained unless "
@@ -276,7 +258,7 @@ public:
// per field per collection.
if (isHashedShardKey && !idx["seed"].eoo() &&
idx["seed"].numberInt() != BSONElementHasher::DEFAULT_HASH_SEED) {
- errmsg = str::stream() << "can't shard collection " << ns
+ errmsg = str::stream() << "can't shard collection " << nss.ns()
<< " with hashed shard key " << proposedKey
<< " because the hashed index uses a non-default"
<< " seed of " << idx["seed"].numberInt();
@@ -291,7 +273,7 @@ public:
// 3. If proposed key is required to be unique, additionally check for exact match.
bool careAboutUnique = cmdObj["unique"].trueValue();
if (hasUsefulIndexForKey && careAboutUnique) {
- BSONObj eqQuery = BSON("ns" << ns << "key" << proposedKey);
+ BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey);
BSONObj eqQueryResult;
for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) {
@@ -311,8 +293,8 @@ public:
bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id");
if (!isExplicitlyUnique && !isCurrentID) {
- errmsg = str::stream() << "can't shard collection " << ns << ", " << proposedKey
- << " index not unique, "
+ errmsg = str::stream() << "can't shard collection " << nss.ns() << ", "
+ << proposedKey << " index not unique, "
<< "and unique index explicitly specified";
conn.done();
return false;
@@ -324,7 +306,7 @@ public:
// Check 2.iii and 2.iv. Make sure no null entries in the sharding index
// and that there is a useful, non-multikey index available
BSONObjBuilder checkShardingIndexCmd;
- checkShardingIndexCmd.append("checkShardingIndex", ns);
+ checkShardingIndexCmd.append("checkShardingIndex", nss.ns());
checkShardingIndexCmd.append("keyPattern", proposedKey);
if (!conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res)) {
@@ -332,7 +314,7 @@ public:
conn.done();
return false;
}
- } else if (conn->count(ns) != 0) {
+ } else if (conn->count(nss.ns()) != 0) {
// 4. if no useful index, and collection is non-empty, fail
errmsg = str::stream() << "please create an index that starts with the "
<< "shard key before sharding.";
@@ -344,7 +326,7 @@ public:
// 5. If no useful index exists, and collection empty, create one on proposedKey.
// Only need to call ensureIndex on primary shard, since indexes get copied to
// receiving shard whenever a migrate occurs.
- Status status = clusterCreateIndex(txn, ns, proposedKey, careAboutUnique);
+ Status status = clusterCreateIndex(txn, nss.ns(), proposedKey, careAboutUnique);
if (!status.isOK()) {
errmsg = str::stream() << "ensureIndex failed to create index on "
<< "primary shard: " << status.reason();
@@ -353,7 +335,7 @@ public:
}
}
- bool isEmpty = (conn->count(ns) == 0);
+ bool isEmpty = (conn->count(nss.ns()) == 0);
conn.done();
@@ -419,25 +401,26 @@ public:
LOG(0) << "CMD: shardcollection: " << cmdObj;
- audit::logShardCollection(ClientBasic::getCurrent(), ns, proposedKey, careAboutUnique);
+ audit::logShardCollection(
+ ClientBasic::getCurrent(), nss.ns(), proposedKey, careAboutUnique);
Status status = grid.catalogManager(txn)->shardCollection(
- txn, ns, proposedShardKey, careAboutUnique, initSplits, std::set<ShardId>{});
+ txn, nss.ns(), proposedShardKey, careAboutUnique, initSplits, {});
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
// Make sure the cached metadata for the collection knows that we are now sharded
- config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString()));
- config->getChunkManager(txn, nsStr.ns(), true /* force */);
+ config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString()));
+ config->getChunkManager(txn, nss.ns(), true /* force */);
- result << "collectionsharded" << ns;
+ result << "collectionsharded" << nss.ns();
// Only initially move chunks when using a hashed shard key
if (isHashedShardKey && isEmpty) {
// Reload the new config info. If we created more than one initial chunk, then
// we need to move them around to balance.
- shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, ns, true);
+ shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, nss.ns(), true);
ChunkMap chunkMap = chunkManager->getChunkMap();
// 2. Move and commit each "big chunk" to a different shard.
@@ -451,25 +434,30 @@ public:
shared_ptr<Chunk> chunk = c->second;
- // can't move chunk to shard it's already on
+ // Can't move chunk to shard it's already on
if (to->getId() == chunk->getShardId()) {
continue;
}
- BSONObj moveResult;
- WriteConcernOptions noThrottle;
- if (!chunk->moveAndCommit(
- txn,
- to->getId(),
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- MigrationSecondaryThrottleOptions::create(
- MigrationSecondaryThrottleOptions::kOff),
- true,
- 0,
- moveResult)) {
+ ChunkType chunkType;
+ chunkType.setNS(nss.ns());
+ chunkType.setName(ChunkType::genID(nss.ns(), chunk->getMin()));
+ chunkType.setMin(chunk->getMin());
+ chunkType.setMax(chunk->getMax());
+ chunkType.setShard(chunk->getShardId());
+ chunkType.setVersion(chunkManager->getVersion());
+
+ Status moveStatus = Balancer::get(txn)->moveSingleChunk(
+ txn,
+ chunkType,
+ to->getId(),
+ Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
+ MigrationSecondaryThrottleOptions::create(
+ MigrationSecondaryThrottleOptions::kOff),
+ true);
+ if (!moveStatus.isOK()) {
warning() << "couldn't move chunk " << chunk->toString() << " to shard " << *to
- << " while sharding collection " << ns << "."
- << " Reason: " << moveResult;
+ << " while sharding collection " << nss.ns() << causedBy(moveStatus);
}
}
@@ -478,7 +466,7 @@ public:
}
// Reload the config info, after all the migrations
- chunkManager = config->getChunkManager(txn, ns, true);
+ chunkManager = config->getChunkManager(txn, nss.ns(), true);
// 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
// that we haven't already split by.
@@ -491,7 +479,7 @@ public:
auto splitStatus = shardutil::splitChunkAtMultiplePoints(
txn,
currentChunk->getShardId(),
- NamespaceString(ns),
+ nss,
chunkManager->getShardKeyPattern(),
chunkManager->getVersion(),
currentChunk->getMin(),
@@ -499,7 +487,7 @@ public:
subSplits);
if (!splitStatus.isOK()) {
warning() << "couldn't split chunk " << currentChunk->toString()
- << " while sharding collection " << ns
+ << " while sharding collection " << nss.ns()
<< causedBy(splitStatus.getStatus());
}
@@ -523,7 +511,7 @@ public:
// Proactively refresh the chunk manager. Not really necessary, but this way it's
// immediately up-to-date the next time it's used.
- config->getChunkManager(txn, ns, true);
+ config->getChunkManager(txn, nss.ns(), true);
}
return true;
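Much of the cluster_shard_collection_cmd.cpp change is a mechanical move from the errmsg/return-false idiom to uassert, which throws an exception that the command framework turns into the error response. A standalone approximation of that idiom (the uassert below is a simplified stand-in, not the real macro, and the error codes are placeholders):

#include <iostream>
#include <stdexcept>
#include <string>

// Simplified stand-in for mongo's uassert: throw if the condition does not hold.
struct CommandFailed : std::runtime_error {
    int code;
    CommandFailed(int c, const std::string& msg) : std::runtime_error(msg), code(c) {}
};

void uassert(int code, const std::string& msg, bool condition) {
    if (!condition)
        throw CommandFailed(code, msg);
}

// Before: validate, set errmsg, return false. After: one uassert per check; the
// surrounding command framework converts the exception into an error response.
void shardCollectionChecks(const std::string& ns, bool shardingEnabled, bool alreadySharded) {
    uassert(1, "Invalid namespace", !ns.empty());
    uassert(2, "sharding not enabled for db of " + ns, shardingEnabled);
    uassert(3, "sharding already enabled for collection " + ns, !alreadySharded);
}

int main() {
    try {
        shardCollectionChecks("test.foo", /*shardingEnabled=*/false, /*alreadySharded=*/false);
    } catch (const CommandFailed& ex) {
        std::cout << "command failed (" << ex.code << "): " << ex.what() << "\n";
    }
}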