diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_merge_chunks_cmd.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_merge_chunks_cmd.cpp | 292 |
1 files changed, 150 insertions, 142 deletions
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index fba40110919..c6173244fbb 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -45,160 +45,168 @@ namespace mongo { - using std::shared_ptr; - using std::string; - using std::stringstream; - using std::vector; +using std::shared_ptr; +using std::string; +using std::stringstream; +using std::vector; namespace { - /** - * Mongos-side command for merging chunks, passes command to appropriate shard. - */ - class ClusterMergeChunksCommand : public Command { - public: - ClusterMergeChunksCommand() : Command("mergeChunks") {} +/** + * Mongos-side command for merging chunks, passes command to appropriate shard. + */ +class ClusterMergeChunksCommand : public Command { +public: + ClusterMergeChunksCommand() : Command("mergeChunks") {} + + virtual void help(stringstream& h) const { + h << "Merge Chunks command\n" + << "usage: { mergeChunks : <ns>, bounds : [ <min key>, <max key> ] }"; + } + + virtual Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), + ActionType::splitChunk)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + return parseNsFullyQualified(dbname, cmdObj); + } + + virtual bool adminOnly() const { + return true; + } + virtual bool slaveOk() const { + return false; + } + virtual bool isWriteCommandForConfigServer() const { + return false; + } + + // Required + static BSONField<string> nsField; + static BSONField<vector<BSONObj>> boundsField; + + // Used to send sharding state + static BSONField<string> shardNameField; + static BSONField<string> configField; + + + bool run(OperationContext* txn, + const string& dbname, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + vector<BSONObj> bounds; + if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) { + return false; + } + + if (bounds.size() == 0) { + errmsg = "no bounds were specified"; + return false; + } - virtual void help(stringstream& h) const { - h << "Merge Chunks command\n" - << "usage: { mergeChunks : <ns>, bounds : [ <min key>, <max key> ] }"; + if (bounds.size() != 2) { + errmsg = "only a min and max bound may be specified"; + return false; } - virtual Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) { - if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( - ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), - ActionType::splitChunk)) { - return Status(ErrorCodes::Unauthorized, "Unauthorized"); - } - return Status::OK(); + BSONObj minKey = bounds[0]; + BSONObj maxKey = bounds[1]; + + if (minKey.isEmpty()) { + errmsg = "no min key specified"; + return false; } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { - return parseNsFullyQualified(dbname, cmdObj); + if (maxKey.isEmpty()) { + errmsg = "no max key specified"; + return false; } - virtual bool adminOnly() const { return true; } - virtual bool slaveOk() const { return false; } - virtual bool isWriteCommandForConfigServer() const { return false; } - - // Required - static BSONField<string> nsField; - static BSONField<vector<BSONObj> > boundsField; - - // Used to send sharding state - static BSONField<string> shardNameField; - static BSONField<string> configField; - - - bool run(OperationContext* txn, const string& dbname, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - - vector<BSONObj> bounds; - if ( !FieldParser::extract( cmdObj, boundsField, &bounds, &errmsg ) ) { - return false; - } - - if ( bounds.size() == 0 ) { - errmsg = "no bounds were specified"; - return false; - } - - if ( bounds.size() != 2 ) { - errmsg = "only a min and max bound may be specified"; - return false; - } - - BSONObj minKey = bounds[0]; - BSONObj maxKey = bounds[1]; - - if ( minKey.isEmpty() ) { - errmsg = "no min key specified"; - return false; - } - - if ( maxKey.isEmpty() ) { - errmsg = "no max key specified"; - return false; - } - - const NamespaceString nss(parseNs(dbname, cmdObj)); - if (nss.size() == 0) { - return appendCommandStatus(result, Status(ErrorCodes::InvalidNamespace, - "no namespace specified")); - } - - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); - if (!status.isOK()) { - return appendCommandStatus(result, status.getStatus()); - } - - std::shared_ptr<DBConfig> config = status.getValue(); - if (!config->isSharded(nss.ns())) { - return appendCommandStatus(result, Status(ErrorCodes::NamespaceNotSharded, - "ns [" + nss.ns() + " is not sharded.")); - } - - // This refreshes the chunk metadata if stale. - ChunkManagerPtr manager = config->getChunkManagerIfExists(nss, true); - if (!manager) { - return appendCommandStatus(result, Status(ErrorCodes::NamespaceNotSharded, - "ns [" + nss.ns() + " is not sharded.")); - } - - if (!manager->getShardKeyPattern().isShardKey(minKey) - || !manager->getShardKeyPattern().isShardKey(maxKey)) { - errmsg = stream() << "shard key bounds " << "[" << minKey << "," << maxKey << ")" - << " are not valid for shard key pattern " - << manager->getShardKeyPattern().toBSON(); - return false; - } - - minKey = manager->getShardKeyPattern().normalizeShardKey(minKey); - maxKey = manager->getShardKeyPattern().normalizeShardKey(maxKey); - - ChunkPtr firstChunk = manager->findIntersectingChunk(minKey); - verify(firstChunk); - - BSONObjBuilder remoteCmdObjB; - remoteCmdObjB.append( cmdObj[ ClusterMergeChunksCommand::nsField() ] ); - remoteCmdObjB.append( cmdObj[ ClusterMergeChunksCommand::boundsField() ] ); - remoteCmdObjB.append( ClusterMergeChunksCommand::configField(), - grid.catalogManager()->connectionString().toString() ); - remoteCmdObjB.append( ClusterMergeChunksCommand::shardNameField(), - firstChunk->getShardId() ); - - BSONObj remoteResult; - - // Throws, but handled at level above. Don't want to rewrap to preserve exception - // formatting. - const auto shard = grid.shardRegistry()->getShard(firstChunk->getShardId()); - if (!shard) { - return appendCommandStatus(result, - Status(ErrorCodes::ShardNotFound, - str::stream() << "Can't find shard for chunk: " - << firstChunk->toString())); - } - - ScopedDbConnection conn(shard->getConnString()); - bool ok = conn->runCommand( "admin", remoteCmdObjB.obj(), remoteResult ); - conn.done(); - - result.appendElements( remoteResult ); - return ok; + const NamespaceString nss(parseNs(dbname, cmdObj)); + if (nss.size() == 0) { + return appendCommandStatus( + result, Status(ErrorCodes::InvalidNamespace, "no namespace specified")); } - } clusterMergeChunksCommand; + auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + if (!status.isOK()) { + return appendCommandStatus(result, status.getStatus()); + } + + std::shared_ptr<DBConfig> config = status.getValue(); + if (!config->isSharded(nss.ns())) { + return appendCommandStatus( + result, + Status(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded.")); + } + + // This refreshes the chunk metadata if stale. + ChunkManagerPtr manager = config->getChunkManagerIfExists(nss, true); + if (!manager) { + return appendCommandStatus( + result, + Status(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded.")); + } + + if (!manager->getShardKeyPattern().isShardKey(minKey) || + !manager->getShardKeyPattern().isShardKey(maxKey)) { + errmsg = stream() << "shard key bounds " + << "[" << minKey << "," << maxKey << ")" + << " are not valid for shard key pattern " + << manager->getShardKeyPattern().toBSON(); + return false; + } + + minKey = manager->getShardKeyPattern().normalizeShardKey(minKey); + maxKey = manager->getShardKeyPattern().normalizeShardKey(maxKey); + + ChunkPtr firstChunk = manager->findIntersectingChunk(minKey); + verify(firstChunk); + + BSONObjBuilder remoteCmdObjB; + remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::nsField()]); + remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::boundsField()]); + remoteCmdObjB.append(ClusterMergeChunksCommand::configField(), + grid.catalogManager()->connectionString().toString()); + remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk->getShardId()); + + BSONObj remoteResult; + + // Throws, but handled at level above. Don't want to rewrap to preserve exception + // formatting. + const auto shard = grid.shardRegistry()->getShard(firstChunk->getShardId()); + if (!shard) { + return appendCommandStatus( + result, + Status(ErrorCodes::ShardNotFound, + str::stream() << "Can't find shard for chunk: " << firstChunk->toString())); + } + + ScopedDbConnection conn(shard->getConnString()); + bool ok = conn->runCommand("admin", remoteCmdObjB.obj(), remoteResult); + conn.done(); + + result.appendElements(remoteResult); + return ok; + } + +} clusterMergeChunksCommand; - BSONField<string> ClusterMergeChunksCommand::nsField( "mergeChunks" ); - BSONField<vector<BSONObj> > ClusterMergeChunksCommand::boundsField( "bounds" ); +BSONField<string> ClusterMergeChunksCommand::nsField("mergeChunks"); +BSONField<vector<BSONObj>> ClusterMergeChunksCommand::boundsField("bounds"); - BSONField<string> ClusterMergeChunksCommand::configField( "config" ); - BSONField<string> ClusterMergeChunksCommand::shardNameField( "shardName" ); +BSONField<string> ClusterMergeChunksCommand::configField("config"); +BSONField<string> ClusterMergeChunksCommand::shardNameField("shardName"); -} // namespace -} // namespace mongo +} // namespace +} // namespace mongo |