diff options
author | Daniel Alabi <alabidan@gmail.com> | 2015-03-26 01:28:58 -0400 |
---|---|---|
committer | Daniel Alabi <alabidan@gmail.com> | 2015-04-01 10:39:38 -0400 |
commit | b6347c5d8c988dd9b45e561d787a5c31ce670fea (patch) | |
tree | 5b449870930e68dc46aadcb1ac60cb27b7adc2de /src/mongo/s/catalog | |
parent | a18651c7dfe836388120270895d0948cf32d8921 (diff) | |
download | mongo-b6347c5d8c988dd9b45e561d787a5c31ce670fea.tar.gz |
SERVER-17740 Put removeShard in the catalog manager
Diffstat (limited to 'src/mongo/s/catalog')
-rw-r--r-- | src/mongo/s/catalog/catalog_manager.h | 38 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp | 142 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.h | 9 |
3 files changed, 188 insertions, 1 deletions
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 1c682d70427..96274c0bcbb 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -30,6 +30,7 @@ #include <boost/shared_ptr.hpp> #include <string> +#include <vector> #include "mongo/base/disallow_copying.h" @@ -38,13 +39,23 @@ namespace mongo { class BatchedCommandRequest; class BatchedCommandResponse; class BSONObj; + class ChunkType; class ConnectionString; class DatabaseType; class OperationContext; class Status; template<typename T> class StatusWith; - + /** + * Used to indicate to the caller of the removeShard method whether draining of chunks for + * a particular shard has started, is ongoing, or has been completed. + */ + enum ShardDrainingStatus { + STARTED, + ONGOING, + COMPLETED, + }; + /** * Abstracts reads and writes of the sharding catalog metadata. * @@ -86,6 +97,17 @@ namespace mongo { const long long maxSize) = 0; /** + * Tries to remove a shard. To completely remove a shard from a sharded cluster, + * the data residing in that shard must be moved to the remaining shards in the + * cluster by "draining" chunks from that shard. + * + * Because of the asynchronous nature of the draining mechanism, this method returns + * the current draining status. See ShardDrainingStatus enum definition for more details. + */ + virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, + const std::string& name) = 0; + + /** * Updates the metadata for a given database. Currently, if the specified DB entry does * not exist, it will be created. */ @@ -97,6 +119,20 @@ namespace mongo { virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName) = 0; /** + * Retrieves all databases for a shard. + * Returns a !OK status if an error occurs. + */ + virtual void getDatabasesForShard(const std::string& shardName, + std::vector<std::string>* dbs) = 0; + + /** + * Gets all chunks (of type ChunkType) for a shard. + * Returns a !OK status if an error occurs. + */ + virtual Status getChunksForShard(const std::string& shardName, + std::vector<ChunkType>* chunks) = 0; + + /** * Logs a diagnostic event locally and on the config server. * * NOTE: This method is best effort so it should never throw. diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 81b92f7824a..48774e3ca0d 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -39,6 +39,7 @@ #include "mongo/client/connpool.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/db/audit.h" #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" @@ -46,7 +47,10 @@ #include "mongo/s/catalog/legacy/config_coordinator.h" #include "mongo/s/catalog/type_changelog.h" #include "mongo/s/client/dbclient_multi_command.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/shard.h" +#include "mongo/s/type_chunk.h" +#include "mongo/s/type_shard.h" #include "mongo/s/type_database.h" #include "mongo/s/type_shard.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -237,6 +241,14 @@ namespace { return dbNames; } + BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) { + BSONObjBuilder details; + details.append("shard", shardName); + details.append("isDraining", isDraining); + + return details.obj(); + } + // Whether the logChange call should attempt to create the changelog collection AtomicInt32 changeLogCollectionCreated(0); @@ -515,6 +527,100 @@ namespace { return shardName; } + StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn, + const std::string& name) { + ScopedDbConnection conn(_configServerConnectionString, 30); + + if (conn->count(ShardType::ConfigNS, + BSON(ShardType::name() << NE << name + << ShardType::draining(true)))) { + conn.done(); + return Status(ErrorCodes::ConflictingOperationInProgress, + "Can't have more than one draining shard at a time"); + } + + if (conn->count(ShardType::ConfigNS, + BSON(ShardType::name() << NE << name)) == 0) { + conn.done(); + return Status(ErrorCodes::IllegalOperation, + "Can't remove last shard"); + } + + BSONObj searchDoc = BSON(ShardType::name() << name); + + // Case 1: start draining chunks + BSONObj drainingDoc = + BSON(ShardType::name() << name << ShardType::draining(true)); + BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc); + if (shardDoc.isEmpty()) { + log() << "going to start draining shard: " << name; + BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true))); + + Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL); + if (!status.isOK()) { + log() << "error starting removeShard: " << name + << "; err: " << status.reason(); + return status; + } + + BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") << + DatabaseType::primary(name)); + log() << "primaryLocalDoc: " << primaryLocalDoc; + if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) { + log() << "This shard is listed as primary of local db. Removing entry."; + + Status status = remove(DatabaseType::ConfigNS, + BSON(DatabaseType::name("local")), + 0, + NULL); + if (!status.isOK()) { + log() << "error removing local db: " + << status.reason(); + return status; + } + } + + Shard::reloadShardInfo(); + conn.done(); + + // Record start in changelog + logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true)); + return ShardDrainingStatus::STARTED; + } + + // Case 2: all chunks drained + BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str())); + long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc); + long long dbCount = conn->count(DatabaseType::ConfigNS, + BSON(DatabaseType::name.ne("local") + << DatabaseType::primary(name))); + if (chunkCount == 0 && dbCount == 0) { + log() << "going to remove shard: " << name; + audit::logRemoveShard(ClientBasic::getCurrent(), name); + + Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL); + if (!status.isOK()) { + log() << "Error concluding removeShard operation on: " << name + << "; err: " << status.reason(); + return status; + } + + Shard::removeShard(name); + shardConnectionPool.removeHost(name); + ReplicaSetMonitor::remove(name, true); + + Shard::reloadShardInfo(); + conn.done(); + + // Record finish in changelog + logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false)); + return ShardDrainingStatus::COMPLETED; + } + + // case 3: draining ongoing + return ShardDrainingStatus::ONGOING; + } + Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) { fassert(28616, db.validate()); @@ -597,6 +703,42 @@ namespace { } } + void CatalogManagerLegacy::getDatabasesForShard(const string& shardName, + vector<string>* dbs) { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + BSONObj prim = BSON(DatabaseType::primary(shardName)); + boost::scoped_ptr<DBClientCursor> cursor(conn->query(DatabaseType::ConfigNS, prim)); + + while (cursor->more()) { + BSONObj shard = cursor->nextSafe(); + dbs->push_back(shard[DatabaseType::name()].str()); + } + + conn.done(); + } + + Status CatalogManagerLegacy::getChunksForShard(const string& shardName, + vector<ChunkType>* chunks) { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + boost::scoped_ptr<DBClientCursor> cursor(conn->query(ChunkType::ConfigNS, + BSON(ChunkType::shard(shardName)))); + while (cursor->more()) { + BSONObj chunkObj = cursor->nextSafe(); + + StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj); + if (!chunkRes.isOK()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse chunk BSONObj: " + << chunkRes.getStatus().reason()); + } + ChunkType chunk = chunkRes.getValue(); + chunks->push_back(chunk); + } + conn.done(); + + return Status::OK(); + } + void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request, BatchedCommandResponse* response) { diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index f017ad60111..016c68c503b 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -56,10 +56,19 @@ namespace mongo { const ConnectionString& shardConnectionString, const long long maxSize); + virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, + const std::string& name); + virtual Status updateDatabase(const std::string& dbName, const DatabaseType& db); virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName); + virtual void getDatabasesForShard(const std::string& shardName, + std::vector<std::string>* dbs); + + virtual Status getChunksForShard(const std::string& shardName, + std::vector<ChunkType>* chunks); + virtual void logChange(OperationContext* txn, const std::string& what, const std::string& ns, |