summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog
diff options
context:
space:
mode:
author Daniel Alabi <alabidan@gmail.com> 2015-03-26 01:28:58 -0400
committer Daniel Alabi <alabidan@gmail.com> 2015-04-01 10:39:38 -0400
commit b6347c5d8c988dd9b45e561d787a5c31ce670fea (patch)
tree 5b449870930e68dc46aadcb1ac60cb27b7adc2de /src/mongo/s/catalog
parent a18651c7dfe836388120270895d0948cf32d8921 (diff)
download mongo-b6347c5d8c988dd9b45e561d787a5c31ce670fea.tar.gz
SERVER-17740 Put removeShard in the catalog manager
Diffstat (limited to 'src/mongo/s/catalog')
-rw-r--r-- src/mongo/s/catalog/catalog_manager.h 38
-rw-r--r-- src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp 142
-rw-r--r-- src/mongo/s/catalog/legacy/catalog_manager_legacy.h 9
3 files changed, 188 insertions, 1 deletions
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 1c682d70427..96274c0bcbb 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -30,6 +30,7 @@
#include <boost/shared_ptr.hpp>
#include <string>
+#include <vector>
#include "mongo/base/disallow_copying.h"
@@ -38,13 +39,23 @@ namespace mongo {
class BatchedCommandRequest;
class BatchedCommandResponse;
class BSONObj;
+ class ChunkType;
class ConnectionString;
class DatabaseType;
class OperationContext;
class Status;
template<typename T> class StatusWith;
-
+ /**
+ * Used to indicate to the caller of the removeShard method whether draining of chunks for
+ * a particular shard has started, is ongoing, or has been completed.
+ *
+ * The per-value notes below describe when the legacy catalog manager's removeShard
+ * returns each value.
+ */
+ enum ShardDrainingStatus {
+ STARTED, // this call marked the shard as draining
+ ONGOING, // shard is draining, but chunks or databases still reference it
+ COMPLETED, // nothing references the shard any more and its entry was removed
+ };
+
/**
* Abstracts reads and writes of the sharding catalog metadata.
*
@@ -86,6 +97,17 @@ namespace mongo {
const long long maxSize) = 0;
/**
+ * Tries to remove a shard. To completely remove a shard from a sharded cluster,
+ * the data residing in that shard must be moved to the remaining shards in the
+ * cluster by "draining" chunks from that shard.
+ *
+ * Because of the asynchronous nature of the draining mechanism, this method returns
+ * the current draining status. See ShardDrainingStatus enum definition for more details.
+ *
+ * 'name' is the name of the shard to remove. A single call does not necessarily remove
+ * the shard: the returned ShardDrainingStatus tells the caller how far the removal has
+ * progressed. A non-OK Status is returned when the request could not be carried out.
+ */
+ virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
+ const std::string& name) = 0;
+
+ /**
* Updates the metadata for a given database. Currently, if the specified DB entry does
* not exist, it will be created.
*/
@@ -97,6 +119,20 @@ namespace mongo {
virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName) = 0;
/**
+ * Retrieves the names of all databases whose primary shard is 'shardName' and appends
+ * them to 'dbs'.
+ * NOTE: this method returns void, so failures are not reported through a Status.
+ */
+ virtual void getDatabasesForShard(const std::string& shardName,
+ std::vector<std::string>* dbs) = 0;
+
+ /**
+ * Gets all chunks (of type ChunkType) for the shard named 'shardName' and appends them
+ * to 'chunks'.
+ * Returns a !OK status if an error occurs.
+ */
+ virtual Status getChunksForShard(const std::string& shardName,
+ std::vector<ChunkType>* chunks) = 0;
+
+ /**
* Logs a diagnostic event locally and on the config server.
*
* NOTE: This method is best effort so it should never throw.
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 81b92f7824a..48774e3ca0d 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/connpool.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
@@ -46,7 +47,10 @@
#include "mongo/s/catalog/legacy/config_coordinator.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/client/dbclient_multi_command.h"
+#include "mongo/s/client/shard_connection.h"
#include "mongo/s/shard.h"
+#include "mongo/s/type_chunk.h"
+#include "mongo/s/type_shard.h"
#include "mongo/s/type_database.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -237,6 +241,14 @@ namespace {
return dbNames;
}
+    /**
+     * Builds the details document recorded in the changelog for a removeShard event. The
+     * document carries the shard name under "shard" and the draining flag under "isDraining".
+     */
+    BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) {
+        return BSON("shard" << shardName << "isDraining" << isDraining);
+    }
+
// Whether the logChange call should attempt to create the changelog collection
AtomicInt32 changeLogCollectionCreated(0);
@@ -515,6 +527,100 @@ namespace {
return shardName;
}
+    /**
+     * Advances the removal of the shard called 'name' by one step, based on the metadata
+     * stored on the config server.
+     *
+     * Returns ConflictingOperationInProgress if a different shard is already draining and
+     * IllegalOperation if 'name' is the only shard. Otherwise:
+     *  - if the shard is not yet marked as draining, marks it, removes the "local" database
+     *    entry when this shard is listed as its primary, logs "removeShard.start" and
+     *    returns STARTED;
+     *  - if the shard is draining and no chunk and no non-"local" database references it,
+     *    removes the shard entry, logs "removeShard" and returns COMPLETED;
+     *  - otherwise returns ONGOING.
+     *
+     * Any failure of the underlying update/remove is returned to the caller unchanged.
+     * The config server connection is released with done() on every return path so that it
+     * goes back to the pool instead of being discarded by the ScopedDbConnection destructor.
+     */
+    StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn,
+                                                                      const std::string& name) {
+        ScopedDbConnection conn(_configServerConnectionString, 30);
+
+        // Only one shard may be draining at any time.
+        if (conn->count(ShardType::ConfigNS,
+                        BSON(ShardType::name() << NE << name
+                                               << ShardType::draining(true)))) {
+            conn.done();
+            return Status(ErrorCodes::ConflictingOperationInProgress,
+                          "Can't have more than one draining shard at a time");
+        }
+
+        // There must be at least one other shard left to receive the data.
+        if (conn->count(ShardType::ConfigNS,
+                        BSON(ShardType::name() << NE << name)) == 0) {
+            conn.done();
+            return Status(ErrorCodes::IllegalOperation,
+                          "Can't remove last shard");
+        }
+
+        BSONObj searchDoc = BSON(ShardType::name() << name);
+
+        // Case 1: start draining chunks
+        BSONObj drainingDoc =
+            BSON(ShardType::name() << name << ShardType::draining(true));
+        BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc);
+        if (shardDoc.isEmpty()) {
+            log() << "going to start draining shard: " << name;
+            BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true)));
+
+            Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL);
+            if (!status.isOK()) {
+                log() << "error starting removeShard: " << name
+                      << "; err: " << status.reason();
+                conn.done();
+                return status;
+            }
+
+            BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") <<
+                                           DatabaseType::primary(name));
+            log() << "primaryLocalDoc: " << primaryLocalDoc;
+            if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) {
+                log() << "This shard is listed as primary of local db. Removing entry.";
+
+                Status removeLocalStatus = remove(DatabaseType::ConfigNS,
+                                                  BSON(DatabaseType::name("local")),
+                                                  0,
+                                                  NULL);
+                if (!removeLocalStatus.isOK()) {
+                    log() << "error removing local db: "
+                          << removeLocalStatus.reason();
+                    conn.done();
+                    return removeLocalStatus;
+                }
+            }
+
+            Shard::reloadShardInfo();
+            conn.done();
+
+            // Record start in changelog
+            logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true));
+            return ShardDrainingStatus::STARTED;
+        }
+
+        // Case 2: all chunks drained
+        BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str()));
+        long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc);
+        long long dbCount = conn->count(DatabaseType::ConfigNS,
+                                        BSON(DatabaseType::name.ne("local")
+                                             << DatabaseType::primary(name)));
+        if (chunkCount == 0 && dbCount == 0) {
+            log() << "going to remove shard: " << name;
+            audit::logRemoveShard(ClientBasic::getCurrent(), name);
+
+            Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL);
+            if (!status.isOK()) {
+                log() << "Error concluding removeShard operation on: " << name
+                      << "; err: " << status.reason();
+                conn.done();
+                return status;
+            }
+
+            Shard::removeShard(name);
+            shardConnectionPool.removeHost(name);
+            ReplicaSetMonitor::remove(name, true);
+
+            Shard::reloadShardInfo();
+            conn.done();
+
+            // Record finish in changelog
+            logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false));
+            return ShardDrainingStatus::COMPLETED;
+        }
+
+        // case 3: draining ongoing
+        conn.done();
+        return ShardDrainingStatus::ONGOING;
+    }
+
Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) {
fassert(28616, db.validate());
@@ -597,6 +703,42 @@ namespace {
}
}
+    /**
+     * Queries the config server for every database entry whose primary shard is 'shardName'
+     * and appends each database name to 'dbs'.
+     */
+    void CatalogManagerLegacy::getDatabasesForShard(const string& shardName,
+                                                    vector<string>* dbs) {
+        ScopedDbConnection conn(_configServerConnectionString, 30.0);
+
+        const BSONObj primaryQuery = BSON(DatabaseType::primary(shardName));
+        boost::scoped_ptr<DBClientCursor> dbCursor(
+            conn->query(DatabaseType::ConfigNS, primaryQuery));
+
+        while (dbCursor->more()) {
+            // Each returned document describes one database, not a shard.
+            const BSONObj dbDoc = dbCursor->nextSafe();
+            const string dbName = dbDoc[DatabaseType::name()].str();
+            dbs->push_back(dbName);
+        }
+
+        conn.done();
+    }
+
+    /**
+     * Queries the config server for every chunk document assigned to shard 'shardName',
+     * parses each one into a ChunkType and appends it to 'chunks'.
+     *
+     * Returns FailedToParse as soon as one document cannot be parsed; chunks appended before
+     * that point are left in 'chunks'. Returns Status::OK() once all documents were consumed.
+     */
+    Status CatalogManagerLegacy::getChunksForShard(const string& shardName,
+                                                   vector<ChunkType>* chunks) {
+        ScopedDbConnection conn(_configServerConnectionString, 30.0);
+        boost::scoped_ptr<DBClientCursor> cursor(conn->query(ChunkType::ConfigNS,
+                                                             BSON(ChunkType::shard(shardName))));
+        while (cursor->more()) {
+            BSONObj chunkObj = cursor->nextSafe();
+
+            StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj);
+            if (!chunkRes.isOK()) {
+                Status parseError(ErrorCodes::FailedToParse,
+                                  str::stream() << "Failed to parse chunk BSONObj: "
+                                                << chunkRes.getStatus().reason());
+
+                // Destroy the (possibly unexhausted) cursor while the connection is still
+                // checked out, then return the connection to the pool rather than letting
+                // the ScopedDbConnection destructor discard it.
+                cursor.reset();
+                conn.done();
+                return parseError;
+            }
+
+            chunks->push_back(chunkRes.getValue());
+        }
+        conn.done();
+
+        return Status::OK();
+    }
+
void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request,
BatchedCommandResponse* response) {
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index f017ad60111..016c68c503b 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -56,10 +56,19 @@ namespace mongo {
const ConnectionString& shardConnectionString,
const long long maxSize);
+ virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
+ const std::string& name); // defined in catalog_manager_legacy.cpp; pure virtual in catalog_manager.h
+
virtual Status updateDatabase(const std::string& dbName, const DatabaseType& db);
virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName);
+ virtual void getDatabasesForShard(const std::string& shardName,
+ std::vector<std::string>* dbs); // appends database names to 'dbs'; defined in catalog_manager_legacy.cpp
+
+ virtual Status getChunksForShard(const std::string& shardName,
+ std::vector<ChunkType>* chunks); // appends parsed chunks to 'chunks'; defined in catalog_manager_legacy.cpp
+
virtual void logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,