summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/s/catalog/catalog_manager.h38
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp142
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.h9
-rw-r--r--src/mongo/s/commands/cluster_remove_shard_cmd.cpp190
4 files changed, 232 insertions, 147 deletions
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 1c682d70427..96274c0bcbb 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -30,6 +30,7 @@
#include <boost/shared_ptr.hpp>
#include <string>
+#include <vector>
#include "mongo/base/disallow_copying.h"
@@ -38,13 +39,23 @@ namespace mongo {
class BatchedCommandRequest;
class BatchedCommandResponse;
class BSONObj;
+ class ChunkType;
class ConnectionString;
class DatabaseType;
class OperationContext;
class Status;
template<typename T> class StatusWith;
-
+ /**
+ * Used to indicate to the caller of the removeShard method whether draining of chunks for
+ * a particular shard has started, is ongoing, or has been completed.
+ */
+ enum ShardDrainingStatus {
+ STARTED,
+ ONGOING,
+ COMPLETED,
+ };
+
/**
* Abstracts reads and writes of the sharding catalog metadata.
*
@@ -86,6 +97,17 @@ namespace mongo {
const long long maxSize) = 0;
/**
+ * Tries to remove a shard. To completely remove a shard from a sharded cluster,
+ * the data residing in that shard must be moved to the remaining shards in the
+ * cluster by "draining" chunks from that shard.
+ *
+ * Because of the asynchronous nature of the draining mechanism, this method returns
+ * the current draining status. See ShardDrainingStatus enum definition for more details.
+ */
+ virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
+ const std::string& name) = 0;
+
+ /**
* Updates the metadata for a given database. Currently, if the specified DB entry does
* not exist, it will be created.
*/
@@ -97,6 +119,20 @@ namespace mongo {
virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName) = 0;
/**
+ * Retrieves all databases for a shard.
+     * The names of the matching databases are appended to the 'dbs' vector.
+ */
+ virtual void getDatabasesForShard(const std::string& shardName,
+ std::vector<std::string>* dbs) = 0;
+
+ /**
+ * Gets all chunks (of type ChunkType) for a shard.
+ * Returns a !OK status if an error occurs.
+ */
+ virtual Status getChunksForShard(const std::string& shardName,
+ std::vector<ChunkType>* chunks) = 0;
+
+ /**
* Logs a diagnostic event locally and on the config server.
*
* NOTE: This method is best effort so it should never throw.
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 81b92f7824a..48774e3ca0d 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/connpool.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
@@ -46,7 +47,10 @@
#include "mongo/s/catalog/legacy/config_coordinator.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/client/dbclient_multi_command.h"
+#include "mongo/s/client/shard_connection.h"
#include "mongo/s/shard.h"
+#include "mongo/s/type_chunk.h"
+#include "mongo/s/type_shard.h"
#include "mongo/s/type_database.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -237,6 +241,14 @@ namespace {
return dbNames;
}
+ BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) {
+ BSONObjBuilder details;
+ details.append("shard", shardName);
+ details.append("isDraining", isDraining);
+
+ return details.obj();
+ }
+
// Whether the logChange call should attempt to create the changelog collection
AtomicInt32 changeLogCollectionCreated(0);
@@ -515,6 +527,100 @@ namespace {
return shardName;
}
+ StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn,
+ const std::string& name) {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+
+ if (conn->count(ShardType::ConfigNS,
+ BSON(ShardType::name() << NE << name
+ << ShardType::draining(true)))) {
+ conn.done();
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ "Can't have more than one draining shard at a time");
+ }
+
+ if (conn->count(ShardType::ConfigNS,
+ BSON(ShardType::name() << NE << name)) == 0) {
+ conn.done();
+ return Status(ErrorCodes::IllegalOperation,
+ "Can't remove last shard");
+ }
+
+ BSONObj searchDoc = BSON(ShardType::name() << name);
+
+ // Case 1: start draining chunks
+ BSONObj drainingDoc =
+ BSON(ShardType::name() << name << ShardType::draining(true));
+ BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc);
+ if (shardDoc.isEmpty()) {
+ log() << "going to start draining shard: " << name;
+ BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true)));
+
+ Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL);
+ if (!status.isOK()) {
+ log() << "error starting removeShard: " << name
+ << "; err: " << status.reason();
+ return status;
+ }
+
+ BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") <<
+ DatabaseType::primary(name));
+ log() << "primaryLocalDoc: " << primaryLocalDoc;
+ if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) {
+ log() << "This shard is listed as primary of local db. Removing entry.";
+
+ Status status = remove(DatabaseType::ConfigNS,
+ BSON(DatabaseType::name("local")),
+ 0,
+ NULL);
+ if (!status.isOK()) {
+ log() << "error removing local db: "
+ << status.reason();
+ return status;
+ }
+ }
+
+ Shard::reloadShardInfo();
+ conn.done();
+
+ // Record start in changelog
+ logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true));
+ return ShardDrainingStatus::STARTED;
+ }
+
+ // Case 2: all chunks drained
+ BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str()));
+ long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc);
+ long long dbCount = conn->count(DatabaseType::ConfigNS,
+ BSON(DatabaseType::name.ne("local")
+ << DatabaseType::primary(name)));
+ if (chunkCount == 0 && dbCount == 0) {
+ log() << "going to remove shard: " << name;
+ audit::logRemoveShard(ClientBasic::getCurrent(), name);
+
+ Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL);
+ if (!status.isOK()) {
+ log() << "Error concluding removeShard operation on: " << name
+ << "; err: " << status.reason();
+ return status;
+ }
+
+ Shard::removeShard(name);
+ shardConnectionPool.removeHost(name);
+ ReplicaSetMonitor::remove(name, true);
+
+ Shard::reloadShardInfo();
+ conn.done();
+
+ // Record finish in changelog
+ logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false));
+ return ShardDrainingStatus::COMPLETED;
+ }
+
+        // Case 3: draining ongoing
+ return ShardDrainingStatus::ONGOING;
+ }
+
Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) {
fassert(28616, db.validate());
@@ -597,6 +703,42 @@ namespace {
}
}
+ void CatalogManagerLegacy::getDatabasesForShard(const string& shardName,
+ vector<string>* dbs) {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ BSONObj prim = BSON(DatabaseType::primary(shardName));
+ boost::scoped_ptr<DBClientCursor> cursor(conn->query(DatabaseType::ConfigNS, prim));
+
+ while (cursor->more()) {
+ BSONObj shard = cursor->nextSafe();
+ dbs->push_back(shard[DatabaseType::name()].str());
+ }
+
+ conn.done();
+ }
+
+ Status CatalogManagerLegacy::getChunksForShard(const string& shardName,
+ vector<ChunkType>* chunks) {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ boost::scoped_ptr<DBClientCursor> cursor(conn->query(ChunkType::ConfigNS,
+ BSON(ChunkType::shard(shardName))));
+ while (cursor->more()) {
+ BSONObj chunkObj = cursor->nextSafe();
+
+ StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj);
+ if (!chunkRes.isOK()) {
+ return Status(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse chunk BSONObj: "
+ << chunkRes.getStatus().reason());
+ }
+ ChunkType chunk = chunkRes.getValue();
+ chunks->push_back(chunk);
+ }
+ conn.done();
+
+ return Status::OK();
+ }
+
void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request,
BatchedCommandResponse* response) {
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index f017ad60111..016c68c503b 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -56,10 +56,19 @@ namespace mongo {
const ConnectionString& shardConnectionString,
const long long maxSize);
+ virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
+ const std::string& name);
+
virtual Status updateDatabase(const std::string& dbName, const DatabaseType& db);
virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName);
+ virtual void getDatabasesForShard(const std::string& shardName,
+ std::vector<std::string>* dbs);
+
+ virtual Status getChunksForShard(const std::string& shardName,
+ std::vector<ChunkType>* chunks);
+
virtual void logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,
diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
index 626aa4d5867..ac75bf7483b 100644
--- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
+++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
@@ -31,16 +31,13 @@
#include "mongo/platform/basic.h"
#include <string>
+#include <vector>
#include "mongo/client/connpool.h"
-#include "mongo/client/replica_set_monitor.h"
-#include "mongo/db/audit.h"
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
#include "mongo/s/catalog/catalog_manager.h"
-#include "mongo/s/client/shard_connection.h"
#include "mongo/s/cluster_write.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard.h"
#include "mongo/s/type_chunk.h"
@@ -51,18 +48,10 @@
namespace mongo {
using std::string;
+ using std::vector;
namespace {
- BSONObj buildRemoveLogEntry(Shard s, bool isDraining) {
- BSONObjBuilder details;
- details.append("shard", s.getName());
- details.append("isDraining", isDraining);
-
- return details.obj();
- }
-
-
class RemoveShardCmd : public Command {
public:
RemoveShardCmd() : Command("removeShard", false, "removeshard") { }
@@ -112,159 +101,68 @@ namespace {
Status(ErrorCodes::ShardNotFound, msg));
}
- ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
-
- if (conn->count(ShardType::ConfigNS,
- BSON(ShardType::name() << NE << s.getName()
- << ShardType::draining(true)))) {
- conn.done();
- errmsg = "Can't have more than one draining shard at a time";
- return false;
+ StatusWith<ShardDrainingStatus> removeShardResult =
+ grid.catalogManager()->removeShard(txn, s.getName());
+ if (!removeShardResult.isOK()) {
+ return appendCommandStatus(result, removeShardResult.getStatus());
}
- if (conn->count(ShardType::ConfigNS,
- BSON(ShardType::name() << NE << s.getName())) == 0) {
- conn.done();
- errmsg = "Can't remove last shard";
- return false;
- }
+ vector<string> databases;
+ grid.catalogManager()->getDatabasesForShard(s.getName(), &databases);
- BSONObj primaryDoc =
- BSON(DatabaseType::name.ne("local") << DatabaseType::primary(s.getName()));
-
- BSONObj dbInfo; // appended at end of result on success
+ // Get BSONObj containing:
+ // 1) note about moving or dropping databases in a shard
+ // 2) list of databases (excluding 'local' database) that need to be moved
+ BSONObj dbInfo;
{
- boost::scoped_ptr<DBClientCursor> cursor(conn->query(DatabaseType::ConfigNS,
- primaryDoc));
- if (cursor->more()) {
- // Skip block and allocations if empty
- BSONObjBuilder dbInfoBuilder;
- dbInfoBuilder.append("note",
- "you need to drop or movePrimary these databases");
-
- BSONArrayBuilder dbs(dbInfoBuilder.subarrayStart("dbsToMove"));
- while (cursor->more()){
- BSONObj db = cursor->nextSafe();
- dbs.append(db[DatabaseType::name()]);
+ BSONObjBuilder dbInfoBuilder;
+ dbInfoBuilder.append("note",
+ "you need to drop or movePrimary these databases");
+ BSONArrayBuilder dbs(dbInfoBuilder.subarrayStart("dbsToMove"));
+ for (vector<string>::const_iterator it = databases.begin();
+ it != databases.end();
+ it++) {
+ if (*it != "local") {
+ dbs.append(*it);
}
- dbs.doneFast();
-
- dbInfo = dbInfoBuilder.obj();
}
+ dbs.doneFast();
+ dbInfo = dbInfoBuilder.obj();
}
- // If the server is not yet draining chunks, put it in draining mode.
- BSONObj searchDoc = BSON(ShardType::name() << s.getName());
- BSONObj drainingDoc =
- BSON(ShardType::name() << s.getName() << ShardType::draining(true));
-
- BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc);
- if (shardDoc.isEmpty()) {
- log() << "going to start draining shard: " << s.getName();
- BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true)));
-
- Status status = grid.catalogManager()->update(ShardType::ConfigNS,
- searchDoc,
- newStatus,
- false,
- false,
- NULL);
- if (!status.isOK()) {
- errmsg = status.reason();
- log() << "error starting remove shard: " << s.getName()
- << " err: " << errmsg;
- return false;
- }
-
- BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") <<
- DatabaseType::primary(s.getName()));
- PRINT(primaryLocalDoc);
-
- if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) {
- log() << "This shard is listed as primary of local db. Removing entry.";
-
- Status status = grid.catalogManager()->remove(
- DatabaseType::ConfigNS,
- BSON(DatabaseType::name("local")),
- 0,
- NULL);
- if (!status.isOK()) {
- log() << "error removing local db: "
- << status.reason();
- return false;
- }
- }
-
- Shard::reloadShardInfo();
-
+            // TODO: Standardize/Separate how we append to the result object
+ switch (removeShardResult.getValue()) {
+ case ShardDrainingStatus::STARTED:
result.append("msg", "draining started successfully");
result.append("state", "started");
result.append("shard", s.getName());
result.appendElements(dbInfo);
-
- conn.done();
-
- // Record start in changelog
- grid.catalogManager()->logChange(txn,
- "removeShard.start",
- "",
- buildRemoveLogEntry(s, true));
- return true;
- }
-
- // If the server has been completely drained, remove it from the ConfigDB. Check not
- // only for chunks but also databases.
- BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str()));
- long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc);
- long long dbCount = conn->count(DatabaseType::ConfigNS, primaryDoc);
-
- if ((chunkCount == 0) && (dbCount == 0)) {
- log() << "going to remove shard: " << s.getName();
- audit::logRemoveShard(ClientBasic::getCurrent(), s.getName());
-
- Status status = grid.catalogManager()->remove(ShardType::ConfigNS,
- searchDoc,
- 0,
- NULL);
+ break;
+ case ShardDrainingStatus::ONGOING: {
+ vector<ChunkType> chunks;
+ Status status = grid.catalogManager()->getChunksForShard(s.getName(), &chunks);
if (!status.isOK()) {
- errmsg = status.reason();
- log() << "error concluding remove shard: " << s.getName()
- << " err: " << errmsg;
- return false;
+ return appendCommandStatus(result, status);
}
- const string shardName = shardDoc[ShardType::name()].str();
- Shard::removeShard(shardName);
- shardConnectionPool.removeHost(shardName);
- ReplicaSetMonitor::remove(shardName, true);
-
- Shard::reloadShardInfo();
-
+ result.append("msg", "draining ongoing");
+ result.append("state", "ongoing");
+ {
+ BSONObjBuilder inner;
+ inner.append("chunks", static_cast<long long>(chunks.size()));
+ inner.append("dbs", static_cast<long long>(databases.size()));
+ BSONObj b = inner.obj();
+ result.append("remaining", b);
+ }
+ result.appendElements(dbInfo);
+ break;
+ }
+ case ShardDrainingStatus::COMPLETED:
result.append("msg", "removeshard completed successfully");
result.append("state", "completed");
result.append("shard", s.getName());
-
- conn.done();
-
- // Record finish in changelog
- grid.catalogManager()->logChange(txn,
- "removeShard",
- "",
- buildRemoveLogEntry(s, false));
- return true;
}
- // If the server is already in draining mode, just report on its progress.
- // Report on databases (not just chunks) that are left too.
- result.append("msg", "draining ongoing");
- result.append("state", "ongoing");
- BSONObjBuilder inner;
- inner.append("chunks", chunkCount);
- inner.append("dbs", dbCount);
- result.append("remaining", inner.obj());
- result.appendElements(dbInfo);
-
- conn.done();
return true;
}