Diffstat (limited to 'src/mongo/s/catalog/legacy')
18 files changed, 3775 insertions, 3988 deletions
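Nearly all of the churn in the diff below is mechanical restyling rather than a behavior change: code inside namespace blocks loses one level of indentation, and long statements are rewrapped — presumably the output of an automated reformatter such as clang-format. The `validConfigWC` helper illustrates the pattern (a trimmed excerpt of the hunks below; the `...` elides the unchanged remainder of the function body):

    // Old style: bodies indented one level inside the anonymous namespace.
    namespace {
        bool validConfigWC(const BSONObj& writeConcern) {
            BSONElement elem(writeConcern["w"]);
            if (elem.eoo()) {
                return true;
            }
            ...
        }
    }  // namespace

    // New style: namespace contents start at column 0; logic is untouched.
    namespace {
    bool validConfigWC(const BSONObj& writeConcern) {
        BSONElement elem(writeConcern["w"]);
        if (elem.eoo()) {
            return true;
        }
        ...
    }
    }  // namespace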
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index ab274c579fb..c25618e31e8 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -80,1657 +80,1570 @@ namespace mongo { - using std::map; - using std::pair; - using std::set; - using std::string; - using std::vector; - using str::stream; +using std::map; +using std::pair; +using std::set; +using std::string; +using std::vector; +using str::stream; namespace { - bool validConfigWC(const BSONObj& writeConcern) { - BSONElement elem(writeConcern["w"]); - if (elem.eoo()) { - return true; - } +bool validConfigWC(const BSONObj& writeConcern) { + BSONElement elem(writeConcern["w"]); + if (elem.eoo()) { + return true; + } - if (elem.isNumber() && elem.numberInt() <= 1) { - return true; - } + if (elem.isNumber() && elem.numberInt() <= 1) { + return true; + } - if (elem.type() == String && elem.str() == "majority") { - return true; - } + if (elem.type() == String && elem.str() == "majority") { + return true; + } - return false; + return false; +} + +void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); + + dassert(response->isValid(NULL)); +} + +StatusWith<string> isValidShard(const string& name, + const ConnectionString& shardConnectionString, + ScopedDbConnection& conn) { + if (conn->type() == ConnectionString::SYNC) { + return Status(ErrorCodes::BadValue, + "can't use sync cluster as a shard; for a replica set, " + "you have to use <setname>/<server1>,<server2>,..."); } - void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); + BSONObj resIsMongos; + // (ok == 0) implies that it is a mongos + if (conn->runCommand("admin", BSON("isdbgrid" << 1), resIsMongos)) { + return Status(ErrorCodes::BadValue, "can't add a mongos process as a shard"); + } - dassert(response->isValid(NULL)); + BSONObj resIsMaster; + if (!conn->runCommand("admin", BSON("isMaster" << 1), resIsMaster)) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "failed running isMaster: " << resIsMaster); } - StatusWith<string> isValidShard(const string& name, - const ConnectionString& shardConnectionString, - ScopedDbConnection& conn) { - if (conn->type() == ConnectionString::SYNC) { - return Status(ErrorCodes::BadValue, - "can't use sync cluster as a shard; for a replica set, " - "you have to use <setname>/<server1>,<server2>,..."); - } + // if the shard has only one host, make sure it is not part of a replica set + string setName = resIsMaster["setName"].str(); + string commandSetName = shardConnectionString.getSetName(); + if (commandSetName.empty() && !setName.empty()) { + return Status(ErrorCodes::BadValue, + str::stream() << "host is part of set " << setName << "; " + << "use replica set url format " + << "<setname>/<server1>,<server2>, ..."); + } - BSONObj resIsMongos; - // (ok == 0) implies that it is a mongos - if (conn->runCommand("admin", BSON("isdbgrid" << 1), resIsMongos)) { - return Status(ErrorCodes::BadValue, - "can't add a mongos process as a shard"); - } + if (!commandSetName.empty() && setName.empty()) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "host did not return a set name; " + << 
"is the replica set still initializing? " << resIsMaster); + } - BSONObj resIsMaster; - if (!conn->runCommand("admin", BSON("isMaster" << 1), resIsMaster)) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "failed running isMaster: " << resIsMaster); - } + // if the shard is part of replica set, make sure it is the right one + if (!commandSetName.empty() && (commandSetName != setName)) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "host is part of a different set: " << setName); + } - // if the shard has only one host, make sure it is not part of a replica set - string setName = resIsMaster["setName"].str(); - string commandSetName = shardConnectionString.getSetName(); - if (commandSetName.empty() && !setName.empty()) { + if (setName.empty()) { + // check this isn't a --configsvr + BSONObj res; + bool ok = conn->runCommand("admin", BSON("replSetGetStatus" << 1), res); + if (!ok && res["info"].type() == String && res["info"].String() == "configsvr") { return Status(ErrorCodes::BadValue, - str::stream() << "host is part of set " << setName << "; " - << "use replica set url format " - << "<setname>/<server1>,<server2>, ..."); - } - - if (!commandSetName.empty() && setName.empty()) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "host did not return a set name; " - << "is the replica set still initializing? " - << resIsMaster); + "the specified mongod is a --configsvr and " + "should thus not be a shard server"); } + } - // if the shard is part of replica set, make sure it is the right one - if (!commandSetName.empty() && (commandSetName != setName)) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "host is part of a different set: " << setName); + // if the shard is part of a replica set, + // make sure all the hosts mentioned in 'shardConnectionString' are part of + // the set. It is fine if not all members of the set are present in 'shardConnectionString'. + bool foundAll = true; + string offendingHost; + if (!commandSetName.empty()) { + set<string> hostSet; + BSONObjIterator iter(resIsMaster["hosts"].Obj()); + while (iter.more()) { + hostSet.insert(iter.next().String()); // host:port + } + if (resIsMaster["passives"].isABSONObj()) { + BSONObjIterator piter(resIsMaster["passives"].Obj()); + while (piter.more()) { + hostSet.insert(piter.next().String()); // host:port + } } - - if (setName.empty()) { - // check this isn't a --configsvr - BSONObj res; - bool ok = conn->runCommand("admin", - BSON("replSetGetStatus" << 1), - res); - if(!ok && res["info"].type() == String && res["info"].String() == "configsvr") { - return Status(ErrorCodes::BadValue, - "the specified mongod is a --configsvr and " - "should thus not be a shard server"); + if (resIsMaster["arbiters"].isABSONObj()) { + BSONObjIterator piter(resIsMaster["arbiters"].Obj()); + while (piter.more()) { + hostSet.insert(piter.next().String()); // host:port } } - // if the shard is part of a replica set, - // make sure all the hosts mentioned in 'shardConnectionString' are part of - // the set. It is fine if not all members of the set are present in 'shardConnectionString'. 
- bool foundAll = true; - string offendingHost; - if (!commandSetName.empty()) { - set<string> hostSet; - BSONObjIterator iter(resIsMaster["hosts"].Obj()); - while (iter.more()) { - hostSet.insert(iter.next().String()); // host:port - } - if (resIsMaster["passives"].isABSONObj()) { - BSONObjIterator piter(resIsMaster["passives"].Obj()); - while (piter.more()) { - hostSet.insert(piter.next().String()); // host:port - } - } - if (resIsMaster["arbiters"].isABSONObj()) { - BSONObjIterator piter(resIsMaster["arbiters"].Obj()); - while (piter.more()) { - hostSet.insert(piter.next().String()); // host:port - } + vector<HostAndPort> hosts = shardConnectionString.getServers(); + for (size_t i = 0; i < hosts.size(); i++) { + if (!hosts[i].hasPort()) { + hosts[i] = HostAndPort(hosts[i].host(), hosts[i].port()); } - - vector<HostAndPort> hosts = shardConnectionString.getServers(); - for (size_t i = 0; i < hosts.size(); i++) { - if (!hosts[i].hasPort()) { - hosts[i] = HostAndPort(hosts[i].host(), hosts[i].port()); - } - string host = hosts[i].toString(); // host:port - if (hostSet.find(host) == hostSet.end()) { - offendingHost = host; - foundAll = false; - break; - } + string host = hosts[i].toString(); // host:port + if (hostSet.find(host) == hostSet.end()) { + offendingHost = host; + foundAll = false; + break; } } - if (!foundAll) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "in seed list " << shardConnectionString.toString() - << ", host " << offendingHost - << " does not belong to replica set " << setName); - } - - string shardName(name); - // shard name defaults to the name of the replica set - if (name.empty() && !setName.empty()) { - shardName = setName; - } - - // disallow adding shard replica set with name 'config' - if (shardName == "config") { - return Status(ErrorCodes::BadValue, - "use of shard replica set with name 'config' is not allowed"); - } + } + if (!foundAll) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "in seed list " << shardConnectionString.toString() + << ", host " << offendingHost + << " does not belong to replica set " << setName); + } - return shardName; + string shardName(name); + // shard name defaults to the name of the replica set + if (name.empty() && !setName.empty()) { + shardName = setName; } - // In order to be accepted as a new shard, that mongod must not have - // any database name that exists already in any other shards. - // If that test passes, the new shard's databases are going to be entered as - // non-sharded db's whose primary is the newly added shard. 
- StatusWith<vector<string>> getDBNames(const ConnectionString& shardConnectionString, - ScopedDbConnection& conn) { - vector<string> dbNames; + // disallow adding shard replica set with name 'config' + if (shardName == "config") { + return Status(ErrorCodes::BadValue, + "use of shard replica set with name 'config' is not allowed"); + } - BSONObj resListDB; - if (!conn->runCommand("admin", BSON("listDatabases" << 1), resListDB)) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "failed listing " - << shardConnectionString.toString() - << "'s databases:" << resListDB); - } + return shardName; +} - BSONObjIterator i(resListDB["databases"].Obj()); - while (i.more()) { - BSONObj dbEntry = i.next().Obj(); - const string& dbName = dbEntry["name"].String(); - if (!(dbName == "local" || dbName == "admin" || dbName == "config")) { - dbNames.push_back(dbName); - } - } +// In order to be accepted as a new shard, that mongod must not have +// any database name that exists already in any other shards. +// If that test passes, the new shard's databases are going to be entered as +// non-sharded db's whose primary is the newly added shard. +StatusWith<vector<string>> getDBNames(const ConnectionString& shardConnectionString, + ScopedDbConnection& conn) { + vector<string> dbNames; - return dbNames; + BSONObj resListDB; + if (!conn->runCommand("admin", BSON("listDatabases" << 1), resListDB)) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "failed listing " << shardConnectionString.toString() + << "'s databases:" << resListDB); } - BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) { - BSONObjBuilder details; - details.append("shard", shardName); - details.append("isDraining", isDraining); - - return details.obj(); + BSONObjIterator i(resListDB["databases"].Obj()); + while (i.more()) { + BSONObj dbEntry = i.next().Obj(); + const string& dbName = dbEntry["name"].String(); + if (!(dbName == "local" || dbName == "admin" || dbName == "config")) { + dbNames.push_back(dbName); + } } - // Whether the logChange call should attempt to create the changelog collection - AtomicInt32 changeLogCollectionCreated(0); + return dbNames; +} - // Whether the logAction call should attempt to create the actionlog collection - AtomicInt32 actionLogCollectionCreated(0); +BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) { + BSONObjBuilder details; + details.append("shard", shardName); + details.append("isDraining", isDraining); -} // namespace + return details.obj(); +} +// Whether the logChange call should attempt to create the changelog collection +AtomicInt32 changeLogCollectionCreated(0); - CatalogManagerLegacy::CatalogManagerLegacy() = default; +// Whether the logAction call should attempt to create the actionlog collection +AtomicInt32 actionLogCollectionCreated(0); - CatalogManagerLegacy::~CatalogManagerLegacy() = default; +} // namespace - Status CatalogManagerLegacy::init(const ConnectionString& configDBCS) { - // Initialization should not happen more than once - invariant(!_configServerConnectionString.isValid()); - invariant(_configServers.empty()); - invariant(configDBCS.isValid()); - - // Extract the hosts in HOST:PORT format - set<HostAndPort> configHostsAndPortsSet; - set<string> configHostsOnly; - std::vector<HostAndPort> configHostAndPorts = configDBCS.getServers(); - for (size_t i = 0; i < configHostAndPorts.size(); i++) { - // Append the default port, if not specified - HostAndPort configHost = configHostAndPorts[i]; - if 
(!configHost.hasPort()) { - configHost = HostAndPort(configHost.host(), ServerGlobalParams::ConfigServerPort); - } - - // Make sure there are no duplicates - if (!configHostsAndPortsSet.insert(configHost).second) { - StringBuilder sb; - sb << "Host " << configHost.toString() - << " exists twice in the config servers listing."; - return Status(ErrorCodes::InvalidOptions, sb.str()); - } +CatalogManagerLegacy::CatalogManagerLegacy() = default; - configHostsOnly.insert(configHost.host()); - } +CatalogManagerLegacy::~CatalogManagerLegacy() = default; - // Make sure the hosts are reachable - for (set<string>::const_iterator i = configHostsOnly.begin(); - i != configHostsOnly.end(); - i++) { +Status CatalogManagerLegacy::init(const ConnectionString& configDBCS) { + // Initialization should not happen more than once + invariant(!_configServerConnectionString.isValid()); + invariant(_configServers.empty()); + invariant(configDBCS.isValid()); - const string host = *i; + // Extract the hosts in HOST:PORT format + set<HostAndPort> configHostsAndPortsSet; + set<string> configHostsOnly; + std::vector<HostAndPort> configHostAndPorts = configDBCS.getServers(); + for (size_t i = 0; i < configHostAndPorts.size(); i++) { + // Append the default port, if not specified + HostAndPort configHost = configHostAndPorts[i]; + if (!configHost.hasPort()) { + configHost = HostAndPort(configHost.host(), ServerGlobalParams::ConfigServerPort); + } - // If this is a CUSTOM connection string (for testing) don't do DNS resolution - string errMsg; - if (ConnectionString::parse(host, errMsg).type() == ConnectionString::CUSTOM) { - continue; - } + // Make sure there are no duplicates + if (!configHostsAndPortsSet.insert(configHost).second) { + StringBuilder sb; + sb << "Host " << configHost.toString() + << " exists twice in the config servers listing."; - bool ok = false; + return Status(ErrorCodes::InvalidOptions, sb.str()); + } - for (int x = 10; x > 0; x--) { - if (!hostbyname(host.c_str()).empty()) { - ok = true; - break; - } + configHostsOnly.insert(configHost.host()); + } - log() << "can't resolve DNS for [" << host << "] sleeping and trying " - << x << " more times"; - sleepsecs(10); - } + // Make sure the hosts are reachable + for (set<string>::const_iterator i = configHostsOnly.begin(); i != configHostsOnly.end(); i++) { + const string host = *i; - if (!ok) { - return Status(ErrorCodes::HostNotFound, - stream() << "unable to resolve DNS for host " << host); - } + // If this is a CUSTOM connection string (for testing) don't do DNS resolution + string errMsg; + if (ConnectionString::parse(host, errMsg).type() == ConnectionString::CUSTOM) { + continue; } - LOG(1) << " config string : " << configDBCS.toString(); - - // Now that the config hosts are verified, initialize the catalog manager. The code below - // should never fail. + bool ok = false; - _configServerConnectionString = configDBCS; - - if (_configServerConnectionString.type() == ConnectionString::MASTER) { - _configServers.push_back(_configServerConnectionString); - } - else if (_configServerConnectionString.type() == ConnectionString::SYNC || - (_configServerConnectionString.type() == ConnectionString::SET && - _configServerConnectionString.getServers().size() == 1)) { - // TODO(spencer): Remove second part of the above or statement that allows replset - // config server strings once we've separated the legacy catalog manager from the - // CSRS version. 
- const vector<HostAndPort> configHPs = _configServerConnectionString.getServers(); - for (vector<HostAndPort>::const_iterator it = configHPs.begin(); - it != configHPs.end(); - ++it) { - - _configServers.push_back(ConnectionString(*it)); + for (int x = 10; x > 0; x--) { + if (!hostbyname(host.c_str()).empty()) { + ok = true; + break; } - } - else { - // This is only for tests. - invariant(_configServerConnectionString.type() == ConnectionString::CUSTOM); - _configServers.push_back(_configServerConnectionString); - } - _distLockManager = stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString); - _distLockManager->startUp(); + log() << "can't resolve DNS for [" << host << "] sleeping and trying " << x + << " more times"; + sleepsecs(10); + } - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = false; - _consistentFromLastCheck = true; + if (!ok) { + return Status(ErrorCodes::HostNotFound, + stream() << "unable to resolve DNS for host " << host); } + } - return Status::OK(); + LOG(1) << " config string : " << configDBCS.toString(); + + // Now that the config hosts are verified, initialize the catalog manager. The code below + // should never fail. + + _configServerConnectionString = configDBCS; + + if (_configServerConnectionString.type() == ConnectionString::MASTER) { + _configServers.push_back(_configServerConnectionString); + } else if (_configServerConnectionString.type() == ConnectionString::SYNC || + (_configServerConnectionString.type() == ConnectionString::SET && + _configServerConnectionString.getServers().size() == 1)) { + // TODO(spencer): Remove second part of the above or statement that allows replset + // config server strings once we've separated the legacy catalog manager from the + // CSRS version. + const vector<HostAndPort> configHPs = _configServerConnectionString.getServers(); + for (vector<HostAndPort>::const_iterator it = configHPs.begin(); it != configHPs.end(); + ++it) { + _configServers.push_back(ConnectionString(*it)); + } + } else { + // This is only for tests. 
+ invariant(_configServerConnectionString.type() == ConnectionString::CUSTOM); + _configServers.push_back(_configServerConnectionString); } - Status CatalogManagerLegacy::startup(bool upgrade) { - Status status = _startConfigServerChecker(); - if (!status.isOK()) { - return status; - } + _distLockManager = stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString); + _distLockManager->startUp(); - status = _checkAndUpgradeConfigMetadata(upgrade); - return status; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = false; + _consistentFromLastCheck = true; } - Status CatalogManagerLegacy::_checkAndUpgradeConfigMetadata(bool doUpgrade) { - VersionType initVersionInfo; - VersionType versionInfo; - string errMsg; + return Status::OK(); +} - bool upgraded = checkAndUpgradeConfigVersion(this, - doUpgrade, - &initVersionInfo, - &versionInfo, - &errMsg); - if (!upgraded) { - return Status(ErrorCodes::IncompatibleShardingMetadata, - str::stream() << "error upgrading config database to v" - << CURRENT_CONFIG_VERSION << causedBy(errMsg)); - } - - return Status::OK(); +Status CatalogManagerLegacy::startup(bool upgrade) { + Status status = _startConfigServerChecker(); + if (!status.isOK()) { + return status; } - Status CatalogManagerLegacy::_startConfigServerChecker() { - if (!_checkConfigServersConsistent()) { - return Status(ErrorCodes::ConfigServersInconsistent, - "Data inconsistency detected amongst config servers"); - } + status = _checkAndUpgradeConfigMetadata(upgrade); + return status; +} + +Status CatalogManagerLegacy::_checkAndUpgradeConfigMetadata(bool doUpgrade) { + VersionType initVersionInfo; + VersionType versionInfo; + string errMsg; + + bool upgraded = + checkAndUpgradeConfigVersion(this, doUpgrade, &initVersionInfo, &versionInfo, &errMsg); + if (!upgraded) { + return Status(ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "error upgrading config database to v" + << CURRENT_CONFIG_VERSION << causedBy(errMsg)); + } - stdx::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this)); - _consistencyCheckerThread.swap(t); + return Status::OK(); +} - return Status::OK(); +Status CatalogManagerLegacy::_startConfigServerChecker() { + if (!_checkConfigServersConsistent()) { + return Status(ErrorCodes::ConfigServersInconsistent, + "Data inconsistency detected amongst config servers"); } - ConnectionString CatalogManagerLegacy::connectionString() const { - return _configServerConnectionString; - } + stdx::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this)); + _consistencyCheckerThread.swap(t); - void CatalogManagerLegacy::shutDown() { - LOG(1) << "CatalogManagerLegacy::shutDown() called."; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = true; - _consistencyCheckerCV.notify_one(); - } - _consistencyCheckerThread.join(); + return Status::OK(); +} + +ConnectionString CatalogManagerLegacy::connectionString() const { + return _configServerConnectionString; +} - invariant(_distLockManager); - _distLockManager->shutDown(); +void CatalogManagerLegacy::shutDown() { + LOG(1) << "CatalogManagerLegacy::shutDown() called."; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + _consistencyCheckerCV.notify_one(); } + _consistencyCheckerThread.join(); - Status CatalogManagerLegacy::enableSharding(const std::string& dbName) { - invariant(nsIsDbOnly(dbName)); + invariant(_distLockManager); + _distLockManager->shutDown(); +} - DatabaseType db; +Status CatalogManagerLegacy::enableSharding(const std::string& dbName) 
{ + invariant(nsIsDbOnly(dbName)); - // Check for case sensitivity violations - Status status = _checkDbDoesNotExist(dbName); - if (status.isOK()) { - // Database does not exist, create a new entry - const ShardPtr primary = Shard::pick(); - if (primary) { - log() << "Placing [" << dbName << "] on: " << primary->toString(); + DatabaseType db; - db.setName(dbName); - db.setPrimary(primary->getId()); - db.setSharded(true); - } - else { - return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on"); - } - } - else if (status.code() == ErrorCodes::NamespaceExists) { - // Database exists, so just update it - StatusWith<DatabaseType> dbStatus = getDatabase(dbName); - if (!dbStatus.isOK()) { - return dbStatus.getStatus(); - } + // Check for case sensitivity violations + Status status = _checkDbDoesNotExist(dbName); + if (status.isOK()) { + // Database does not exist, create a new entry + const ShardPtr primary = Shard::pick(); + if (primary) { + log() << "Placing [" << dbName << "] on: " << primary->toString(); - db = dbStatus.getValue(); + db.setName(dbName); + db.setPrimary(primary->getId()); db.setSharded(true); + } else { + return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on"); } - else { - // Some fatal error - return status; + } else if (status.code() == ErrorCodes::NamespaceExists) { + // Database exists, so just update it + StatusWith<DatabaseType> dbStatus = getDatabase(dbName); + if (!dbStatus.isOK()) { + return dbStatus.getStatus(); } - log() << "Enabling sharding for database [" << dbName << "] in config db"; - - return updateDatabase(dbName, db); + db = dbStatus.getValue(); + db.setSharded(true); + } else { + // Some fatal error + return status; } - Status CatalogManagerLegacy::shardCollection(const string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - vector<BSONObj>* initPoints, - set<ShardId>* initShardIds) { + log() << "Enabling sharding for database [" << dbName << "] in config db"; - StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns)); - if (!status.isOK()) { - return status.getStatus(); - } + return updateDatabase(dbName, db); +} - DatabaseType dbt = status.getValue(); - ShardId dbPrimaryShardId = dbt.getPrimary(); - - // This is an extra safety check that the collection is not getting sharded concurrently by - // two different mongos instances. It is not 100%-proof, but it reduces the chance that two - // invocations of shard collection will step on each other's toes. - { - ScopedDbConnection conn(_configServerConnectionString, 30); - unsigned long long existingChunks = conn->count(ChunkType::ConfigNS, - BSON(ChunkType::ns(ns))); - if (existingChunks > 0) { - conn.done(); - return Status(ErrorCodes::AlreadyInitialized, - str::stream() << "collection " << ns << " already sharded with " - << existingChunks << " chunks."); - } +Status CatalogManagerLegacy::shardCollection(const string& ns, + const ShardKeyPattern& fieldsAndOrder, + bool unique, + vector<BSONObj>* initPoints, + set<ShardId>* initShardIds) { + StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns)); + if (!status.isOK()) { + return status.getStatus(); + } + + DatabaseType dbt = status.getValue(); + ShardId dbPrimaryShardId = dbt.getPrimary(); + // This is an extra safety check that the collection is not getting sharded concurrently by + // two different mongos instances. It is not 100%-proof, but it reduces the chance that two + // invocations of shard collection will step on each other's toes. 
+ { + ScopedDbConnection conn(_configServerConnectionString, 30); + unsigned long long existingChunks = + conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(ns))); + if (existingChunks > 0) { conn.done(); + return Status(ErrorCodes::AlreadyInitialized, + str::stream() << "collection " << ns << " already sharded with " + << existingChunks << " chunks."); } - log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder; + conn.done(); + } - // Record start in changelog - BSONObjBuilder collectionDetail; - collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); - collectionDetail.append("collection", ns); - string dbPrimaryShardStr; - { - const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId); - dbPrimaryShardStr = shard->toString(); - } - collectionDetail.append("primary", dbPrimaryShardStr); - - BSONArray initialShards; - if (initShardIds == NULL) - initialShards = BSONArray(); - else { - BSONArrayBuilder b; - for (const ShardId& shardId : *initShardIds) { - b.append(shardId); - } - initialShards = b.arr(); + log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder; + + // Record start in changelog + BSONObjBuilder collectionDetail; + collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); + collectionDetail.append("collection", ns); + string dbPrimaryShardStr; + { + const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId); + dbPrimaryShardStr = shard->toString(); + } + collectionDetail.append("primary", dbPrimaryShardStr); + + BSONArray initialShards; + if (initShardIds == NULL) + initialShards = BSONArray(); + else { + BSONArrayBuilder b; + for (const ShardId& shardId : *initShardIds) { + b.append(shardId); } + initialShards = b.arr(); + } - collectionDetail.append("initShards", initialShards); - collectionDetail.append("numChunks", static_cast<int>(initPoints->size() + 1)); - - logChange(NULL, "shardCollection.start", ns, collectionDetail.obj()); - - ChunkManagerPtr manager(new ChunkManager(ns, fieldsAndOrder, unique)); - manager->createFirstChunks(dbPrimaryShardId, - initPoints, - initShardIds); - manager->loadExistingRanges(nullptr); - - CollectionInfo collInfo; - collInfo.useChunkManager(manager); - collInfo.save(ns); - manager->reload(true); - - // Tell the primary mongod to refresh its data - // TODO: Think the real fix here is for mongos to just - // assume that all collections are sharded, when we get there - for (int i = 0;i < 4;i++) { - if (i == 3) { - warning() << "too many tries updating initial version of " << ns - << " on shard primary " << dbPrimaryShardStr - << ", other mongoses may not see the collection as sharded immediately"; - break; - } + collectionDetail.append("initShards", initialShards); + collectionDetail.append("numChunks", static_cast<int>(initPoints->size() + 1)); - try { - const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId); - ShardConnection conn(shard->getConnString(), ns); - bool isVersionSet = conn.setVersion(); - conn.done(); - if (!isVersionSet) { - warning() << "could not update initial version of " - << ns << " on shard primary " << dbPrimaryShardStr; - } else { - break; - } - } - catch (const DBException& e) { - warning() << "could not update initial version of " << ns - << " on shard primary " << dbPrimaryShardStr - << causedBy(e); - } + logChange(NULL, "shardCollection.start", ns, collectionDetail.obj()); - sleepsecs(i); - } + ChunkManagerPtr manager(new ChunkManager(ns, fieldsAndOrder, unique)); + manager->createFirstChunks(dbPrimaryShardId, initPoints, 
initShardIds); + manager->loadExistingRanges(nullptr); - // Record finish in changelog - BSONObjBuilder finishDetail; + CollectionInfo collInfo; + collInfo.useChunkManager(manager); + collInfo.save(ns); + manager->reload(true); - finishDetail.append("version", manager->getVersion().toString()); + // Tell the primary mongod to refresh its data + // TODO: Think the real fix here is for mongos to just + // assume that all collections are sharded, when we get there + for (int i = 0; i < 4; i++) { + if (i == 3) { + warning() << "too many tries updating initial version of " << ns << " on shard primary " + << dbPrimaryShardStr + << ", other mongoses may not see the collection as sharded immediately"; + break; + } - logChange(NULL, "shardCollection", ns, finishDetail.obj()); + try { + const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId); + ShardConnection conn(shard->getConnString(), ns); + bool isVersionSet = conn.setVersion(); + conn.done(); + if (!isVersionSet) { + warning() << "could not update initial version of " << ns << " on shard primary " + << dbPrimaryShardStr; + } else { + break; + } + } catch (const DBException& e) { + warning() << "could not update initial version of " << ns << " on shard primary " + << dbPrimaryShardStr << causedBy(e); + } - return Status::OK(); + sleepsecs(i); } - Status CatalogManagerLegacy::createDatabase(const std::string& dbName) { - invariant(nsIsDbOnly(dbName)); + // Record finish in changelog + BSONObjBuilder finishDetail; - // The admin and config databases should never be explicitly created. They "just exist", - // i.e. getDatabase will always return an entry for them. - invariant(dbName != "admin"); - invariant(dbName != "config"); + finishDetail.append("version", manager->getVersion().toString()); - // Lock the database globally to prevent conflicts with simultaneous database creation. - auto scopedDistLock = getDistLockManager()->lock(dbName, - "createDatabase", - Seconds{5}, - Milliseconds{500}); - if (!scopedDistLock.isOK()) { - return scopedDistLock.getStatus(); - } - - // Check for case sensitivity violations - auto status = _checkDbDoesNotExist(dbName); - if (!status.isOK()) { - return status; - } + logChange(NULL, "shardCollection", ns, finishDetail.obj()); - // Database does not exist, pick a shard and create a new entry - const ShardPtr primaryShard = Shard::pick(); - if (!primaryShard) { - return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on"); - } + return Status::OK(); +} - log() << "Placing [" << dbName << "] on: " << primaryShard->toString(); +Status CatalogManagerLegacy::createDatabase(const std::string& dbName) { + invariant(nsIsDbOnly(dbName)); - DatabaseType db; - db.setName(dbName); - db.setPrimary(primaryShard->getId()); - db.setSharded(false); + // The admin and config databases should never be explicitly created. They "just exist", + // i.e. getDatabase will always return an entry for them. + invariant(dbName != "admin"); + invariant(dbName != "config"); - BatchedCommandResponse response; - status = insert(DatabaseType::ConfigNS, db.toBSON(), &response); - if (status.isOK()) { - return status; - } + // Lock the database globally to prevent conflicts with simultaneous database creation. 
+ auto scopedDistLock = + getDistLockManager()->lock(dbName, "createDatabase", Seconds{5}, Milliseconds{500}); + if (!scopedDistLock.isOK()) { + return scopedDistLock.getStatus(); + } - if (status.code() == ErrorCodes::DuplicateKey) { - return Status(ErrorCodes::NamespaceExists, "database " + dbName + " already exists"); - } + // Check for case sensitivity violations + auto status = _checkDbDoesNotExist(dbName); + if (!status.isOK()) { + return status; + } - return Status(status.code(), str::stream() << "database metadata write failed for " - << dbName << ". Error: " << response.toBSON()); + // Database does not exist, pick a shard and create a new entry + const ShardPtr primaryShard = Shard::pick(); + if (!primaryShard) { + return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on"); } - StatusWith<string> CatalogManagerLegacy::addShard(const string& name, - const ConnectionString& shardConnectionString, - const long long maxSize) { + log() << "Placing [" << dbName << "] on: " << primaryShard->toString(); - string shardName; - ReplicaSetMonitorPtr rsMonitor; - vector<string> dbNames; + DatabaseType db; + db.setName(dbName); + db.setPrimary(primaryShard->getId()); + db.setSharded(false); - try { - ScopedDbConnection newShardConn(shardConnectionString); - newShardConn->getLastError(); - - StatusWith<string> validShard = isValidShard(name, - shardConnectionString, - newShardConn); - if (!validShard.isOK()) { - newShardConn.done(); - return validShard.getStatus(); - } - shardName = validShard.getValue(); + BatchedCommandResponse response; + status = insert(DatabaseType::ConfigNS, db.toBSON(), &response); + if (status.isOK()) { + return status; + } - StatusWith<vector<string>> shardDBNames = getDBNames(shardConnectionString, - newShardConn); - if (!shardDBNames.isOK()) { - newShardConn.done(); - return shardDBNames.getStatus(); - } - dbNames = shardDBNames.getValue(); + if (status.code() == ErrorCodes::DuplicateKey) { + return Status(ErrorCodes::NamespaceExists, "database " + dbName + " already exists"); + } - if (newShardConn->type() == ConnectionString::SET) { - rsMonitor = ReplicaSetMonitor::get(shardConnectionString.getSetName()); - } + return Status(status.code(), + str::stream() << "database metadata write failed for " << dbName + << ". 
Error: " << response.toBSON()); +} + +StatusWith<string> CatalogManagerLegacy::addShard(const string& name, + const ConnectionString& shardConnectionString, + const long long maxSize) { + string shardName; + ReplicaSetMonitorPtr rsMonitor; + vector<string> dbNames; + try { + ScopedDbConnection newShardConn(shardConnectionString); + newShardConn->getLastError(); + + StatusWith<string> validShard = isValidShard(name, shardConnectionString, newShardConn); + if (!validShard.isOK()) { newShardConn.done(); + return validShard.getStatus(); } - catch (const DBException& e) { - if (shardConnectionString.type() == ConnectionString::SET) { - shardConnectionPool.removeHost(shardConnectionString.getSetName()); - ReplicaSetMonitor::remove(shardConnectionString.getSetName()); - } + shardName = validShard.getValue(); - return Status(ErrorCodes::OperationFailed, - str::stream() << "couldn't connect to new shard " - << e.what()); + StatusWith<vector<string>> shardDBNames = getDBNames(shardConnectionString, newShardConn); + if (!shardDBNames.isOK()) { + newShardConn.done(); + return shardDBNames.getStatus(); } + dbNames = shardDBNames.getValue(); - // check that none of the existing shard candidate's db's exist elsewhere - for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) { - StatusWith<DatabaseType> dbt = getDatabase(*it); - if (dbt.isOK()) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "can't add shard " - << "'" << shardConnectionString.toString() << "'" - << " because a local database '" << *it - << "' exists in another " - << dbt.getValue().getPrimary()); - } + if (newShardConn->type() == ConnectionString::SET) { + rsMonitor = ReplicaSetMonitor::get(shardConnectionString.getSetName()); } - // if a name for a shard wasn't provided, pick one. - if (shardName.empty()) { - StatusWith<string> result = _getNewShardName(); - if (!result.isOK()) { - return Status(ErrorCodes::OperationFailed, - "error generating new shard name"); - } - shardName = result.getValue(); + newShardConn.done(); + } catch (const DBException& e) { + if (shardConnectionString.type() == ConnectionString::SET) { + shardConnectionPool.removeHost(shardConnectionString.getSetName()); + ReplicaSetMonitor::remove(shardConnectionString.getSetName()); } - // build the ConfigDB shard document - BSONObjBuilder b; - b.append(ShardType::name(), shardName); - b.append(ShardType::host(), - rsMonitor ? rsMonitor->getServerAddress() : shardConnectionString.toString()); - if (maxSize > 0) { - b.append(ShardType::maxSizeMB(), maxSize); - } - BSONObj shardDoc = b.obj(); + return Status(ErrorCodes::OperationFailed, + str::stream() << "couldn't connect to new shard " << e.what()); + } - if (isShardHost(shardConnectionString)) { - return Status(ErrorCodes::OperationFailed, "host already used"); + // check that none of the existing shard candidate's db's exist elsewhere + for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) { + StatusWith<DatabaseType> dbt = getDatabase(*it); + if (dbt.isOK()) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "can't add shard " + << "'" << shardConnectionString.toString() << "'" + << " because a local database '" << *it + << "' exists in another " << dbt.getValue().getPrimary()); } + } - log() << "going to add shard: " << shardDoc; - - Status result = insert(ShardType::ConfigNS, shardDoc, NULL); + // if a name for a shard wasn't provided, pick one. 
+ if (shardName.empty()) { + StatusWith<string> result = _getNewShardName(); if (!result.isOK()) { - log() << "error adding shard: " << shardDoc << " err: " << result.reason(); - return result; + return Status(ErrorCodes::OperationFailed, "error generating new shard name"); } + shardName = result.getValue(); + } - Shard::reloadShardInfo(); - - // add all databases of the new shard - for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) { - DatabaseType dbt; - dbt.setName(*it); - dbt.setPrimary(shardName); - dbt.setSharded(false); - Status status = updateDatabase(*it, dbt); - if (!status.isOK()) { - log() << "adding shard " << shardConnectionString.toString() - << " even though could not add database " << *it; - } - } + // build the ConfigDB shard document + BSONObjBuilder b; + b.append(ShardType::name(), shardName); + b.append(ShardType::host(), + rsMonitor ? rsMonitor->getServerAddress() : shardConnectionString.toString()); + if (maxSize > 0) { + b.append(ShardType::maxSizeMB(), maxSize); + } + BSONObj shardDoc = b.obj(); - // Record in changelog - BSONObjBuilder shardDetails; - shardDetails.append("name", shardName); - shardDetails.append("host", shardConnectionString.toString()); + if (isShardHost(shardConnectionString)) { + return Status(ErrorCodes::OperationFailed, "host already used"); + } - logChange(NULL, "addShard", "", shardDetails.obj()); + log() << "going to add shard: " << shardDoc; - return shardName; + Status result = insert(ShardType::ConfigNS, shardDoc, NULL); + if (!result.isOK()) { + log() << "error adding shard: " << shardDoc << " err: " << result.reason(); + return result; } - StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn, - const std::string& name) { - ScopedDbConnection conn(_configServerConnectionString, 30); + Shard::reloadShardInfo(); - if (conn->count(ShardType::ConfigNS, - BSON(ShardType::name() << NE << name - << ShardType::draining(true)))) { - conn.done(); - return Status(ErrorCodes::ConflictingOperationInProgress, - "Can't have more than one draining shard at a time"); + // add all databases of the new shard + for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) { + DatabaseType dbt; + dbt.setName(*it); + dbt.setPrimary(shardName); + dbt.setSharded(false); + Status status = updateDatabase(*it, dbt); + if (!status.isOK()) { + log() << "adding shard " << shardConnectionString.toString() + << " even though could not add database " << *it; } + } - if (conn->count(ShardType::ConfigNS, - BSON(ShardType::name() << NE << name)) == 0) { - conn.done(); - return Status(ErrorCodes::IllegalOperation, - "Can't remove last shard"); - } + // Record in changelog + BSONObjBuilder shardDetails; + shardDetails.append("name", shardName); + shardDetails.append("host", shardConnectionString.toString()); - BSONObj searchDoc = BSON(ShardType::name() << name); + logChange(NULL, "addShard", "", shardDetails.obj()); - // Case 1: start draining chunks - BSONObj drainingDoc = BSON(ShardType::name() << name << ShardType::draining(true)); - BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc); - if (shardDoc.isEmpty()) { - log() << "going to start draining shard: " << name; - BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true))); + return shardName; +} - Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL); - if (!status.isOK()) { - log() << "error starting removeShard: " << name - << "; err: " << status.reason(); - return 
status; - } +StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn, + const std::string& name) { + ScopedDbConnection conn(_configServerConnectionString, 30); - BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") << - DatabaseType::primary(name)); - log() << "primaryLocalDoc: " << primaryLocalDoc; - if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) { - log() << "This shard is listed as primary of local db. Removing entry."; - - Status status = remove(DatabaseType::ConfigNS, - BSON(DatabaseType::name("local")), - 0, - NULL); - if (!status.isOK()) { - log() << "error removing local db: " - << status.reason(); - return status; - } - } + if (conn->count(ShardType::ConfigNS, + BSON(ShardType::name() << NE << name << ShardType::draining(true)))) { + conn.done(); + return Status(ErrorCodes::ConflictingOperationInProgress, + "Can't have more than one draining shard at a time"); + } - Shard::reloadShardInfo(); - conn.done(); + if (conn->count(ShardType::ConfigNS, BSON(ShardType::name() << NE << name)) == 0) { + conn.done(); + return Status(ErrorCodes::IllegalOperation, "Can't remove last shard"); + } + + BSONObj searchDoc = BSON(ShardType::name() << name); + + // Case 1: start draining chunks + BSONObj drainingDoc = BSON(ShardType::name() << name << ShardType::draining(true)); + BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc); + if (shardDoc.isEmpty()) { + log() << "going to start draining shard: " << name; + BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true))); - // Record start in changelog - logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true)); - return ShardDrainingStatus::STARTED; + Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL); + if (!status.isOK()) { + log() << "error starting removeShard: " << name << "; err: " << status.reason(); + return status; } - // Case 2: all chunks drained - BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str())); - long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc); - long long dbCount = conn->count(DatabaseType::ConfigNS, - BSON(DatabaseType::name.ne("local") - << DatabaseType::primary(name))); - if (chunkCount == 0 && dbCount == 0) { - log() << "going to remove shard: " << name; - audit::logRemoveShard(ClientBasic::getCurrent(), name); - - Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL); + BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") << DatabaseType::primary(name)); + log() << "primaryLocalDoc: " << primaryLocalDoc; + if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) { + log() << "This shard is listed as primary of local db. 
Removing entry."; + + Status status = + remove(DatabaseType::ConfigNS, BSON(DatabaseType::name("local")), 0, NULL); if (!status.isOK()) { - log() << "Error concluding removeShard operation on: " << name - << "; err: " << status.reason(); + log() << "error removing local db: " << status.reason(); return status; } + } - grid.shardRegistry()->remove(name); - - shardConnectionPool.removeHost(name); - ReplicaSetMonitor::remove(name); + Shard::reloadShardInfo(); + conn.done(); - Shard::reloadShardInfo(); - conn.done(); + // Record start in changelog + logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true)); + return ShardDrainingStatus::STARTED; + } - // Record finish in changelog - logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false)); - return ShardDrainingStatus::COMPLETED; + // Case 2: all chunks drained + BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str())); + long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc); + long long dbCount = + conn->count(DatabaseType::ConfigNS, + BSON(DatabaseType::name.ne("local") << DatabaseType::primary(name))); + if (chunkCount == 0 && dbCount == 0) { + log() << "going to remove shard: " << name; + audit::logRemoveShard(ClientBasic::getCurrent(), name); + + Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL); + if (!status.isOK()) { + log() << "Error concluding removeShard operation on: " << name + << "; err: " << status.reason(); + return status; } - // case 3: draining ongoing - return ShardDrainingStatus::ONGOING; - } + grid.shardRegistry()->remove(name); - Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) { - fassert(28616, db.validate()); + shardConnectionPool.removeHost(name); + ReplicaSetMonitor::remove(name); - BatchedCommandResponse response; - Status status = update(DatabaseType::ConfigNS, - BSON(DatabaseType::name(dbName)), - db.toBSON(), - true, // upsert - false, // multi - &response); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "database metadata write failed: " - << response.toBSON() << "; status: " << status.toString()); - } + Shard::reloadShardInfo(); + conn.done(); - return Status::OK(); + // Record finish in changelog + logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false)); + return ShardDrainingStatus::COMPLETED; } - StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& dbName) { - invariant(nsIsDbOnly(dbName)); + // case 3: draining ongoing + return ShardDrainingStatus::ONGOING; +} + +Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) { + fassert(28616, db.validate()); + + BatchedCommandResponse response; + Status status = update(DatabaseType::ConfigNS, + BSON(DatabaseType::name(dbName)), + db.toBSON(), + true, // upsert + false, // multi + &response); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "database metadata write failed: " << response.toBSON() + << "; status: " << status.toString()); + } - // The two databases that are hosted on the config server are config and admin - if (dbName == "config" || dbName == "admin") { - DatabaseType dbt; - dbt.setName(dbName); - dbt.setSharded(false); - dbt.setPrimary("config"); + return Status::OK(); +} - return dbt; - } +StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& dbName) { + invariant(nsIsDbOnly(dbName)); - ScopedDbConnection conn(_configServerConnectionString, 30.0); - - BSONObj dbObj = 
conn->findOne(DatabaseType::ConfigNS, BSON(DatabaseType::name(dbName))); - if (dbObj.isEmpty()) { - conn.done(); - return Status(ErrorCodes::DatabaseNotFound, - stream() << "database " << dbName << " not found"); - } + // The two databases that are hosted on the config server are config and admin + if (dbName == "config" || dbName == "admin") { + DatabaseType dbt; + dbt.setName(dbName); + dbt.setSharded(false); + dbt.setPrimary("config"); - conn.done(); - return DatabaseType::fromBSON(dbObj); + return dbt; } - Status CatalogManagerLegacy::updateCollection(const std::string& collNs, - const CollectionType& coll) { - fassert(28634, coll.validate()); + ScopedDbConnection conn(_configServerConnectionString, 30.0); - BatchedCommandResponse response; - Status status = update(CollectionType::ConfigNS, - BSON(CollectionType::fullNs(collNs)), - coll.toBSON(), - true, // upsert - false, // multi - &response); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "collection metadata write failed: " - << response.toBSON() << "; status: " << status.toString()); - } + BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, BSON(DatabaseType::name(dbName))); + if (dbObj.isEmpty()) { + conn.done(); + return Status(ErrorCodes::DatabaseNotFound, + stream() << "database " << dbName << " not found"); + } - return Status::OK(); + conn.done(); + return DatabaseType::fromBSON(dbObj); +} + +Status CatalogManagerLegacy::updateCollection(const std::string& collNs, + const CollectionType& coll) { + fassert(28634, coll.validate()); + + BatchedCommandResponse response; + Status status = update(CollectionType::ConfigNS, + BSON(CollectionType::fullNs(collNs)), + coll.toBSON(), + true, // upsert + false, // multi + &response); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "collection metadata write failed: " << response.toBSON() + << "; status: " << status.toString()); } - StatusWith<CollectionType> CatalogManagerLegacy::getCollection(const std::string& collNs) { - ScopedDbConnection conn(_configServerConnectionString, 30.0); + return Status::OK(); +} - BSONObj collObj = conn->findOne(CollectionType::ConfigNS, - BSON(CollectionType::fullNs(collNs))); - if (collObj.isEmpty()) { - conn.done(); - return Status(ErrorCodes::NamespaceNotFound, - stream() << "collection " << collNs << " not found"); - } +StatusWith<CollectionType> CatalogManagerLegacy::getCollection(const std::string& collNs) { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + BSONObj collObj = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::fullNs(collNs))); + if (collObj.isEmpty()) { conn.done(); - return CollectionType::fromBSON(collObj); + return Status(ErrorCodes::NamespaceNotFound, + stream() << "collection " << collNs << " not found"); } - Status CatalogManagerLegacy::getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) { - collections->clear(); + conn.done(); + return CollectionType::fromBSON(collObj); +} - BSONObjBuilder b; - if (dbName) { - invariant(!dbName->empty()); - b.appendRegex(CollectionType::fullNs(), - (string)"^" + pcrecpp::RE::QuoteMeta(*dbName) + "\\."); - } +Status CatalogManagerLegacy::getCollections(const std::string* dbName, + std::vector<CollectionType>* collections) { + collections->clear(); - ScopedDbConnection conn(_configServerConnectionString, 30.0); + BSONObjBuilder b; + if (dbName) { + invariant(!dbName->empty()); + b.appendRegex(CollectionType::fullNs(), + (string) "^" + pcrecpp::RE::QuoteMeta(*dbName) + "\\."); + } - 
std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(CollectionType::ConfigNS, - b.obj()))); + ScopedDbConnection conn(_configServerConnectionString, 30.0); - while (cursor->more()) { - const BSONObj collObj = cursor->next(); + std::unique_ptr<DBClientCursor> cursor( + _safeCursor(conn->query(CollectionType::ConfigNS, b.obj()))); - auto status = CollectionType::fromBSON(collObj); - if (!status.isOK()) { - conn.done(); - return status.getStatus(); - } + while (cursor->more()) { + const BSONObj collObj = cursor->next(); - collections->push_back(status.getValue()); + auto status = CollectionType::fromBSON(collObj); + if (!status.isOK()) { + conn.done(); + return status.getStatus(); } - conn.done(); - return Status::OK(); + collections->push_back(status.getValue()); } - Status CatalogManagerLegacy::dropCollection(const std::string& collectionNs) { - logChange(NULL, "dropCollection.start", collectionNs, BSONObj()); + conn.done(); + return Status::OK(); +} - // Lock the collection globally so that split/migrate cannot run - auto scopedDistLock = getDistLockManager()->lock(collectionNs, "drop"); - if (!scopedDistLock.isOK()) { - return scopedDistLock.getStatus(); - } +Status CatalogManagerLegacy::dropCollection(const std::string& collectionNs) { + logChange(NULL, "dropCollection.start", collectionNs, BSONObj()); - LOG(1) << "dropCollection " << collectionNs << " started"; + // Lock the collection globally so that split/migrate cannot run + auto scopedDistLock = getDistLockManager()->lock(collectionNs, "drop"); + if (!scopedDistLock.isOK()) { + return scopedDistLock.getStatus(); + } - // This cleans up the collection on all shards - vector<ShardType> allShards; - Status status = getAllShards(&allShards); - if (!status.isOK()) { - return status; - } + LOG(1) << "dropCollection " << collectionNs << " started"; - LOG(1) << "dropCollection " << collectionNs << " locked"; + // This cleans up the collection on all shards + vector<ShardType> allShards; + Status status = getAllShards(&allShards); + if (!status.isOK()) { + return status; + } - map<string, BSONObj> errors; + LOG(1) << "dropCollection " << collectionNs << " locked"; - // Delete data from all mongods - for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) { - const auto shard = grid.shardRegistry()->getShard(i->getName()); - ScopedDbConnection conn(shard->getConnString()); + map<string, BSONObj> errors; - BSONObj info; - if (!conn->dropCollection(collectionNs, &info)) { - // Ignore the database not found errors - if (info["code"].isNumber() && - (info["code"].Int() == ErrorCodes::NamespaceNotFound)) { - conn.done(); - continue; - } + // Delete data from all mongods + for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) { + const auto shard = grid.shardRegistry()->getShard(i->getName()); + ScopedDbConnection conn(shard->getConnString()); - errors[shard->getConnString().toString()] = info; + BSONObj info; + if (!conn->dropCollection(collectionNs, &info)) { + // Ignore the database not found errors + if (info["code"].isNumber() && (info["code"].Int() == ErrorCodes::NamespaceNotFound)) { + conn.done(); + continue; } - conn.done(); + errors[shard->getConnString().toString()] = info; } - if (!errors.empty()) { - StringBuilder sb; - sb << "Dropping collection failed on the following hosts: "; - - for (map<string, BSONObj>::const_iterator it = errors.begin(); - it != errors.end(); - ++it) { + conn.done(); + } - if (it != errors.begin()) { - sb << ", "; - } + if 
(!errors.empty()) { + StringBuilder sb; + sb << "Dropping collection failed on the following hosts: "; - sb << it->first << ": " << it->second; + for (map<string, BSONObj>::const_iterator it = errors.begin(); it != errors.end(); ++it) { + if (it != errors.begin()) { + sb << ", "; } - return Status(ErrorCodes::OperationFailed, sb.str()); + sb << it->first << ": " << it->second; } - LOG(1) << "dropCollection " << collectionNs << " shard data deleted"; - - // remove chunk data - Status result = remove(ChunkType::ConfigNS, - BSON(ChunkType::ns(collectionNs)), - 0, - NULL); - if (!result.isOK()) { - return result; - } + return Status(ErrorCodes::OperationFailed, sb.str()); + } - LOG(1) << "dropCollection " << collectionNs << " chunk data deleted"; + LOG(1) << "dropCollection " << collectionNs << " shard data deleted"; - for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) { - const auto shard = grid.shardRegistry()->getShard(i->getName()); - ScopedDbConnection conn(shard->getConnString()); + // remove chunk data + Status result = remove(ChunkType::ConfigNS, BSON(ChunkType::ns(collectionNs)), 0, NULL); + if (!result.isOK()) { + return result; + } - BSONObj res; + LOG(1) << "dropCollection " << collectionNs << " chunk data deleted"; - // this is horrible - // we need a special command for dropping on the d side - // this hack works for the moment + for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) { + const auto shard = grid.shardRegistry()->getShard(i->getName()); + ScopedDbConnection conn(shard->getConnString()); - if (!setShardVersion(conn.conn(), - collectionNs, - _configServerConnectionString.toString(), - ChunkVersion(0, 0, OID()), - NULL, - true, - res)) { + BSONObj res; - return Status(static_cast<ErrorCodes::Error>(8071), - str::stream() << "cleaning up after drop failed: " << res); - } + // this is horrible + // we need a special command for dropping on the d side + // this hack works for the moment - conn->simpleCommand("admin", 0, "unsetSharding"); - conn.done(); + if (!setShardVersion(conn.conn(), + collectionNs, + _configServerConnectionString.toString(), + ChunkVersion(0, 0, OID()), + NULL, + true, + res)) { + return Status(static_cast<ErrorCodes::Error>(8071), + str::stream() << "cleaning up after drop failed: " << res); } - LOG(1) << "dropCollection " << collectionNs << " completed"; + conn->simpleCommand("admin", 0, "unsetSharding"); + conn.done(); + } - logChange(NULL, "dropCollection", collectionNs, BSONObj()); + LOG(1) << "dropCollection " << collectionNs << " completed"; - return Status::OK(); - } + logChange(NULL, "dropCollection", collectionNs, BSONObj()); - void CatalogManagerLegacy::logAction(const ActionLogType& actionLog) { - // Create the action log collection and ensure that it is capped. Wrap in try/catch, - // because creating an existing collection throws. - if (actionLogCollectionCreated.load() == 0) { - try { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - conn->createCollection(ActionLogType::ConfigNS, 1024 * 1024 * 2, true); - conn.done(); + return Status::OK(); +} - actionLogCollectionCreated.store(1); - } - catch (const DBException& e) { - // It's ok to ignore this exception - LOG(1) << "couldn't create actionlog collection: " << e; - } - } +void CatalogManagerLegacy::logAction(const ActionLogType& actionLog) { + // Create the action log collection and ensure that it is capped. Wrap in try/catch, + // because creating an existing collection throws. 
+ if (actionLogCollectionCreated.load() == 0) { + try { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + conn->createCollection(ActionLogType::ConfigNS, 1024 * 1024 * 2, true); + conn.done(); - Status result = insert(ActionLogType::ConfigNS, actionLog.toBSON(), NULL); - if (!result.isOK()) { - log() << "error encountered while logging action: " << result; + actionLogCollectionCreated.store(1); + } catch (const DBException& e) { + // It's ok to ignore this exception + LOG(1) << "couldn't create actionlog collection: " << e; } } - void CatalogManagerLegacy::logChange(OperationContext* opCtx, - const string& what, - const string& ns, - const BSONObj& detail) { - - // Create the change log collection and ensure that it is capped. Wrap in try/catch, - // because creating an existing collection throws. - if (changeLogCollectionCreated.load() == 0) { - try { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true); - conn.done(); + Status result = insert(ActionLogType::ConfigNS, actionLog.toBSON(), NULL); + if (!result.isOK()) { + log() << "error encountered while logging action: " << result; + } +} + +void CatalogManagerLegacy::logChange(OperationContext* opCtx, + const string& what, + const string& ns, + const BSONObj& detail) { + // Create the change log collection and ensure that it is capped. Wrap in try/catch, + // because creating an existing collection throws. + if (changeLogCollectionCreated.load() == 0) { + try { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true); + conn.done(); - changeLogCollectionCreated.store(1); - } - catch (const UserException& e) { - // It's ok to ignore this exception - LOG(1) << "couldn't create changelog collection: " << e; - } + changeLogCollectionCreated.store(1); + } catch (const UserException& e) { + // It's ok to ignore this exception + LOG(1) << "couldn't create changelog collection: " << e; } + } - // Store this entry's ID so we can use on the exception code path too - StringBuilder changeIdBuilder; - changeIdBuilder << getHostNameCached() << "-" << terseCurrentTime() - << "-" << OID::gen(); + // Store this entry's ID so we can use on the exception code path too + StringBuilder changeIdBuilder; + changeIdBuilder << getHostNameCached() << "-" << terseCurrentTime() << "-" << OID::gen(); - const string changeID = changeIdBuilder.str(); + const string changeID = changeIdBuilder.str(); - Client* client; - if (opCtx) { - client = opCtx->getClient(); - } - else if (haveClient()) { - client = &cc(); - } - else { - client = nullptr; - } + Client* client; + if (opCtx) { + client = opCtx->getClient(); + } else if (haveClient()) { + client = &cc(); + } else { + client = nullptr; + } - // Send a copy of the message to the local log in case it doesn't manage to reach - // config.changelog - BSONObj msg = BSON(ChangelogType::changeID(changeID) << - ChangelogType::server(getHostNameCached()) << - ChangelogType::clientAddr((client ? - client->clientAddress(true) : "")) << - ChangelogType::time(jsTime()) << - ChangelogType::what(what) << - ChangelogType::ns(ns) << - ChangelogType::details(detail)); + // Send a copy of the message to the local log in case it doesn't manage to reach + // config.changelog + BSONObj msg = BSON(ChangelogType::changeID(changeID) + << ChangelogType::server(getHostNameCached()) + << ChangelogType::clientAddr((client ? 
client->clientAddress(true) : "")) + << ChangelogType::time(jsTime()) << ChangelogType::what(what) + << ChangelogType::ns(ns) << ChangelogType::details(detail)); - log() << "about to log metadata event: " << msg; + log() << "about to log metadata event: " << msg; - Status result = insert(ChangelogType::ConfigNS, msg, NULL); - if (!result.isOK()) { - warning() << "Error encountered while logging config change with ID " - << changeID << ": " << result; - } + Status result = insert(ChangelogType::ConfigNS, msg, NULL); + if (!result.isOK()) { + warning() << "Error encountered while logging config change with ID " << changeID << ": " + << result; } +} - StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(const string& key) { - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - BSONObj settingsDoc = conn->findOne(SettingsType::ConfigNS, - BSON(SettingsType::key(key))); - conn.done(); - - if (settingsDoc.isEmpty()) { - return Status(ErrorCodes::NoMatchingDocument, - str::stream() << "can't find settings document with key: " << key); - } +StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(const string& key) { + try { + ScopedDbConnection conn(_configServerConnectionString, 30); + BSONObj settingsDoc = conn->findOne(SettingsType::ConfigNS, BSON(SettingsType::key(key))); + conn.done(); - StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); - if (!settingsResult.isOK()) { - return Status(ErrorCodes::FailedToParse, - str::stream() << "error while parsing settings document: " - << settingsDoc - << " : " << settingsResult.getStatus().toString()); - } + if (settingsDoc.isEmpty()) { + return Status(ErrorCodes::NoMatchingDocument, + str::stream() << "can't find settings document with key: " << key); + } - const SettingsType& settings = settingsResult.getValue(); + StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); + if (!settingsResult.isOK()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "error while parsing settings document: " << settingsDoc + << " : " << settingsResult.getStatus().toString()); + } - Status validationStatus = settings.validate(); - if (!validationStatus.isOK()) { - return validationStatus; - } + const SettingsType& settings = settingsResult.getValue(); - return settingsResult; - } - catch (const DBException& ex) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "unable to successfully obtain " - << "config.settings document: " << causedBy(ex)); + Status validationStatus = settings.validate(); + if (!validationStatus.isOK()) { + return validationStatus; } - } - Status CatalogManagerLegacy::getDatabasesForShard(const string& shardName, - vector<string>* dbs) { - dbs->clear(); - - try { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - std::unique_ptr<DBClientCursor> cursor(_safeCursor( - conn->query(DatabaseType::ConfigNS, - Query(BSON(DatabaseType::primary(shardName)))))); - if (!cursor.get()) { - conn.done(); - return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor"); - } + return settingsResult; + } catch (const DBException& ex) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "unable to successfully obtain " + << "config.settings document: " << causedBy(ex)); + } +} - while (cursor->more()) { - BSONObj shard = cursor->nextSafe(); - dbs->push_back(shard[DatabaseType::name()].str()); - } +Status CatalogManagerLegacy::getDatabasesForShard(const string& shardName, vector<string>* dbs) { + dbs->clear(); + try 
{ + ScopedDbConnection conn(_configServerConnectionString, 30.0); + std::unique_ptr<DBClientCursor> cursor(_safeCursor( + conn->query(DatabaseType::ConfigNS, Query(BSON(DatabaseType::primary(shardName)))))); + if (!cursor.get()) { conn.done(); + return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor"); } - catch (const DBException& ex) { - return ex.toStatus(); + + while (cursor->more()) { + BSONObj shard = cursor->nextSafe(); + dbs->push_back(shard[DatabaseType::name()].str()); } - return Status::OK(); + conn.done(); + } catch (const DBException& ex) { + return ex.toStatus(); } - Status CatalogManagerLegacy::getChunks(const Query& query, - int nToReturn, - vector<ChunkType>* chunks) { - chunks->clear(); + return Status::OK(); +} - try { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(ChunkType::ConfigNS, - query, - nToReturn))); - if (!cursor.get()) { - conn.done(); - return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor"); - } +Status CatalogManagerLegacy::getChunks(const Query& query, + int nToReturn, + vector<ChunkType>* chunks) { + chunks->clear(); - while (cursor->more()) { - BSONObj chunkObj = cursor->nextSafe(); + try { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + std::unique_ptr<DBClientCursor> cursor( + _safeCursor(conn->query(ChunkType::ConfigNS, query, nToReturn))); + if (!cursor.get()) { + conn.done(); + return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor"); + } - StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj); - if (!chunkRes.isOK()) { - conn.done(); - chunks->clear(); - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse chunk with id (" - << chunkObj[ChunkType::name()].toString() << "): " - << chunkRes.getStatus().reason()}; - } + while (cursor->more()) { + BSONObj chunkObj = cursor->nextSafe(); - chunks->push_back(chunkRes.getValue()); + StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj); + if (!chunkRes.isOK()) { + conn.done(); + chunks->clear(); + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse chunk with id (" + << chunkObj[ChunkType::name()].toString() + << "): " << chunkRes.getStatus().reason()}; } - conn.done(); - } - catch (const DBException& ex) { - return ex.toStatus(); + chunks->push_back(chunkRes.getValue()); } - return Status::OK(); + conn.done(); + } catch (const DBException& ex) { + return ex.toStatus(); } - Status CatalogManagerLegacy::getTagsForCollection(const std::string& collectionNs, - std::vector<TagsType>* tags) { - tags->clear(); + return Status::OK(); +} - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - std::unique_ptr<DBClientCursor> cursor(_safeCursor( - conn->query(TagsType::ConfigNS, - Query(BSON(TagsType::ns(collectionNs))) - .sort(TagsType::min())))); - if (!cursor.get()) { - conn.done(); - return Status(ErrorCodes::HostUnreachable, "unable to open tags cursor"); - } +Status CatalogManagerLegacy::getTagsForCollection(const std::string& collectionNs, + std::vector<TagsType>* tags) { + tags->clear(); - while (cursor->more()) { - BSONObj tagObj = cursor->nextSafe(); + try { + ScopedDbConnection conn(_configServerConnectionString, 30); + std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query( + TagsType::ConfigNS, Query(BSON(TagsType::ns(collectionNs))).sort(TagsType::min())))); + if (!cursor.get()) { + conn.done(); + return Status(ErrorCodes::HostUnreachable, "unable to open tags cursor"); + } - 
StatusWith<TagsType> tagRes = TagsType::fromBSON(tagObj); - if (!tagRes.isOK()) { - conn.done(); - return Status(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse tag BSONObj: " - << tagRes.getStatus().reason()); - } + while (cursor->more()) { + BSONObj tagObj = cursor->nextSafe(); - tags->push_back(tagRes.getValue()); + StatusWith<TagsType> tagRes = TagsType::fromBSON(tagObj); + if (!tagRes.isOK()) { + conn.done(); + return Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse tag BSONObj: " + << tagRes.getStatus().reason()); } - conn.done(); - } - catch (const DBException& ex) { - return ex.toStatus(); + tags->push_back(tagRes.getValue()); } - return Status::OK(); + conn.done(); + } catch (const DBException& ex) { + return ex.toStatus(); } - StatusWith<string> CatalogManagerLegacy::getTagForChunk(const std::string& collectionNs, - const ChunkType& chunk) { - BSONObj tagDoc; + return Status::OK(); +} - try { - ScopedDbConnection conn(_configServerConnectionString, 30); +StatusWith<string> CatalogManagerLegacy::getTagForChunk(const std::string& collectionNs, + const ChunkType& chunk) { + BSONObj tagDoc; - Query query(BSON(TagsType::ns(collectionNs) << - TagsType::min() << BSON("$lte" << chunk.getMin()) << - TagsType::max() << BSON("$gte" << chunk.getMax()))); + try { + ScopedDbConnection conn(_configServerConnectionString, 30); - tagDoc = conn->findOne(TagsType::ConfigNS, query); - conn.done(); - } - catch (const DBException& ex) { - return ex.toStatus(); - } + Query query(BSON(TagsType::ns(collectionNs) + << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() + << BSON("$gte" << chunk.getMax()))); - if (tagDoc.isEmpty()) { - return std::string(""); - } + tagDoc = conn->findOne(TagsType::ConfigNS, query); + conn.done(); + } catch (const DBException& ex) { + return ex.toStatus(); + } - auto status = TagsType::fromBSON(tagDoc); - if (status.isOK()) { - return status.getValue().getTag(); - } + if (tagDoc.isEmpty()) { + return std::string(""); + } - return status.getStatus(); + auto status = TagsType::fromBSON(tagDoc); + if (status.isOK()) { + return status.getValue().getTag(); } - Status CatalogManagerLegacy::getAllShards(vector<ShardType>* shards) { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(ShardType::ConfigNS, - BSONObj()))); - while (cursor->more()) { - BSONObj shardObj = cursor->nextSafe(); + return status.getStatus(); +} - StatusWith<ShardType> shardRes = ShardType::fromBSON(shardObj); - if (!shardRes.isOK()) { - shards->clear(); - conn.done(); - return Status(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse shard with id (" - << shardObj[ShardType::name()].toString() << "): " - << shardRes.getStatus().reason()); - } +Status CatalogManagerLegacy::getAllShards(vector<ShardType>* shards) { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + std::unique_ptr<DBClientCursor> cursor( + _safeCursor(conn->query(ShardType::ConfigNS, BSONObj()))); + while (cursor->more()) { + BSONObj shardObj = cursor->nextSafe(); - shards->push_back(shardRes.getValue()); + StatusWith<ShardType> shardRes = ShardType::fromBSON(shardObj); + if (!shardRes.isOK()) { + shards->clear(); + conn.done(); + return Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse shard with id (" + << shardObj[ShardType::name()].toString() + << "): " << shardRes.getStatus().reason()); } - conn.done(); - return Status::OK(); + 
shards->push_back(shardRes.getValue()); } - - bool CatalogManagerLegacy::isShardHost(const ConnectionString& connectionString) { - return _getShardCount(BSON(ShardType::host(connectionString.toString()))); + conn.done(); + + return Status::OK(); +} + +bool CatalogManagerLegacy::isShardHost(const ConnectionString& connectionString) { + return _getShardCount(BSON(ShardType::host(connectionString.toString()))); +} + +bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandName, + const string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + DBClientMultiCommand dispatcher; + RawBSONSerializable requestCmdSerial(cmdObj); + for (const ConnectionString& configServer : _configServers) { + dispatcher.addCommand(configServer, dbname, requestCmdSerial); } - bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandName, - const string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - DBClientMultiCommand dispatcher; - RawBSONSerializable requestCmdSerial(cmdObj); - for (const ConnectionString& configServer : _configServers) { - dispatcher.addCommand(configServer, dbname, requestCmdSerial); - } - - auto scopedDistLock = getDistLockManager()->lock("authorizationData", - commandName, - Seconds{5}); - if (!scopedDistLock.isOK()) { - return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); - } - - dispatcher.sendAll(); + auto scopedDistLock = getDistLockManager()->lock("authorizationData", commandName, Seconds{5}); + if (!scopedDistLock.isOK()) { + return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); + } - BSONObj responseObj; + dispatcher.sendAll(); - Status prevStatus{Status::OK()}; - Status currStatus{Status::OK()}; + BSONObj responseObj; - BSONObjBuilder responses; - unsigned failedCount = 0; - bool sameError = true; - while (dispatcher.numPending() > 0) { - ConnectionString host; - RawBSONSerializable responseCmdSerial; + Status prevStatus{Status::OK()}; + Status currStatus{Status::OK()}; - Status dispatchStatus = dispatcher.recvAny(&host, - &responseCmdSerial); + BSONObjBuilder responses; + unsigned failedCount = 0; + bool sameError = true; + while (dispatcher.numPending() > 0) { + ConnectionString host; + RawBSONSerializable responseCmdSerial; - if (!dispatchStatus.isOK()) { - return Command::appendCommandStatus(*result, dispatchStatus); - } + Status dispatchStatus = dispatcher.recvAny(&host, &responseCmdSerial); - responseObj = responseCmdSerial.toBSON(); - responses.append(host.toString(), responseObj); - - currStatus = Command::getStatusFromCommandResult(responseObj); - if (!currStatus.isOK()) { - // same error <=> adjacent error statuses are the same - if (failedCount > 0 && prevStatus != currStatus) { - sameError = false; - } - failedCount++; - prevStatus = currStatus; - } + if (!dispatchStatus.isOK()) { + return Command::appendCommandStatus(*result, dispatchStatus); } - if (failedCount == 0) { - result->appendElements(responseObj); - return true; - } + responseObj = responseCmdSerial.toBSON(); + responses.append(host.toString(), responseObj); - // if the command succeeds on at least one config server and fails on at least one, - // manual intervention is required - if (failedCount < _configServers.size()) { - Status status(ErrorCodes::ManualInterventionRequired, - str::stream() << "Config write was not consistent - " - << "user management command failed on at least one " - << "config server but passed on at least one other. " - << "Manual intervention may be required. 
" - << "Config responses: " << responses.obj().toString()); - return Command::appendCommandStatus(*result, status); + currStatus = Command::getStatusFromCommandResult(responseObj); + if (!currStatus.isOK()) { + // same error <=> adjacent error statuses are the same + if (failedCount > 0 && prevStatus != currStatus) { + sameError = false; + } + failedCount++; + prevStatus = currStatus; } + } - if (sameError) { - result->appendElements(responseObj); - return false; - } + if (failedCount == 0) { + result->appendElements(responseObj); + return true; + } + // if the command succeeds on at least one config server and fails on at least one, + // manual intervention is required + if (failedCount < _configServers.size()) { Status status(ErrorCodes::ManualInterventionRequired, str::stream() << "Config write was not consistent - " - << "user management command produced inconsistent results. " + << "user management command failed on at least one " + << "config server but passed on at least one other. " << "Manual intervention may be required. " << "Config responses: " << responses.obj().toString()); return Command::appendCommandStatus(*result, status); } - bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - try { - // let SyncClusterConnection handle connecting to the first config server - // that is reachable and returns data - ScopedDbConnection conn(_configServerConnectionString, 30); - - BSONObj cmdResult; - const bool ok = conn->runCommand(dbname, cmdObj, cmdResult); - result->appendElements(cmdResult); - conn.done(); - return ok; - } - catch (const DBException& ex) { - return Command::appendCommandStatus(*result, ex.toStatus()); - } + if (sameError) { + result->appendElements(responseObj); + return false; } - Status CatalogManagerLegacy::applyChunkOpsDeprecated(const BSONArray& updateOps, - const BSONArray& preCondition) { - BSONObj cmd = BSON("applyOps" << updateOps << - "preCondition" << preCondition); - bool ok; + Status status(ErrorCodes::ManualInterventionRequired, + str::stream() << "Config write was not consistent - " + << "user management command produced inconsistent results. " + << "Manual intervention may be required. 
" + << "Config responses: " << responses.obj().toString()); + return Command::appendCommandStatus(*result, status); +} + +bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + try { + // let SyncClusterConnection handle connecting to the first config server + // that is reachable and returns data + ScopedDbConnection conn(_configServerConnectionString, 30); + BSONObj cmdResult; - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - ok = conn->runCommand("config", cmd, cmdResult); - conn.done(); - } - catch (const DBException& ex) { - return ex.toStatus(); - } + const bool ok = conn->runCommand(dbname, cmdObj, cmdResult); + result->appendElements(cmdResult); + conn.done(); + return ok; + } catch (const DBException& ex) { + return Command::appendCommandStatus(*result, ex.toStatus()); + } +} + +Status CatalogManagerLegacy::applyChunkOpsDeprecated(const BSONArray& updateOps, + const BSONArray& preCondition) { + BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition); + bool ok; + BSONObj cmdResult; + try { + ScopedDbConnection conn(_configServerConnectionString, 30); + ok = conn->runCommand("config", cmd, cmdResult); + conn.done(); + } catch (const DBException& ex) { + return ex.toStatus(); + } - if (!ok) { - string errMsg(str::stream() << "Unable to save chunk ops. Command: " - << cmd << ". Result: " << cmdResult); + if (!ok) { + string errMsg(str::stream() << "Unable to save chunk ops. Command: " << cmd + << ". Result: " << cmdResult); - return Status(ErrorCodes::OperationFailed, errMsg); - } + return Status(ErrorCodes::OperationFailed, errMsg); + } - return Status::OK(); + return Status::OK(); +} + +void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request, + BatchedCommandResponse* response) { + // check if config servers are consistent + if (!_isConsistentFromLastCheck()) { + toBatchError(Status(ErrorCodes::ConfigServersInconsistent, + "Data inconsistency detected amongst config servers"), + response); + return; } - void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request, - BatchedCommandResponse* response) { + // We only support batch sizes of one for config writes + if (request.sizeWriteOps() != 1) { + toBatchError(Status(ErrorCodes::InvalidOptions, + str::stream() << "Writes to config servers must have batch size of 1, " + << "found " << request.sizeWriteOps()), + response); - // check if config servers are consistent - if (!_isConsistentFromLastCheck()) { - toBatchError( - Status(ErrorCodes::ConfigServersInconsistent, - "Data inconsistency detected amongst config servers"), - response); - return; - } + return; + } - // We only support batch sizes of one for config writes - if (request.sizeWriteOps() != 1) { - toBatchError( - Status(ErrorCodes::InvalidOptions, - str::stream() << "Writes to config servers must have batch size of 1, " - << "found " << request.sizeWriteOps()), - response); + // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes + if (request.isWriteConcernSet() && !validConfigWC(request.getWriteConcern())) { + toBatchError(Status(ErrorCodes::InvalidOptions, + str::stream() << "Invalid write concern for write to " + << "config servers: " << request.getWriteConcern()), + response); - return; - } + return; + } - // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes - if (request.isWriteConcernSet() && 
!validConfigWC(request.getWriteConcern())) { + DBClientMultiCommand dispatcher; + if (_configServers.size() > 1) { + // We can't support no-_id upserts to multiple config servers - the _ids will differ + if (BatchedCommandRequest::containsNoIDUpsert(request)) { toBatchError( Status(ErrorCodes::InvalidOptions, - str::stream() << "Invalid write concern for write to " - << "config servers: " << request.getWriteConcern()), + str::stream() << "upserts to multiple config servers must include _id"), response); - return; } - - DBClientMultiCommand dispatcher; - if (_configServers.size() > 1) { - // We can't support no-_id upserts to multiple config servers - the _ids will differ - if (BatchedCommandRequest::containsNoIDUpsert(request)) { - toBatchError( - Status(ErrorCodes::InvalidOptions, - str::stream() << "upserts to multiple config servers must include _id"), - response); - return; - } - } - - ConfigCoordinator exec(&dispatcher, _configServerConnectionString); - exec.executeBatch(request, response); } - Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName) const { - ScopedDbConnection conn(_configServerConnectionString, 30); + ConfigCoordinator exec(&dispatcher, _configServerConnectionString); + exec.executeBatch(request, response); +} - BSONObjBuilder b; - b.appendRegex(DatabaseType::name(), - (string)"^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i"); +Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName) const { + ScopedDbConnection conn(_configServerConnectionString, 30); - BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, b.obj()); - conn.done(); + BSONObjBuilder b; + b.appendRegex(DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i"); - // If our name is exactly the same as the name we want, try loading - // the database again. - if (!dbObj.isEmpty() && dbObj[DatabaseType::name()].String() == dbName) { - return Status(ErrorCodes::NamespaceExists, - str::stream() << "database " << dbName << " already exists"); - } + BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, b.obj()); + conn.done(); - if (!dbObj.isEmpty()) { - return Status(ErrorCodes::DatabaseDifferCase, - str::stream() << "can't have 2 databases that just differ on case " - << " have: " << dbObj[DatabaseType::name()].String() - << " want to add: " << dbName); - } + // If our name is exactly the same as the name we want, try loading + // the database again. 
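
The duplicate-database probe just above matches the requested name case-insensitively by quoting it and anchoring it ("^" + QuoteMeta(dbName) + "$" with the "i" option), so only a whole-name, case-folded collision is reported. The same probe re-expressed with std::regex so the sketch is self-contained (quoteMeta is a simplified stand-in for pcrecpp::RE::QuoteMeta, not its implementation):

    #include <cctype>
    #include <iostream>
    #include <regex>
    #include <string>

    // Escape regex metacharacters so the name is matched literally.
    static std::string quoteMeta(const std::string& s) {
        std::string out;
        for (unsigned char c : s) {
            if (!std::isalnum(c) && c != '_')
                out += '\\';
            out += static_cast<char>(c);
        }
        return out;
    }

    int main() {
        const std::string dbName = "Foo";
        const std::regex re("^" + quoteMeta(dbName) + "$", std::regex::icase);
        std::cout << std::regex_match("foo", re)       // prints 1: same name, different case
                  << std::regex_match("foobar", re);   // prints 0: a different name
    }
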
+ if (!dbObj.isEmpty() && dbObj[DatabaseType::name()].String() == dbName) { + return Status(ErrorCodes::NamespaceExists, + str::stream() << "database " << dbName << " already exists"); + } - return Status::OK(); + if (!dbObj.isEmpty()) { + return Status(ErrorCodes::DatabaseDifferCase, + str::stream() << "can't have 2 databases that just differ on case " + << " have: " << dbObj[DatabaseType::name()].String() + << " want to add: " << dbName); } - StatusWith<string> CatalogManagerLegacy::_getNewShardName() const { - BSONObj o; - { - ScopedDbConnection conn(_configServerConnectionString, 30); - o = conn->findOne(ShardType::ConfigNS, - Query(fromjson("{" + ShardType::name() + ": /^shard/}")) - .sort(BSON(ShardType::name() << -1 ))); - conn.done(); - } + return Status::OK(); +} - int count = 0; - if (!o.isEmpty()) { - string last = o[ShardType::name()].String(); - std::istringstream is(last.substr(5)); - is >> count; - count++; - } +StatusWith<string> CatalogManagerLegacy::_getNewShardName() const { + BSONObj o; + { + ScopedDbConnection conn(_configServerConnectionString, 30); + o = conn->findOne(ShardType::ConfigNS, + Query(fromjson("{" + ShardType::name() + ": /^shard/}")) + .sort(BSON(ShardType::name() << -1))); + conn.done(); + } - // TODO fix so that we can have more than 10000 automatically generated shard names - if (count < 9999) { - std::stringstream ss; - ss << "shard" << std::setfill('0') << std::setw(4) << count; - return ss.str(); - } + int count = 0; + if (!o.isEmpty()) { + string last = o[ShardType::name()].String(); + std::istringstream is(last.substr(5)); + is >> count; + count++; + } - return Status(ErrorCodes::OperationFailed, - "unable to generate new shard name"); + // TODO fix so that we can have more than 10000 automatically generated shard names + if (count < 9999) { + std::stringstream ss; + ss << "shard" << std::setfill('0') << std::setw(4) << count; + return ss.str(); } - size_t CatalogManagerLegacy::_getShardCount(const BSONObj& query) const { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - long long shardCount = conn->count(ShardType::ConfigNS, query); - conn.done(); + return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); +} - return shardCount; - } +size_t CatalogManagerLegacy::_getShardCount(const BSONObj& query) const { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + long long shardCount = conn->count(ShardType::ConfigNS, query); + conn.done(); - DistLockManager* CatalogManagerLegacy::getDistLockManager() { - invariant(_distLockManager); - return _distLockManager.get(); - } + return shardCount; +} - bool CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const { - if (tries <= 0) - return false; +DistLockManager* CatalogManagerLegacy::getDistLockManager() { + invariant(_distLockManager); + return _distLockManager.get(); +} - unsigned firstGood = 0; - int up = 0; - vector<BSONObj> res; +bool CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const { + if (tries <= 0) + return false; - // The last error we saw on a config server - string errMsg; + unsigned firstGood = 0; + int up = 0; + vector<BSONObj> res; - for (unsigned i = 0; i < _configServers.size(); i++) { - BSONObj result; - std::unique_ptr<ScopedDbConnection> conn; - - try { - conn.reset(new ScopedDbConnection(_configServers[i], 30.0)); - - if (!conn->get()->runCommand("config", - BSON("dbhash" << 1 << - "collections" << BSON_ARRAY("chunks" << - "databases" << - "collections" << - "shards" << - 
"version")), - result)) { - - errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String(); - if (!result["assertion"].eoo()) errMsg = result["assertion"].String(); - - warning() << "couldn't check dbhash on config server " << _configServers[i] - << causedBy(result.toString()); - - result = BSONObj(); - } - else { - result = result.getOwned(); - if (up == 0) - firstGood = i; - up++; - } - conn->done(); - } - catch (const DBException& e) { - if (conn) { - conn->kill(); - } + // The last error we saw on a config server + string errMsg; - // We need to catch DBExceptions b/c sometimes we throw them - // instead of socket exceptions when findN fails + for (unsigned i = 0; i < _configServers.size(); i++) { + BSONObj result; + std::unique_ptr<ScopedDbConnection> conn; - errMsg = e.toString(); - warning() << " couldn't check dbhash on config server " - << _configServers[i] << causedBy(e); + try { + conn.reset(new ScopedDbConnection(_configServers[i], 30.0)); + + if (!conn->get()->runCommand( + "config", + BSON("dbhash" << 1 << "collections" << BSON_ARRAY("chunks" + << "databases" + << "collections" + << "shards" + << "version")), + result)) { + errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String(); + if (!result["assertion"].eoo()) + errMsg = result["assertion"].String(); + + warning() << "couldn't check dbhash on config server " << _configServers[i] + << causedBy(result.toString()); + + result = BSONObj(); + } else { + result = result.getOwned(); + if (up == 0) + firstGood = i; + up++; + } + conn->done(); + } catch (const DBException& e) { + if (conn) { + conn->kill(); } - res.push_back(result); - } - if (_configServers.size() == 1) - return true; + // We need to catch DBExceptions b/c sometimes we throw them + // instead of socket exceptions when findN fails - if (up == 0) { - // Use a ptr to error so if empty we won't add causedby - error() << "no config servers successfully contacted" << causedBy(&errMsg); - return false; - } - else if (up == 1) { - warning() << "only 1 config server reachable, continuing"; - return true; + errMsg = e.toString(); + warning() << " couldn't check dbhash on config server " << _configServers[i] + << causedBy(e); } + res.push_back(result); + } - BSONObj base = res[firstGood]; - for (unsigned i = firstGood+1; i < res.size(); i++) { - if (res[i].isEmpty()) - continue; + if (_configServers.size() == 1) + return true; - string chunksHash1 = base.getFieldDotted("collections.chunks"); - string chunksHash2 = res[i].getFieldDotted("collections.chunks"); + if (up == 0) { + // Use a ptr to error so if empty we won't add causedby + error() << "no config servers successfully contacted" << causedBy(&errMsg); + return false; + } else if (up == 1) { + warning() << "only 1 config server reachable, continuing"; + return true; + } - string databaseHash1 = base.getFieldDotted("collections.databases"); - string databaseHash2 = res[i].getFieldDotted("collections.databases"); + BSONObj base = res[firstGood]; + for (unsigned i = firstGood + 1; i < res.size(); i++) { + if (res[i].isEmpty()) + continue; - string collectionsHash1 = base.getFieldDotted("collections.collections"); - string collectionsHash2 = res[i].getFieldDotted("collections.collections"); + string chunksHash1 = base.getFieldDotted("collections.chunks"); + string chunksHash2 = res[i].getFieldDotted("collections.chunks"); - string shardHash1 = base.getFieldDotted("collections.shards"); - string shardHash2 = res[i].getFieldDotted("collections.shards"); + string databaseHash1 = 
base.getFieldDotted("collections.databases"); + string databaseHash2 = res[i].getFieldDotted("collections.databases"); - string versionHash1 = base.getFieldDotted("collections.version") ; - string versionHash2 = res[i].getFieldDotted("collections.version"); + string collectionsHash1 = base.getFieldDotted("collections.collections"); + string collectionsHash2 = res[i].getFieldDotted("collections.collections"); - if (chunksHash1 == chunksHash2 && - databaseHash1 == databaseHash2 && - collectionsHash1 == collectionsHash2 && - shardHash1 == shardHash2 && - versionHash1 == versionHash2) { - continue; - } + string shardHash1 = base.getFieldDotted("collections.shards"); + string shardHash2 = res[i].getFieldDotted("collections.shards"); - warning() << "config servers " << _configServers[firstGood].toString() - << " and " << _configServers[i].toString() << " differ"; - if (tries <= 1) { - error() << ": " << base["collections"].Obj() - << " vs " << res[i]["collections"].Obj(); - return false; - } + string versionHash1 = base.getFieldDotted("collections.version"); + string versionHash2 = res[i].getFieldDotted("collections.version"); - return _checkConfigServersConsistent(tries - 1); + if (chunksHash1 == chunksHash2 && databaseHash1 == databaseHash2 && + collectionsHash1 == collectionsHash2 && shardHash1 == shardHash2 && + versionHash1 == versionHash2) { + continue; } - return true; + warning() << "config servers " << _configServers[firstGood].toString() << " and " + << _configServers[i].toString() << " differ"; + if (tries <= 1) { + error() << ": " << base["collections"].Obj() << " vs " << res[i]["collections"].Obj(); + return false; + } + + return _checkConfigServersConsistent(tries - 1); } - void CatalogManagerLegacy::_consistencyChecker() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_inShutdown) { - lk.unlock(); - const bool isConsistent = _checkConfigServersConsistent(); + return true; +} - lk.lock(); - _consistentFromLastCheck = isConsistent; - if (_inShutdown) break; - _consistencyCheckerCV.wait_for(lk, Seconds(60)); - } - LOG(1) << "Consistency checker thread shutting down"; - } +void CatalogManagerLegacy::_consistencyChecker() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_inShutdown) { + lk.unlock(); + const bool isConsistent = _checkConfigServersConsistent(); - bool CatalogManagerLegacy::_isConsistentFromLastCheck() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - return _consistentFromLastCheck; + lk.lock(); + _consistentFromLastCheck = isConsistent; + if (_inShutdown) + break; + _consistencyCheckerCV.wait_for(lk, Seconds(60)); } + LOG(1) << "Consistency checker thread shutting down"; +} + +bool CatalogManagerLegacy::_isConsistentFromLastCheck() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _consistentFromLastCheck; +} -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index aff0ce6d185..5a762a1aebc 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -39,173 +39,170 @@ namespace mongo { +/** + * Implements the catalog manager using the legacy 3-config server protocol. + */ +class CatalogManagerLegacy final : public CatalogManager { +public: + CatalogManagerLegacy(); + ~CatalogManagerLegacy(); + /** - * Implements the catalog manager using the legacy 3-config server protocol. + * Initializes the catalog manager with the hosts, which will be used as a configuration + * server. 
Can only be called once for the lifetime. */ - class CatalogManagerLegacy final : public CatalogManager { - public: - CatalogManagerLegacy(); - ~CatalogManagerLegacy(); + Status init(const ConnectionString& configCS); - /** - * Initializes the catalog manager with the hosts, which will be used as a configuration - * server. Can only be called once for the lifetime. - */ - Status init(const ConnectionString& configCS); + Status startup(bool upgrade) override; - Status startup(bool upgrade) override; + ConnectionString connectionString() const override; - ConnectionString connectionString() const override; + void shutDown() override; - void shutDown() override; + Status enableSharding(const std::string& dbName) override; - Status enableSharding(const std::string& dbName) override; + Status shardCollection(const std::string& ns, + const ShardKeyPattern& fieldsAndOrder, + bool unique, + std::vector<BSONObj>* initPoints, + std::set<ShardId>* initShardIds) override; - Status shardCollection(const std::string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - std::vector<BSONObj>* initPoints, - std::set<ShardId>* initShardIds) override; + StatusWith<std::string> addShard(const std::string& name, + const ConnectionString& shardConnectionString, + const long long maxSize) override; - StatusWith<std::string> addShard(const std::string& name, - const ConnectionString& shardConnectionString, - const long long maxSize) override; + StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, + const std::string& name) override; - StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, - const std::string& name) override; + Status createDatabase(const std::string& dbName) override; - Status createDatabase(const std::string& dbName) override; + Status updateDatabase(const std::string& dbName, const DatabaseType& db) override; - Status updateDatabase(const std::string& dbName, const DatabaseType& db) override; + StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; - StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; + Status updateCollection(const std::string& collNs, const CollectionType& coll) override; - Status updateCollection(const std::string& collNs, const CollectionType& coll) override; + StatusWith<CollectionType> getCollection(const std::string& collNs) override; - StatusWith<CollectionType> getCollection(const std::string& collNs) override; + Status getCollections(const std::string* dbName, std::vector<CollectionType>* collections); - Status getCollections(const std::string* dbName, std::vector<CollectionType>* collections); + Status dropCollection(const std::string& collectionNs); - Status dropCollection(const std::string& collectionNs); + Status getDatabasesForShard(const std::string& shardName, + std::vector<std::string>* dbs) override; - Status getDatabasesForShard(const std::string& shardName, - std::vector<std::string>* dbs) override; + Status getChunks(const Query& query, int nToReturn, std::vector<ChunkType>* chunks) override; - Status getChunks(const Query& query, - int nToReturn, - std::vector<ChunkType>* chunks) override; + Status getTagsForCollection(const std::string& collectionNs, + std::vector<TagsType>* tags) override; - Status getTagsForCollection(const std::string& collectionNs, - std::vector<TagsType>* tags) override; + StatusWith<std::string> getTagForChunk(const std::string& collectionNs, + const ChunkType& chunk) override; - StatusWith<std::string> getTagForChunk(const std::string& collectionNs, - 
const ChunkType& chunk) override; + Status getAllShards(std::vector<ShardType>* shards) override; - Status getAllShards(std::vector<ShardType>* shards) override; + bool isShardHost(const ConnectionString& shardConnectionString) override; - bool isShardHost(const ConnectionString& shardConnectionString) override; - - /** - * Grabs a distributed lock and runs the command on all config servers. - */ - bool runUserManagementWriteCommand(const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; + /** + * Grabs a distributed lock and runs the command on all config servers. + */ + bool runUserManagementWriteCommand(const std::string& commandName, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; - bool runUserManagementReadCommand(const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; - Status applyChunkOpsDeprecated(const BSONArray& updateOps, - const BSONArray& preCondition) override; + Status applyChunkOpsDeprecated(const BSONArray& updateOps, + const BSONArray& preCondition) override; - void logAction(const ActionLogType& actionLog); + void logAction(const ActionLogType& actionLog); - void logChange(OperationContext* txn, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; + void logChange(OperationContext* txn, + const std::string& what, + const std::string& ns, + const BSONObj& detail) override; - StatusWith<SettingsType> getGlobalSettings(const std::string& key) override; + StatusWith<SettingsType> getGlobalSettings(const std::string& key) override; - void writeConfigServerDirect(const BatchedCommandRequest& request, - BatchedCommandResponse* response) override; + void writeConfigServerDirect(const BatchedCommandRequest& request, + BatchedCommandResponse* response) override; - DistLockManager* getDistLockManager() override; + DistLockManager* getDistLockManager() override; - private: +private: + /** + * Updates the config server's metadata to the current version. + */ + Status _checkAndUpgradeConfigMetadata(bool doUpgrade); - /** - * Updates the config server's metadata to the current version. - */ - Status _checkAndUpgradeConfigMetadata(bool doUpgrade); + /** + * Starts the thread that periodically checks data consistency amongst the config servers. + * Note: this is not thread safe and can only be called once for the lifetime. + */ + Status _startConfigServerChecker(); - /** - * Starts the thread that periodically checks data consistency amongst the config servers. - * Note: this is not thread safe and can only be called once for the lifetime. - */ - Status _startConfigServerChecker(); + /** + * Direct network check to see if a particular database does not already exist with the + * same name or different case. + */ + Status _checkDbDoesNotExist(const std::string& dbName) const; - /** - * Direct network check to see if a particular database does not already exist with the - * same name or different case. 
-         */
-        Status _checkDbDoesNotExist(const std::string& dbName) const;
+    /**
+     * Generates a new shard name "shard<xxxx>"
+     * where <xxxx> is an autoincrementing value and <xxxx> < 10000
+     */
+    StatusWith<std::string> _getNewShardName() const;
-        /**
-         * Generates a new shard name "shard<xxxx>"
-         * where <xxxx> is an autoincrementing value and <xxxx> < 10000
-         */
-        StatusWith<std::string> _getNewShardName() const;
+    /**
+     * Returns the number of shards recognized by the config servers
+     * in this sharded cluster.
+     * Optional: use query parameter to filter shard count.
+     */
+    size_t _getShardCount(const BSONObj& query = {}) const;
-        /**
-         * Returns the number of shards recognized by the config servers
-         * in this sharded cluster.
-         * Optional: use query parameter to filter shard count.
-         */
-        size_t _getShardCount(const BSONObj& query = {}) const;
+    /**
+     * Returns true if all config servers have the same state.
+     * If inconsistency detected on first attempt, checks at most 3 more times.
+     */
+    bool _checkConfigServersConsistent(const unsigned tries = 4) const;
-        /**
-         * Returns true if all config servers have the same state.
-         * If inconsistency detected on first attempt, checks at most 3 more times.
-         */
-        bool _checkConfigServersConsistent(const unsigned tries = 4) const;
+    /**
+     * Checks data consistency amongst config servers every 60 seconds.
+     */
+    void _consistencyChecker();
-        /**
-         * Checks data consistency amongst config servers every 60 seconds.
-         */
-        void _consistencyChecker();
+    /**
+     * Returns true if the config servers have the same contents since the last
+     * check was performed.
+     */
+    bool _isConsistentFromLastCheck();
-        /**
-         * Returns true if the config servers have the same contents since the last
-         * check was performed.
-         */
-        bool _isConsistentFromLastCheck();
+    // Parsed config server hosts, as specified on the command line.
+    ConnectionString _configServerConnectionString;
+    std::vector<ConnectionString> _configServers;
-        // Parsed config server hosts, as specified on the command line.
-        ConnectionString _configServerConnectionString;
-        std::vector<ConnectionString> _configServers;
+    // Distributed lock manager singleton.
+    std::unique_ptr<DistLockManager> _distLockManager;
-        // Distributed lock manager singleton.
-        std::unique_ptr<DistLockManager> _distLockManager;
+    // protects _inShutdown, _consistentFromLastCheck; used by _consistencyCheckerCV
+    stdx::mutex _mutex;
-        // protects _inShutdown, _consistentFromLastCheck; used by _consistencyCheckerCV
-        stdx::mutex _mutex;
+    // True if CatalogManagerLegacy::shutDown has been called. False, otherwise.
+    bool _inShutdown = false;
-        // True if CatalogManagerLegacy::shutDown has been called. False, otherwise.
-        bool _inShutdown = false;
+    // used by consistency checker thread to check if config
+    // servers are consistent
+    bool _consistentFromLastCheck = false;
-        // used by consistency checker thread to check if config
-        // servers are consistent
-        bool _consistentFromLastCheck = false;
+    // Thread that runs dbHash on config servers for checking data consistency.
+    stdx::thread _consistencyCheckerThread;
-        // Thread that runs dbHash on config servers for checking data consistency.
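
The private members declared here implement a cancellable periodic worker: the checker thread drops the mutex around the expensive probe and uses the condition variable as an interruptible 60-second sleep, the same shape as _consistencyChecker above. A compact illustration with standard primitives (ConsistencyChecker and probe() are illustrative names; probe() stands in for the dbhash comparison):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    struct ConsistencyChecker {
        std::mutex mtx;
        std::condition_variable cv;
        bool inShutdown = false;
        bool consistent = false;

        bool probe() { return true; }  // stand-in for the dbhash comparison

        // Usage: run this on a dedicated std::thread; call shutdown() to stop it.
        void run() {
            std::unique_lock<std::mutex> lk(mtx);
            while (!inShutdown) {
                lk.unlock();
                const bool ok = probe();  // no lock held during network round trips
                lk.lock();
                consistent = ok;
                if (inShutdown)
                    break;
                cv.wait_for(lk, std::chrono::seconds(60));  // early wake on shutdown
            }
        }

        void shutdown() {
            {
                std::lock_guard<std::mutex> g(mtx);
                inShutdown = true;
            }
            cv.notify_all();
        }
    };
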
- stdx::thread _consistencyCheckerThread; + // condition variable used by the consistency checker thread to wait + // for <= 60s, on every iteration, until shutDown is called + stdx::condition_variable _consistencyCheckerCV; +}; - // condition variable used by the consistency checker thread to wait - // for <= 60s, on every iteration, until shutDown is called - stdx::condition_variable _consistencyCheckerCV; - }; - -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp index 5af263b5e56..c8b60c9c9dc 100644 --- a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp +++ b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp @@ -44,188 +44,175 @@ namespace mongo { - using std::endl; - using std::string; - using std::unique_ptr; - using std::vector; - using mongoutils::str::stream; - - Status checkClusterMongoVersions(CatalogManager* catalogManager, - const string& minMongoVersion) { - - unique_ptr<ScopedDbConnection> connPtr; - - // - // Find mongos pings in config server - // - - try { - connPtr.reset(new ScopedDbConnection(catalogManager->connectionString(), 30)); - ScopedDbConnection& conn = *connPtr; - unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(MongosType::ConfigNS, - Query()))); - - while (cursor->more()) { - - BSONObj pingDoc = cursor->next(); - - MongosType ping; - string errMsg; - // NOTE: We don't care if the ping is invalid, legacy stuff will be - if (!ping.parseBSON(pingDoc, &errMsg)) { - warning() << "could not parse ping document: " << pingDoc << causedBy(errMsg) - << endl; - continue; - } +using std::endl; +using std::string; +using std::unique_ptr; +using std::vector; +using mongoutils::str::stream; + +Status checkClusterMongoVersions(CatalogManager* catalogManager, const string& minMongoVersion) { + unique_ptr<ScopedDbConnection> connPtr; + + // + // Find mongos pings in config server + // + + try { + connPtr.reset(new ScopedDbConnection(catalogManager->connectionString(), 30)); + ScopedDbConnection& conn = *connPtr; + unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(MongosType::ConfigNS, Query()))); + + while (cursor->more()) { + BSONObj pingDoc = cursor->next(); + + MongosType ping; + string errMsg; + // NOTE: We don't care if the ping is invalid, legacy stuff will be + if (!ping.parseBSON(pingDoc, &errMsg)) { + warning() << "could not parse ping document: " << pingDoc << causedBy(errMsg) + << endl; + continue; + } - string mongoVersion = "2.0"; - // Hack to determine older mongos versions from ping format - if (ping.isWaitingSet()) mongoVersion = "2.2"; - if (ping.isMongoVersionSet() && ping.getMongoVersion() != "") { - mongoVersion = ping.getMongoVersion(); - } + string mongoVersion = "2.0"; + // Hack to determine older mongos versions from ping format + if (ping.isWaitingSet()) + mongoVersion = "2.2"; + if (ping.isMongoVersionSet() && ping.getMongoVersion() != "") { + mongoVersion = ping.getMongoVersion(); + } - Date_t lastPing = ping.getPing(); + Date_t lastPing = ping.getPing(); - Minutes quietIntervalMins{0}; - Date_t currentJsTime = jsTime(); - if (currentJsTime >= lastPing) { - quietIntervalMins = duration_cast<Minutes>(currentJsTime - lastPing); - } + Minutes quietIntervalMins{0}; + Date_t currentJsTime = jsTime(); + if (currentJsTime >= lastPing) { + quietIntervalMins = duration_cast<Minutes>(currentJsTime - lastPing); + } - // We assume that anything that hasn't pinged in 5 minutes is probably down - if (quietIntervalMins >= 
Minutes{5}) { - log() << "stale mongos detected " << quietIntervalMins.count() - << " minutes ago, network location is " << pingDoc["_id"].String() - << ", not checking version"; - } - else { - if (versionCmp(mongoVersion, minMongoVersion) < 0) { - return Status(ErrorCodes::RemoteValidationError, - stream() << "version " << mongoVersion - << " detected on mongos at " - << ping.getName() - << ", but version >= " << minMongoVersion - << " required; you must wait 5 minutes " - << "after shutting down a pre-" << minMongoVersion - << " mongos"); - } + // We assume that anything that hasn't pinged in 5 minutes is probably down + if (quietIntervalMins >= Minutes{5}) { + log() << "stale mongos detected " << quietIntervalMins.count() + << " minutes ago, network location is " << pingDoc["_id"].String() + << ", not checking version"; + } else { + if (versionCmp(mongoVersion, minMongoVersion) < 0) { + return Status( + ErrorCodes::RemoteValidationError, + stream() << "version " << mongoVersion << " detected on mongos at " + << ping.getName() << ", but version >= " << minMongoVersion + << " required; you must wait 5 minutes " + << "after shutting down a pre-" << minMongoVersion << " mongos"); } } } - catch (const DBException& e) { - return e.toStatus("could not read mongos pings collection"); - } + } catch (const DBException& e) { + return e.toStatus("could not read mongos pings collection"); + } - connPtr->done(); + connPtr->done(); - // - // Load shards from config server - // + // + // Load shards from config server + // - vector<HostAndPort> servers; + vector<HostAndPort> servers; - try { - vector<ShardType> shards; - Status status = catalogManager->getAllShards(&shards); + try { + vector<ShardType> shards; + Status status = catalogManager->getAllShards(&shards); + if (!status.isOK()) { + return status; + } + + for (const ShardType& shard : shards) { + Status status = shard.validate(); if (!status.isOK()) { - return status; + return Status(ErrorCodes::UnsupportedFormat, + stream() << "shard " << shard.toBSON() + << " failed validation: " << causedBy(status)); } - for (const ShardType& shard : shards) { - Status status = shard.validate(); - if (!status.isOK()) { - return Status(ErrorCodes::UnsupportedFormat, - stream() << "shard " << shard.toBSON() - << " failed validation: " << causedBy(status)); - } - - const auto shardConnStatus = ConnectionString::parse(shard.getHost()); - if (!shardConnStatus.isOK()) { - return Status(ErrorCodes::UnsupportedFormat, - stream() << "invalid shard host " << shard.getHost() - << " read from the config server" - << shardConnStatus.getStatus().toString()); - } - - vector<HostAndPort> shardServers = shardConnStatus.getValue().getServers(); - servers.insert(servers.end(), shardServers.begin(), shardServers.end()); + const auto shardConnStatus = ConnectionString::parse(shard.getHost()); + if (!shardConnStatus.isOK()) { + return Status(ErrorCodes::UnsupportedFormat, + stream() << "invalid shard host " << shard.getHost() + << " read from the config server" + << shardConnStatus.getStatus().toString()); } - } - catch (const DBException& e) { - return e.toStatus("could not read shards collection"); - } - - // Add config servers to list of servers to check version against - vector<HostAndPort> configServers = catalogManager->connectionString().getServers(); - servers.insert(servers.end(), configServers.begin(), configServers.end()); - // - // We've now got all the shard info from the config server, start contacting the shards - // and config servers and verifying their 
versions. - // + vector<HostAndPort> shardServers = shardConnStatus.getValue().getServers(); + servers.insert(servers.end(), shardServers.begin(), shardServers.end()); + } + } catch (const DBException& e) { + return e.toStatus("could not read shards collection"); + } - for (vector<HostAndPort>::iterator serverIt = servers.begin(); - serverIt != servers.end(); ++serverIt) { + // Add config servers to list of servers to check version against + vector<HostAndPort> configServers = catalogManager->connectionString().getServers(); + servers.insert(servers.end(), configServers.begin(), configServers.end()); - // Note: This will *always* be a single-host connection - ConnectionString serverLoc(*serverIt); - dassert(serverLoc.type() == ConnectionString::MASTER || - serverLoc.type() == ConnectionString::CUSTOM); // for dbtests + // + // We've now got all the shard info from the config server, start contacting the shards + // and config servers and verifying their versions. + // - log() << "checking that version of host " << serverLoc << " is compatible with " - << minMongoVersion << endl; + for (vector<HostAndPort>::iterator serverIt = servers.begin(); serverIt != servers.end(); + ++serverIt) { + // Note: This will *always* be a single-host connection + ConnectionString serverLoc(*serverIt); + dassert(serverLoc.type() == ConnectionString::MASTER || + serverLoc.type() == ConnectionString::CUSTOM); // for dbtests - unique_ptr<ScopedDbConnection> serverConnPtr; + log() << "checking that version of host " << serverLoc << " is compatible with " + << minMongoVersion << endl; - bool resultOk; - BSONObj buildInfo; + unique_ptr<ScopedDbConnection> serverConnPtr; - try { - serverConnPtr.reset(new ScopedDbConnection(serverLoc, 30)); - ScopedDbConnection& serverConn = *serverConnPtr; + bool resultOk; + BSONObj buildInfo; - resultOk = serverConn->runCommand("admin", - BSON("buildInfo" << 1), - buildInfo); - } - catch (const DBException& e) { - warning() << "could not run buildInfo command on " << serverLoc.toString() << " " - << causedBy(e) << ". Please ensure that this server is up and at a " - "version >= " - << minMongoVersion; - continue; - } + try { + serverConnPtr.reset(new ScopedDbConnection(serverLoc, 30)); + ScopedDbConnection& serverConn = *serverConnPtr; + + resultOk = serverConn->runCommand("admin", BSON("buildInfo" << 1), buildInfo); + } catch (const DBException& e) { + warning() << "could not run buildInfo command on " << serverLoc.toString() << " " + << causedBy(e) << ". 
Please ensure that this server is up and at a " + "version >= " << minMongoVersion; + continue; + } - // TODO: Make running commands saner such that we can consolidate error handling - if (!resultOk) { - return Status(ErrorCodes::UnknownError, - stream() << DBClientConnection::getLastErrorString(buildInfo) - << causedBy(buildInfo.toString())); - } + // TODO: Make running commands saner such that we can consolidate error handling + if (!resultOk) { + return Status(ErrorCodes::UnknownError, + stream() << DBClientConnection::getLastErrorString(buildInfo) + << causedBy(buildInfo.toString())); + } - serverConnPtr->done(); + serverConnPtr->done(); - verify(buildInfo["version"].type() == String); - string mongoVersion = buildInfo["version"].String(); + verify(buildInfo["version"].type() == String); + string mongoVersion = buildInfo["version"].String(); - if (versionCmp(mongoVersion, minMongoVersion) < 0) { - return Status(ErrorCodes::RemoteValidationError, - stream() << "version " << mongoVersion << " detected on mongo " - "server at " << serverLoc.toString() << - ", but version >= " << minMongoVersion << " required"); - } + if (versionCmp(mongoVersion, minMongoVersion) < 0) { + return Status(ErrorCodes::RemoteValidationError, + stream() << "version " << mongoVersion << " detected on mongo " + "server at " + << serverLoc.toString() << ", but version >= " << minMongoVersion + << " required"); } - - return Status::OK(); } - // Helper function for safe cursors - DBClientCursor* _safeCursor(unique_ptr<DBClientCursor> cursor) { - // TODO: Make error handling more consistent, it's annoying that cursors error out by - // throwing exceptions *and* being empty - uassert(16625, str::stream() << "cursor not found, transport error", cursor.get()); - return cursor.release(); - } + return Status::OK(); +} +// Helper function for safe cursors +DBClientCursor* _safeCursor(unique_ptr<DBClientCursor> cursor) { + // TODO: Make error handling more consistent, it's annoying that cursors error out by + // throwing exceptions *and* being empty + uassert(16625, str::stream() << "cursor not found, transport error", cursor.get()); + return cursor.release(); +} } diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.h b/src/mongo/s/catalog/legacy/cluster_client_internal.h index 8a140e171cd..ae60e519fbd 100644 --- a/src/mongo/s/catalog/legacy/cluster_client_internal.h +++ b/src/mongo/s/catalog/legacy/cluster_client_internal.h @@ -34,27 +34,26 @@ namespace mongo { - class CatalogManager; - class DBClientCursor; - class Status; - - /** - * Tries to check the versions of all active hosts in a cluster. Not 100% accurate, but pretty - * effective if hosts are reachable. - * - * Returns OK if hosts are compatible as far as we know, RemoteValidationError if hosts are not - * compatible, and an error Status if anything else goes wrong. - */ - Status checkClusterMongoVersions(CatalogManager* catalogManager, - const std::string& minMongoVersion); - - // - // Needed to normalize exception behavior of connections and cursors - // TODO: Remove when we refactor the client connection interface to something more consistent. - // - - // Helper function which throws for invalid cursor initialization. - // Note: cursor ownership will be passed to this function. - DBClientCursor* _safeCursor(std::unique_ptr<DBClientCursor> cursor); +class CatalogManager; +class DBClientCursor; +class Status; +/** + * Tries to check the versions of all active hosts in a cluster. Not 100% accurate, but pretty + * effective if hosts are reachable. 
+ * + * Returns OK if hosts are compatible as far as we know, RemoteValidationError if hosts are not + * compatible, and an error Status if anything else goes wrong. + */ +Status checkClusterMongoVersions(CatalogManager* catalogManager, + const std::string& minMongoVersion); + +// +// Needed to normalize exception behavior of connections and cursors +// TODO: Remove when we refactor the client connection interface to something more consistent. +// + +// Helper function which throws for invalid cursor initialization. +// Note: cursor ownership will be passed to this function. +DBClientCursor* _safeCursor(std::unique_ptr<DBClientCursor> cursor); } diff --git a/src/mongo/s/catalog/legacy/config_coordinator.cpp b/src/mongo/s/catalog/legacy/config_coordinator.cpp index e7f33095be9..f4d02c5433a 100644 --- a/src/mongo/s/catalog/legacy/config_coordinator.cpp +++ b/src/mongo/s/catalog/legacy/config_coordinator.cpp @@ -43,431 +43,424 @@ namespace mongo { - using std::string; - using std::vector; +using std::string; +using std::vector; namespace { - /** - * A BSON serializable object representing a setShardVersion command request. - */ - class SSVRequest : public BSONSerializable { - MONGO_DISALLOW_COPYING(SSVRequest); - public: - SSVRequest(const std::string& configDBString) : _configDBString(configDBString) { - - } - - bool isValid(std::string* errMsg) const { - return true; - } +/** + * A BSON serializable object representing a setShardVersion command request. + */ +class SSVRequest : public BSONSerializable { + MONGO_DISALLOW_COPYING(SSVRequest); - /** Returns the BSON representation of the entry. */ - BSONObj toBSON() const { - BSONObjBuilder builder; - builder.append("setShardVersion", ""); // empty ns for init - builder.append("configdb", _configDBString); - builder.append("init", true); - builder.append("authoritative", true); - return builder.obj(); - } +public: + SSVRequest(const std::string& configDBString) : _configDBString(configDBString) {} - bool parseBSON(const BSONObj& source, std::string* errMsg) { - // Not implemented - invariant(false); - return false; - } + bool isValid(std::string* errMsg) const { + return true; + } - void clear() { - // Not implemented - invariant(false); - } + /** Returns the BSON representation of the entry. */ + BSONObj toBSON() const { + BSONObjBuilder builder; + builder.append("setShardVersion", ""); // empty ns for init + builder.append("configdb", _configDBString); + builder.append("init", true); + builder.append("authoritative", true); + return builder.obj(); + } - string toString() const { - return toBSON().toString(); - } + bool parseBSON(const BSONObj& source, std::string* errMsg) { + // Not implemented + invariant(false); + return false; + } - private: - const std::string _configDBString; - }; + void clear() { + // Not implemented + invariant(false); + } - /** - * A BSON serializable object representing a setShardVersion command response. - */ - class SSVResponse : public BSONSerializable { - MONGO_DISALLOW_COPYING(SSVResponse); - public: - static const BSONField<int> ok; - static const BSONField<int> errCode; - static const BSONField<string> errMessage; + string toString() const { + return toBSON().toString(); + } +private: + const std::string _configDBString; +}; - SSVResponse() { - clear(); - } +/** + * A BSON serializable object representing a setShardVersion command response. 
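For concreteness: given the toBSON() above, every config server contacted by the coordinator receives the same initialization command. A sketch, with a made-up host list:

    SSVRequest request("cfgA.example.net:29015,cfgB.example.net:29015,cfgC.example.net:29015");
    BSONObj cmd = request.toBSON();
    // cmd == { setShardVersion: "", configdb: "cfgA.example.net:29015,...",
    //          init: true, authoritative: true }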
+ */ +class SSVResponse : public BSONSerializable { + MONGO_DISALLOW_COPYING(SSVResponse); - bool isValid(std::string* errMsg) const { - return _isOkSet; - } +public: + static const BSONField<int> ok; + static const BSONField<int> errCode; + static const BSONField<string> errMessage; - BSONObj toBSON() const { - BSONObjBuilder builder; - if (_isOkSet) builder << ok(_ok); - if (_isErrCodeSet) builder << errCode(_errCode); - if (_isErrMessageSet) builder << errMessage(_errMessage); + SSVResponse() { + clear(); + } - return builder.obj(); - } + bool isValid(std::string* errMsg) const { + return _isOkSet; + } - bool parseBSON(const BSONObj& source, std::string* errMsg) { - FieldParser::FieldState result; + BSONObj toBSON() const { + BSONObjBuilder builder; - result = FieldParser::extractNumber(source, ok, &_ok, errMsg); - if (result == FieldParser::FIELD_INVALID) { - return false; - } - _isOkSet = result != FieldParser::FIELD_NONE; + if (_isOkSet) + builder << ok(_ok); + if (_isErrCodeSet) + builder << errCode(_errCode); + if (_isErrMessageSet) + builder << errMessage(_errMessage); - result = FieldParser::extract(source, errCode, &_errCode, errMsg); - if (result == FieldParser::FIELD_INVALID) { - return false; - } - _isErrCodeSet = result != FieldParser::FIELD_NONE; + return builder.obj(); + } - result = FieldParser::extract(source, errMessage, &_errMessage, errMsg); - if (result == FieldParser::FIELD_INVALID) { - return false; - } - _isErrMessageSet = result != FieldParser::FIELD_NONE; + bool parseBSON(const BSONObj& source, std::string* errMsg) { + FieldParser::FieldState result; - return true; + result = FieldParser::extractNumber(source, ok, &_ok, errMsg); + if (result == FieldParser::FIELD_INVALID) { + return false; } + _isOkSet = result != FieldParser::FIELD_NONE; - void clear() { - _ok = false; - _isOkSet = false; - - _errCode = 0; - _isErrCodeSet = false; - - _errMessage = ""; - _isErrMessageSet = false; + result = FieldParser::extract(source, errCode, &_errCode, errMsg); + if (result == FieldParser::FIELD_INVALID) { + return false; } + _isErrCodeSet = result != FieldParser::FIELD_NONE; - string toString() const { - return toBSON().toString(); + result = FieldParser::extract(source, errMessage, &_errMessage, errMsg); + if (result == FieldParser::FIELD_INVALID) { + return false; } + _isErrMessageSet = result != FieldParser::FIELD_NONE; - int getOk() { - dassert(_isOkSet); - return _ok; - } + return true; + } - void setOk(int ok) { - _ok = ok; - _isOkSet = true; - } + void clear() { + _ok = false; + _isOkSet = false; - int getErrCode() { - if (_isErrCodeSet) { - return _errCode; - } - else { - return errCode.getDefault(); - } - } + _errCode = 0; + _isErrCodeSet = false; - void setErrCode(int errCode) { - _errCode = errCode; - _isErrCodeSet = true; - } + _errMessage = ""; + _isErrMessageSet = false; + } - bool isErrCodeSet() const { - return _isErrCodeSet; - } + string toString() const { + return toBSON().toString(); + } - const string& getErrMessage() { - dassert(_isErrMessageSet); - return _errMessage; - } + int getOk() { + dassert(_isOkSet); + return _ok; + } - void setErrMessage(StringData errMsg) { - _errMessage = errMsg.toString(); - _isErrMessageSet = true; + void setOk(int ok) { + _ok = ok; + _isOkSet = true; + } + + int getErrCode() { + if (_isErrCodeSet) { + return _errCode; + } else { + return errCode.getDefault(); } + } - private: - int _ok; - bool _isOkSet; + void setErrCode(int errCode) { + _errCode = errCode; + _isErrCodeSet = true; + } - int _errCode; - bool _isErrCodeSet; 
+ bool isErrCodeSet() const { + return _isErrCodeSet; + } - string _errMessage; - bool _isErrMessageSet; - }; + const string& getErrMessage() { + dassert(_isErrMessageSet); + return _errMessage; + } - const BSONField<int> SSVResponse::ok("ok"); - const BSONField<int> SSVResponse::errCode("code"); - const BSONField<string> SSVResponse::errMessage("errmsg"); + void setErrMessage(StringData errMsg) { + _errMessage = errMsg.toString(); + _isErrMessageSet = true; + } +private: + int _ok; + bool _isOkSet; - struct ConfigResponse { - ConnectionString configHost; - BatchedCommandResponse response; - }; + int _errCode; + bool _isErrCodeSet; - void buildErrorFrom(const Status& status, BatchedCommandResponse* response) { - response->setOk(false); - response->setErrCode(static_cast<int>(status.code())); - response->setErrMessage(status.reason()); + string _errMessage; + bool _isErrMessageSet; +}; - dassert(response->isValid(NULL)); - } +const BSONField<int> SSVResponse::ok("ok"); +const BSONField<int> SSVResponse::errCode("code"); +const BSONField<string> SSVResponse::errMessage("errmsg"); - bool areResponsesEqual(const BatchedCommandResponse& responseA, - const BatchedCommandResponse& responseB) { - // Note: This needs to also take into account comparing responses from legacy writes - // and write commands. +struct ConfigResponse { + ConnectionString configHost; + BatchedCommandResponse response; +}; - // TODO: Better reporting of why not equal - if (responseA.getOk() != responseB.getOk()) { - return false; - } +void buildErrorFrom(const Status& status, BatchedCommandResponse* response) { + response->setOk(false); + response->setErrCode(static_cast<int>(status.code())); + response->setErrMessage(status.reason()); - if (responseA.getN() != responseB.getN()) { - return false; - } + dassert(response->isValid(NULL)); +} - if (responseA.isUpsertDetailsSet()) { - // TODO: - } +bool areResponsesEqual(const BatchedCommandResponse& responseA, + const BatchedCommandResponse& responseB) { + // Note: This needs to also take into account comparing responses from legacy writes + // and write commands. 
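A sketch of the round trip the parser above supports, fabricating a failed reply with mongo::fromjson (error code 13 is an arbitrary example value):

    SSVResponse response;
    std::string errMsg;
    bool parsed = response.parseBSON(fromjson("{ok: 0, code: 13, errmsg: 'unauthorized'}"),
                                     &errMsg);
    invariant(parsed);
    invariant(!response.getOk());
    invariant(response.isErrCodeSet() && response.getErrCode() == 13);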
- if (responseA.getOk()) { - return true; - } + // TODO: Better reporting of why not equal + if (responseA.getOk() != responseB.getOk()) { + return false; + } + + if (responseA.getN() != responseB.getN()) { + return false; + } - // TODO: Compare errors here + if (responseA.isUpsertDetailsSet()) { + // TODO: + } + if (responseA.getOk()) { return true; } - bool areAllResponsesEqual(const vector<ConfigResponse*>& responses) { - BatchedCommandResponse* lastResponse = NULL; + // TODO: Compare errors here - for (vector<ConfigResponse*>::const_iterator it = responses.begin(); - it != responses.end(); - ++it) { + return true; +} - BatchedCommandResponse* response = &(*it)->response; +bool areAllResponsesEqual(const vector<ConfigResponse*>& responses) { + BatchedCommandResponse* lastResponse = NULL; - if (lastResponse != NULL) { - if (!areResponsesEqual(*lastResponse, *response)) { - return false; - } - } + for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end(); + ++it) { + BatchedCommandResponse* response = &(*it)->response; - lastResponse = response; + if (lastResponse != NULL) { + if (!areResponsesEqual(*lastResponse, *response)) { + return false; + } } - return true; + lastResponse = response; } - void combineResponses(const vector<ConfigResponse*>& responses, - BatchedCommandResponse* clientResponse) { - - if (areAllResponsesEqual(responses)) { - responses.front()->response.cloneTo(clientResponse); - return; - } - - BSONObjBuilder builder; - for (vector<ConfigResponse*>::const_iterator it = responses.begin(); - it != responses.end(); - ++it) { + return true; +} - builder.append((*it)->configHost.toString(), (*it)->response.toBSON()); - } +void combineResponses(const vector<ConfigResponse*>& responses, + BatchedCommandResponse* clientResponse) { + if (areAllResponsesEqual(responses)) { + responses.front()->response.cloneTo(clientResponse); + return; + } - clientResponse->setOk(false); - clientResponse->setErrCode(ErrorCodes::ManualInterventionRequired); - clientResponse->setErrMessage("config write was not consistent, " - "manual intervention may be required. " - "config responses: " + builder.obj().toString()); + BSONObjBuilder builder; + for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end(); + ++it) { + builder.append((*it)->configHost.toString(), (*it)->response.toBSON()); } -} // namespace + clientResponse->setOk(false); + clientResponse->setErrCode(ErrorCodes::ManualInterventionRequired); + clientResponse->setErrMessage( + "config write was not consistent, " + "manual intervention may be required. 
" + "config responses: " + + builder.obj().toString()); +} +} // namespace - ConfigCoordinator::ConfigCoordinator(MultiCommandDispatch* dispatcher, - const ConnectionString& configServerConnectionString) - : _dispatcher(dispatcher), - _configServerConnectionString(configServerConnectionString) { - } +ConfigCoordinator::ConfigCoordinator(MultiCommandDispatch* dispatcher, + const ConnectionString& configServerConnectionString) + : _dispatcher(dispatcher), _configServerConnectionString(configServerConnectionString) {} - bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) { - // - // Send side - // +bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) { + // + // Send side + // - for (const HostAndPort& server : _configServerConnectionString.getServers()) { - SSVRequest ssvRequest(_configServerConnectionString.toString()); - _dispatcher->addCommand(ConnectionString(server), "admin", ssvRequest); - } + for (const HostAndPort& server : _configServerConnectionString.getServers()) { + SSVRequest ssvRequest(_configServerConnectionString.toString()); + _dispatcher->addCommand(ConnectionString(server), "admin", ssvRequest); + } - _dispatcher->sendAll(); + _dispatcher->sendAll(); - // - // Recv side - // + // + // Recv side + // - bool ssvError = false; - while (_dispatcher->numPending() > 0) { - ConnectionString configHost; - SSVResponse response; + bool ssvError = false; + while (_dispatcher->numPending() > 0) { + ConnectionString configHost; + SSVResponse response; - // We've got to recv everything, no matter what - even if some failed. - Status dispatchStatus = _dispatcher->recvAny(&configHost, &response); + // We've got to recv everything, no matter what - even if some failed. + Status dispatchStatus = _dispatcher->recvAny(&configHost, &response); - if (ssvError) { - // record only the first failure. - continue; - } + if (ssvError) { + // record only the first failure. + continue; + } - if (!dispatchStatus.isOK()) { - ssvError = true; - clientResponse->setOk(false); - clientResponse->setErrCode(static_cast<int>(dispatchStatus.code())); - clientResponse->setErrMessage(dispatchStatus.reason()); - } - else if (!response.getOk()) { - ssvError = true; - clientResponse->setOk(false); - clientResponse->setErrMessage(response.getErrMessage()); + if (!dispatchStatus.isOK()) { + ssvError = true; + clientResponse->setOk(false); + clientResponse->setErrCode(static_cast<int>(dispatchStatus.code())); + clientResponse->setErrMessage(dispatchStatus.reason()); + } else if (!response.getOk()) { + ssvError = true; + clientResponse->setOk(false); + clientResponse->setErrMessage(response.getErrMessage()); - if (response.isErrCodeSet()) { - clientResponse->setErrCode(response.getErrCode()); - } + if (response.isErrCodeSet()) { + clientResponse->setErrCode(response.getErrCode()); } } - - return !ssvError; } - /** - * The core config write functionality. - * - * Config writes run in two passes - the first is a quick check to ensure the config servers - * are all reachable, the second runs the actual write. - * - * TODO: Upgrade and move this logic to the config servers, a state machine implementation - * is probably the next step. 
- */ - void ConfigCoordinator::executeBatch(const BatchedCommandRequest& clientRequest, - BatchedCommandResponse* clientResponse) { - - const NamespaceString nss(clientRequest.getNS()); - - // Should never use it for anything other than DBs residing on the config server - dassert(nss.db() == "config" || nss.db() == "admin"); - dassert(clientRequest.sizeWriteOps() == 1u); - - // This is an opportunistic check that all config servers look healthy by calling - // getLastError on each one of them. If there was some form of write/journaling error, get - // last error would fail. - { - for (const HostAndPort& server : _configServerConnectionString.getServers()) { - _dispatcher->addCommand(ConnectionString(server), - "admin", - RawBSONSerializable(BSON("getLastError" << true << - "fsync" << true))); - } + return !ssvError; +} + +/** + * The core config write functionality. + * + * Config writes run in two passes - the first is a quick check to ensure the config servers + * are all reachable, the second runs the actual write. + * + * TODO: Upgrade and move this logic to the config servers, a state machine implementation + * is probably the next step. + */ +void ConfigCoordinator::executeBatch(const BatchedCommandRequest& clientRequest, + BatchedCommandResponse* clientResponse) { + const NamespaceString nss(clientRequest.getNS()); + + // Should never use it for anything other than DBs residing on the config server + dassert(nss.db() == "config" || nss.db() == "admin"); + dassert(clientRequest.sizeWriteOps() == 1u); + + // This is an opportunistic check that all config servers look healthy by calling + // getLastError on each one of them. If there was some form of write/journaling error, get + // last error would fail. + { + for (const HostAndPort& server : _configServerConnectionString.getServers()) { + _dispatcher->addCommand( + ConnectionString(server), + "admin", + RawBSONSerializable(BSON("getLastError" << true << "fsync" << true))); + } - _dispatcher->sendAll(); + _dispatcher->sendAll(); - bool error = false; - while (_dispatcher->numPending()) { - ConnectionString host; - RawBSONSerializable response; + bool error = false; + while (_dispatcher->numPending()) { + ConnectionString host; + RawBSONSerializable response; - Status status = _dispatcher->recvAny(&host, &response); - if (status.isOK()) { - BSONObj obj = response.toBSON(); + Status status = _dispatcher->recvAny(&host, &response); + if (status.isOK()) { + BSONObj obj = response.toBSON(); - LOG(3) << "Response " << obj.toString(); + LOG(3) << "Response " << obj.toString(); - // If the ok field is anything other than 1, count it as error - if (!obj["ok"].trueValue()) { - error = true; - log() << "Config server check for host " << host - << " returned error: " << response; - } - } - else { + // If the ok field is anything other than 1, count it as error + if (!obj["ok"].trueValue()) { error = true; log() << "Config server check for host " << host - << " failed with status: " << status; + << " returned error: " << response; } - } - - // All responses should have been gathered by this point - if (error) { - clientResponse->setOk(false); - clientResponse->setErrCode(ErrorCodes::RemoteValidationError); - clientResponse->setErrMessage("Could not verify that config servers were active" - " and reachable before write"); - return; + } else { + error = true; + log() << "Config server check for host " << host + << " failed with status: " << status; } } - if (!_checkConfigString(clientResponse)) { + // All responses should have been gathered by this 
point + if (error) { + clientResponse->setOk(false); + clientResponse->setErrCode(ErrorCodes::RemoteValidationError); + clientResponse->setErrMessage( + "Could not verify that config servers were active" + " and reachable before write"); return; } + } - // - // Do the actual writes - // + if (!_checkConfigString(clientResponse)) { + return; + } - BatchedCommandRequest configRequest( clientRequest.getBatchType() ); - clientRequest.cloneTo( &configRequest ); - configRequest.setNS( nss.coll() ); + // + // Do the actual writes + // - OwnedPointerVector<ConfigResponse> responsesOwned; - vector<ConfigResponse*>& responses = responsesOwned.mutableVector(); + BatchedCommandRequest configRequest(clientRequest.getBatchType()); + clientRequest.cloneTo(&configRequest); + configRequest.setNS(nss.coll()); - // - // Send the actual config writes - // + OwnedPointerVector<ConfigResponse> responsesOwned; + vector<ConfigResponse*>& responses = responsesOwned.mutableVector(); - // Get as many batches as we can at once - for (const HostAndPort& server : _configServerConnectionString.getServers()) { - _dispatcher->addCommand(ConnectionString(server), nss.db(), configRequest); - } + // + // Send the actual config writes + // - // Send them all out - _dispatcher->sendAll(); + // Get as many batches as we can at once + for (const HostAndPort& server : _configServerConnectionString.getServers()) { + _dispatcher->addCommand(ConnectionString(server), nss.db(), configRequest); + } - // - // Recv side - // + // Send them all out + _dispatcher->sendAll(); - while (_dispatcher->numPending() > 0) { - // Get the response - responses.push_back(new ConfigResponse()); + // + // Recv side + // - ConfigResponse& configResponse = *responses.back(); - Status dispatchStatus = _dispatcher->recvAny(&configResponse.configHost, - &configResponse.response); + while (_dispatcher->numPending() > 0) { + // Get the response + responses.push_back(new ConfigResponse()); - if (!dispatchStatus.isOK()) { - buildErrorFrom(dispatchStatus, &configResponse.response); - } - } + ConfigResponse& configResponse = *responses.back(); + Status dispatchStatus = + _dispatcher->recvAny(&configResponse.configHost, &configResponse.response); - combineResponses(responses, clientResponse); + if (!dispatchStatus.isOK()) { + buildErrorFrom(dispatchStatus, &configResponse.response); + } } -} // namespace mongo + combineResponses(responses, clientResponse); +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/config_coordinator.h b/src/mongo/s/catalog/legacy/config_coordinator.h index 3cd0301f23f..b1a3e18ca88 100644 --- a/src/mongo/s/catalog/legacy/config_coordinator.h +++ b/src/mongo/s/catalog/legacy/config_coordinator.h @@ -36,27 +36,26 @@ namespace mongo { - class MultiCommandDispatch; +class MultiCommandDispatch; - class ConfigCoordinator { - public: - ConfigCoordinator(MultiCommandDispatch* dispatcher, - const ConnectionString& configServerConnectionString); +class ConfigCoordinator { +public: + ConfigCoordinator(MultiCommandDispatch* dispatcher, + const ConnectionString& configServerConnectionString); - void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response); + void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response); - private: - /** - * Initialize configDB string in config server or if already initialized, - * check that it matches. Returns false if an error occured. 
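A sketch of how the coordinator is wired up from the mongos side, assuming DBClientMultiCommand as the concrete MultiCommandDispatch and a single-op batch already built in 'request' (both are assumptions, not shown in this diff):

    DBClientMultiCommand dispatcher;
    ConfigCoordinator coordinator(&dispatcher, configServerConnectionString);

    BatchedCommandResponse response;
    coordinator.executeBatch(request, &response);  // request must target "config" or "admin"
    if (!response.getOk()) {
        // e.g. ErrorCodes::ManualInterventionRequired when config responses diverged
        warning() << response.getErrMessage();
    }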
- */ - bool _checkConfigString(BatchedCommandResponse* clientResponse); +private: + /** + * Initialize configDB string in config server or if already initialized, + * check that it matches. Returns false if an error occurred. + */ + bool _checkConfigString(BatchedCommandResponse* clientResponse); - // Not owned here - MultiCommandDispatch* const _dispatcher; - - const ConnectionString _configServerConnectionString; - }; + // Not owned here + MultiCommandDispatch* const _dispatcher; + const ConnectionString _configServerConnectionString; +}; } diff --git a/src/mongo/s/catalog/legacy/config_upgrade.cpp b/src/mongo/s/catalog/legacy/config_upgrade.cpp index 2ed119d914f..3592d109e2b 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade.cpp +++ b/src/mongo/s/catalog/legacy/config_upgrade.cpp @@ -52,519 +52,492 @@ namespace mongo { - using std::unique_ptr; - using std::make_pair; - using std::map; - using std::string; - using std::vector; - using str::stream; - - // Implemented in the respective steps' .cpp file - bool doUpgradeV0ToV7(CatalogManager* catalogManager, - const VersionType& lastVersionInfo, - std::string* errMsg); - - bool doUpgradeV6ToV7(CatalogManager* catalogManager, - const VersionType& lastVersionInfo, - std::string* errMsg); +using std::unique_ptr; +using std::make_pair; +using std::map; +using std::string; +using std::vector; +using str::stream; + +// Implemented in the respective steps' .cpp file +bool doUpgradeV0ToV7(CatalogManager* catalogManager, + const VersionType& lastVersionInfo, + std::string* errMsg); + +bool doUpgradeV6ToV7(CatalogManager* catalogManager, + const VersionType& lastVersionInfo, + std::string* errMsg); namespace { - struct VersionRange { - VersionRange(int _minCompatibleVersion, int _currentVersion) - : minCompatibleVersion(_minCompatibleVersion), - currentVersion(_currentVersion) { +struct VersionRange { + VersionRange(int _minCompatibleVersion, int _currentVersion) + : minCompatibleVersion(_minCompatibleVersion), currentVersion(_currentVersion) {} - } - - bool operator==(const VersionRange& other) const { - return (other.minCompatibleVersion == minCompatibleVersion) - && (other.currentVersion == currentVersion); - } - - bool operator!=(const VersionRange& other) const { - return !(*this == other); - } - - int minCompatibleVersion; - int currentVersion; - }; - - enum VersionStatus { - // No way to upgrade the test version to be compatible with current version - VersionStatus_Incompatible, + bool operator==(const VersionRange& other) const { + return (other.minCompatibleVersion == minCompatibleVersion) && + (other.currentVersion == currentVersion); + } - // Current version is compatible with test version - VersionStatus_Compatible, + bool operator!=(const VersionRange& other) const { + return !(*this == other); + } - // Test version must be upgraded to be compatible with current version - VersionStatus_NeedUpgrade - }; + int minCompatibleVersion; + int currentVersion; +}; - /** - * Encapsulates the information needed to register a config upgrade. 
- */ - struct UpgradeStep { - typedef stdx::function<bool(CatalogManager*, const VersionType&, string*)> UpgradeCallback; +enum VersionStatus { + // No way to upgrade the test version to be compatible with current version + VersionStatus_Incompatible, - UpgradeStep(int _fromVersion, - const VersionRange& _toVersionRange, - UpgradeCallback _upgradeCallback) - : fromVersion(_fromVersion), - toVersionRange(_toVersionRange), - upgradeCallback(_upgradeCallback) { + // Current version is compatible with test version + VersionStatus_Compatible, - } + // Test version must be upgraded to be compatible with current version + VersionStatus_NeedUpgrade +}; - // The config version we're upgrading from - int fromVersion; +/** + * Encapsulates the information needed to register a config upgrade. + */ +struct UpgradeStep { + typedef stdx::function<bool(CatalogManager*, const VersionType&, string*)> UpgradeCallback; - // The config version we're upgrading to and the min compatible config version (min, to) - VersionRange toVersionRange; + UpgradeStep(int _fromVersion, + const VersionRange& _toVersionRange, + UpgradeCallback _upgradeCallback) + : fromVersion(_fromVersion), + toVersionRange(_toVersionRange), + upgradeCallback(_upgradeCallback) {} - // The upgrade callback which performs the actual upgrade - UpgradeCallback upgradeCallback; - }; + // The config version we're upgrading from + int fromVersion; - typedef map<int, UpgradeStep> ConfigUpgradeRegistry; + // The config version we're upgrading to and the min compatible config version (min, to) + VersionRange toVersionRange; - /** - * Does a sanity-check validation of the registry ensuring three things: - * 1. All upgrade paths lead to the same minCompatible/currentVersion - * 2. Our constants match this final version pair - * 3. There is a zero-version upgrade path - */ - void validateRegistry(const ConfigUpgradeRegistry& registry) { - VersionRange maxCompatibleConfigVersionRange(-1, -1); - bool hasZeroVersionUpgrade = false; + // The upgrade callback which performs the actual upgrade + UpgradeCallback upgradeCallback; +}; - for (const auto& upgradeStep : registry) { - const UpgradeStep& upgrade = upgradeStep.second; +typedef map<int, UpgradeStep> ConfigUpgradeRegistry; - if (upgrade.fromVersion == 0) { - hasZeroVersionUpgrade = true; - } +/** + * Does a sanity-check validation of the registry ensuring three things: + * 1. All upgrade paths lead to the same minCompatible/currentVersion + * 2. Our constants match this final version pair + * 3. 
There is a zero-version upgrade path + */ +void validateRegistry(const ConfigUpgradeRegistry& registry) { + VersionRange maxCompatibleConfigVersionRange(-1, -1); + bool hasZeroVersionUpgrade = false; - if (maxCompatibleConfigVersionRange.currentVersion - < upgrade.toVersionRange.currentVersion) { + for (const auto& upgradeStep : registry) { + const UpgradeStep& upgrade = upgradeStep.second; - maxCompatibleConfigVersionRange = upgrade.toVersionRange; - } - else if (maxCompatibleConfigVersionRange.currentVersion - == upgrade.toVersionRange.currentVersion) { + if (upgrade.fromVersion == 0) { + hasZeroVersionUpgrade = true; + } - // Make sure all max upgrade paths end up with same version and compatibility - fassert(16621, maxCompatibleConfigVersionRange == upgrade.toVersionRange); - } + if (maxCompatibleConfigVersionRange.currentVersion < + upgrade.toVersionRange.currentVersion) { + maxCompatibleConfigVersionRange = upgrade.toVersionRange; + } else if (maxCompatibleConfigVersionRange.currentVersion == + upgrade.toVersionRange.currentVersion) { + // Make sure all max upgrade paths end up with same version and compatibility + fassert(16621, maxCompatibleConfigVersionRange == upgrade.toVersionRange); } + } - // Make sure we have a zero-version upgrade - fassert(16622, hasZeroVersionUpgrade); + // Make sure we have a zero-version upgrade + fassert(16622, hasZeroVersionUpgrade); - // Make sure our max registered range is the same as our constants - fassert(16623, - maxCompatibleConfigVersionRange - == VersionRange(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION)); - } + // Make sure our max registered range is the same as our constants + fassert(16623, + maxCompatibleConfigVersionRange == + VersionRange(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION)); +} - /** - * Creates a registry of config upgrades used by the code below. - * - * MODIFY THIS CODE HERE TO CREATE A NEW UPGRADE PATH FROM X to Y - * YOU MUST ALSO MODIFY THE VERSION DECLARATIONS IN config_upgrade.h - * - * Caveats: - * - All upgrade paths must eventually lead to the exact same version range of - * min and max compatible versions. - * - This resulting version range must be equal to: - * make_pair(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION) - * - There must always be an upgrade path from the empty version (0) to the latest - * config version. - * - * If any of the above is false, we fassert and fail to start. - */ - ConfigUpgradeRegistry createRegistry() { - ConfigUpgradeRegistry registry; - - // v0 to v7 - UpgradeStep v0ToV7(0, VersionRange(6, 7), doUpgradeV0ToV7); - registry.insert(make_pair(v0ToV7.fromVersion, v0ToV7)); - - // v6 to v7 - UpgradeStep v6ToV7(6, VersionRange(6, 7), doUpgradeV6ToV7); - registry.insert(make_pair(v6ToV7.fromVersion, v6ToV7)); - - validateRegistry(registry); - - return registry; - } +/** + * Creates a registry of config upgrades used by the code below. + * + * MODIFY THIS CODE HERE TO CREATE A NEW UPGRADE PATH FROM X to Y + * YOU MUST ALSO MODIFY THE VERSION DECLARATIONS IN config_upgrade.h + * + * Caveats: + * - All upgrade paths must eventually lead to the exact same version range of + * min and max compatible versions. + * - This resulting version range must be equal to: + * make_pair(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION) + * - There must always be an upgrade path from the empty version (0) to the latest + * config version. + * + * If any of the above is false, we fassert and fail to start. 
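To make the registration contract concrete: a hypothetical future v7-to-v8 step would be added alongside the two real entries below. Neither doUpgradeV7ToV8 nor a v8 constant exists in this tree; they are shown only to illustrate the pattern:

    // Hypothetical v7 -> v8 step; would also require a doUpgradeV7ToV8() implementation
    // and a bump of CURRENT_CONFIG_VERSION (and possibly MIN_COMPATIBLE_CONFIG_VERSION).
    UpgradeStep v7ToV8(7, VersionRange(7, 8), doUpgradeV7ToV8);
    registry.insert(make_pair(v7ToV8.fromVersion, v7ToV8));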
+ */ +ConfigUpgradeRegistry createRegistry() { + ConfigUpgradeRegistry registry; - /** - * Checks whether or not a particular cluster version is compatible with our current - * version and mongodb version. The version is compatible if it falls between the - * MIN_COMPATIBLE_CONFIG_VERSION and CURRENT_CONFIG_VERSION and is not explicitly excluded. - * - * @return a VersionStatus enum indicating compatibility - */ - VersionStatus isConfigVersionCompatible(const VersionType& versionInfo, string* whyNot) { - string dummy; - if (!whyNot) { - whyNot = &dummy; - } + // v0 to v7 + UpgradeStep v0ToV7(0, VersionRange(6, 7), doUpgradeV0ToV7); + registry.insert(make_pair(v0ToV7.fromVersion, v0ToV7)); - // Check if we're empty - if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) { - return VersionStatus_NeedUpgrade; - } + // v6 to v7 + UpgradeStep v6ToV7(6, VersionRange(6, 7), doUpgradeV6ToV7); + registry.insert(make_pair(v6ToV7.fromVersion, v6ToV7)); - // Check that we aren't too old - if (CURRENT_CONFIG_VERSION < versionInfo.getMinCompatibleVersion()) { + validateRegistry(registry); - *whyNot = stream() << "the config version " << CURRENT_CONFIG_VERSION - << " of our process is too old " - << "for the detected config version " - << versionInfo.getMinCompatibleVersion(); + return registry; +} - return VersionStatus_Incompatible; - } +/** + * Checks whether or not a particular cluster version is compatible with our current + * version and mongodb version. The version is compatible if it falls between the + * MIN_COMPATIBLE_CONFIG_VERSION and CURRENT_CONFIG_VERSION and is not explicitly excluded. + * + * @return a VersionStatus enum indicating compatibility + */ +VersionStatus isConfigVersionCompatible(const VersionType& versionInfo, string* whyNot) { + string dummy; + if (!whyNot) { + whyNot = &dummy; + } - // Check that the mongo version of this process hasn't been excluded from the cluster - vector<MongoVersionRange> excludedRanges; - if (versionInfo.isExcludingMongoVersionsSet() && - !MongoVersionRange::parseBSONArray(versionInfo.getExcludingMongoVersions(), - &excludedRanges, - whyNot)) - { + // Check if we're empty + if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) { + return VersionStatus_NeedUpgrade; + } - *whyNot = stream() << "could not understand excluded version ranges" - << causedBy(whyNot); + // Check that we aren't too old + if (CURRENT_CONFIG_VERSION < versionInfo.getMinCompatibleVersion()) { + *whyNot = stream() << "the config version " << CURRENT_CONFIG_VERSION + << " of our process is too old " + << "for the detected config version " + << versionInfo.getMinCompatibleVersion(); - return VersionStatus_Incompatible; - } + return VersionStatus_Incompatible; + } - // versionString is the global version of this process - if (isInMongoVersionRanges(versionString, excludedRanges)) { + // Check that the mongo version of this process hasn't been excluded from the cluster + vector<MongoVersionRange> excludedRanges; + if (versionInfo.isExcludingMongoVersionsSet() && + !MongoVersionRange::parseBSONArray( + versionInfo.getExcludingMongoVersions(), &excludedRanges, whyNot)) { + *whyNot = stream() << "could not understand excluded version ranges" << causedBy(whyNot); - // Cast needed here for MSVC compiler issue - *whyNot = stream() << "not compatible with current config version, version " - << reinterpret_cast<const char*>(versionString) - << "has been excluded."; + return VersionStatus_Incompatible; + } - return VersionStatus_Incompatible; - } + // versionString 
is the global version of this process + if (isInMongoVersionRanges(versionString, excludedRanges)) { + // Cast needed here for MSVC compiler issue + *whyNot = stream() << "not compatible with current config version, version " + << reinterpret_cast<const char*>(versionString) << "has been excluded."; - // Check if we need to upgrade - if (versionInfo.getCurrentVersion() >= CURRENT_CONFIG_VERSION) { - return VersionStatus_Compatible; - } + return VersionStatus_Incompatible; + } - return VersionStatus_NeedUpgrade; + // Check if we need to upgrade + if (versionInfo.getCurrentVersion() >= CURRENT_CONFIG_VERSION) { + return VersionStatus_Compatible; } - // Checks that all config servers are online - bool _checkConfigServersAlive(const ConnectionString& configLoc, string* errMsg) { - bool resultOk; - BSONObj result; - try { - ScopedDbConnection conn(configLoc, 30); - if (conn->type() == ConnectionString::SYNC) { - // TODO: Dynamic cast is bad, we need a better way of managing this op - // via the heirarchy (or not) - SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn.get()); - fassert(16729, scc != NULL); - return scc->prepare(*errMsg); - } - else { - resultOk = conn->runCommand("admin", BSON( "fsync" << 1 ), result); - } - conn.done(); - } - catch (const DBException& e) { - *errMsg = e.toString(); - return false; + return VersionStatus_NeedUpgrade; +} + +// Checks that all config servers are online +bool _checkConfigServersAlive(const ConnectionString& configLoc, string* errMsg) { + bool resultOk; + BSONObj result; + try { + ScopedDbConnection conn(configLoc, 30); + if (conn->type() == ConnectionString::SYNC) { + // TODO: Dynamic cast is bad, we need a better way of managing this op + // via the heirarchy (or not) + SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn.get()); + fassert(16729, scc != NULL); + return scc->prepare(*errMsg); + } else { + resultOk = conn->runCommand("admin", BSON("fsync" << 1), result); } - - if (!resultOk) { - *errMsg = DBClientWithCommands::getLastErrorString(result); - return false; - } - - return true; + conn.done(); + } catch (const DBException& e) { + *errMsg = e.toString(); + return false; } - // Dispatches upgrades based on version to the upgrades registered in the upgrade registry - bool _nextUpgrade(CatalogManager* catalogManager, - const ConfigUpgradeRegistry& registry, - const VersionType& lastVersionInfo, - VersionType* upgradedVersionInfo, - string* errMsg) { + if (!resultOk) { + *errMsg = DBClientWithCommands::getLastErrorString(result); + return false; + } - int fromVersion = lastVersionInfo.getCurrentVersion(); + return true; +} - ConfigUpgradeRegistry::const_iterator foundIt = registry.find(fromVersion); +// Dispatches upgrades based on version to the upgrades registered in the upgrade registry +bool _nextUpgrade(CatalogManager* catalogManager, + const ConfigUpgradeRegistry& registry, + const VersionType& lastVersionInfo, + VersionType* upgradedVersionInfo, + string* errMsg) { + int fromVersion = lastVersionInfo.getCurrentVersion(); - if (foundIt == registry.end()) { + ConfigUpgradeRegistry::const_iterator foundIt = registry.find(fromVersion); - *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION - << " of mongo config metadata is required, " << "current version is " - << fromVersion << ", " - << "don't know how to upgrade from this version"; + if (foundIt == registry.end()) { + *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION + << " of mongo config metadata is required, " + << 
"current version is " << fromVersion << ", " + << "don't know how to upgrade from this version"; - return false; - } + return false; + } - const UpgradeStep& upgrade = foundIt->second; - int toVersion = upgrade.toVersionRange.currentVersion; + const UpgradeStep& upgrade = foundIt->second; + int toVersion = upgrade.toVersionRange.currentVersion; - log() << "starting next upgrade step from v" << fromVersion << " to v" << toVersion; + log() << "starting next upgrade step from v" << fromVersion << " to v" << toVersion; - // Log begin to config.changelog - catalogManager->logChange(NULL, - "starting upgrade of config database", - VersionType::ConfigNS, - BSON("from" << fromVersion << "to" << toVersion)); + // Log begin to config.changelog + catalogManager->logChange(NULL, + "starting upgrade of config database", + VersionType::ConfigNS, + BSON("from" << fromVersion << "to" << toVersion)); - if (!upgrade.upgradeCallback(catalogManager, lastVersionInfo, errMsg)) { - *errMsg = stream() << "error upgrading config database from v" - << fromVersion << " to v" << toVersion << causedBy(errMsg); - return false; - } + if (!upgrade.upgradeCallback(catalogManager, lastVersionInfo, errMsg)) { + *errMsg = stream() << "error upgrading config database from v" << fromVersion << " to v" + << toVersion << causedBy(errMsg); + return false; + } - // Get the config version we've upgraded to and make sure it's sane - Status verifyConfigStatus = getConfigVersion(catalogManager, upgradedVersionInfo); + // Get the config version we've upgraded to and make sure it's sane + Status verifyConfigStatus = getConfigVersion(catalogManager, upgradedVersionInfo); - if (!verifyConfigStatus.isOK()) { - *errMsg = stream() << "failed to validate v" << fromVersion << " config version upgrade" - << causedBy(verifyConfigStatus); + if (!verifyConfigStatus.isOK()) { + *errMsg = stream() << "failed to validate v" << fromVersion << " config version upgrade" + << causedBy(verifyConfigStatus); - return false; - } - - catalogManager->logChange(NULL, - "finished upgrade of config database", - VersionType::ConfigNS, - BSON("from" << fromVersion << "to" << toVersion)); - return true; + return false; } -} // namespace + catalogManager->logChange(NULL, + "finished upgrade of config database", + VersionType::ConfigNS, + BSON("from" << fromVersion << "to" << toVersion)); + return true; +} +} // namespace - /** - * Returns the config version of the cluster pointed at by the connection string. - * - * @return OK if version found successfully, error status if something bad happened. - */ - Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo) { - try { - versionInfo->clear(); - ScopedDbConnection conn(catalogManager->connectionString(), 30); - - unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query("config.version", - BSONObj()))); +/** + * Returns the config version of the cluster pointed at by the connection string. + * + * @return OK if version found successfully, error status if something bad happened. 
+ */ +Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo) { + try { + versionInfo->clear(); - bool hasConfigData = conn->count(ShardType::ConfigNS) - || conn->count(DatabaseType::ConfigNS) - || conn->count(CollectionType::ConfigNS); + ScopedDbConnection conn(catalogManager->connectionString(), 30); - if (!cursor->more()) { + unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query("config.version", BSONObj()))); - // Version is 1 if we have data, 0 if we're completely empty - if (hasConfigData) { - versionInfo->setMinCompatibleVersion(UpgradeHistory_UnreportedVersion); - versionInfo->setCurrentVersion(UpgradeHistory_UnreportedVersion); - } - else { - versionInfo->setMinCompatibleVersion(UpgradeHistory_EmptyVersion); - versionInfo->setCurrentVersion(UpgradeHistory_EmptyVersion); - } + bool hasConfigData = conn->count(ShardType::ConfigNS) || + conn->count(DatabaseType::ConfigNS) || conn->count(CollectionType::ConfigNS); - conn.done(); - return Status::OK(); + if (!cursor->more()) { + // Version is 1 if we have data, 0 if we're completely empty + if (hasConfigData) { + versionInfo->setMinCompatibleVersion(UpgradeHistory_UnreportedVersion); + versionInfo->setCurrentVersion(UpgradeHistory_UnreportedVersion); + } else { + versionInfo->setMinCompatibleVersion(UpgradeHistory_EmptyVersion); + versionInfo->setCurrentVersion(UpgradeHistory_EmptyVersion); } - BSONObj versionDoc = cursor->next(); - string errMsg; + conn.done(); + return Status::OK(); + } - if (!versionInfo->parseBSON(versionDoc, &errMsg) || !versionInfo->isValid(&errMsg)) { - conn.done(); + BSONObj versionDoc = cursor->next(); + string errMsg; - return Status(ErrorCodes::UnsupportedFormat, - stream() << "invalid config version document " << versionDoc - << causedBy(errMsg)); - } + if (!versionInfo->parseBSON(versionDoc, &errMsg) || !versionInfo->isValid(&errMsg)) { + conn.done(); - if (cursor->more()) { - conn.done(); + return Status(ErrorCodes::UnsupportedFormat, + stream() << "invalid config version document " << versionDoc + << causedBy(errMsg)); + } - return Status(ErrorCodes::RemoteValidationError, - stream() << "should only have 1 document " - << "in config.version collection"); - } + if (cursor->more()) { conn.done(); - } - catch (const DBException& e) { - return e.toStatus(); - } - return Status::OK(); + return Status(ErrorCodes::RemoteValidationError, + stream() << "should only have 1 document " + << "in config.version collection"); + } + conn.done(); + } catch (const DBException& e) { + return e.toStatus(); } - bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager, - bool upgrade, - VersionType* initialVersionInfo, - VersionType* versionInfo, - string* errMsg) { + return Status::OK(); +} + +bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager, + bool upgrade, + VersionType* initialVersionInfo, + VersionType* versionInfo, + string* errMsg) { + string dummy; + if (!errMsg) { + errMsg = &dummy; + } - string dummy; - if (!errMsg) { - errMsg = &dummy; - } + Status getConfigStatus = getConfigVersion(catalogManager, versionInfo); + if (!getConfigStatus.isOK()) { + *errMsg = stream() << "could not load config version for upgrade" + << causedBy(getConfigStatus); + return false; + } - Status getConfigStatus = getConfigVersion(catalogManager, versionInfo); - if (!getConfigStatus.isOK()) { - *errMsg = stream() << "could not load config version for upgrade" - << causedBy(getConfigStatus); - return false; - } + versionInfo->cloneTo(initialVersionInfo); - 
versionInfo->cloneTo(initialVersionInfo); + VersionStatus comp = isConfigVersionCompatible(*versionInfo, errMsg); - VersionStatus comp = isConfigVersionCompatible(*versionInfo, errMsg); + if (comp == VersionStatus_Incompatible) + return false; + if (comp == VersionStatus_Compatible) + return true; - if (comp == VersionStatus_Incompatible) return false; - if (comp == VersionStatus_Compatible) return true; + invariant(comp == VersionStatus_NeedUpgrade); - invariant(comp == VersionStatus_NeedUpgrade); + // + // Our current config version is now greater than the current version, so we should upgrade + // if possible. + // - // - // Our current config version is now greater than the current version, so we should upgrade - // if possible. - // + // The first empty version is technically an upgrade, but has special semantics + bool isEmptyVersion = versionInfo->getCurrentVersion() == UpgradeHistory_EmptyVersion; - // The first empty version is technically an upgrade, but has special semantics - bool isEmptyVersion = versionInfo->getCurrentVersion() == UpgradeHistory_EmptyVersion; + // First check for the upgrade flag (but no flag is needed if we're upgrading from empty) + if (!isEmptyVersion && !upgrade) { + *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION + << " of mongo config metadata is required, " + << "current version is " << versionInfo->getCurrentVersion() << ", " + << "need to run mongos with --upgrade"; - // First check for the upgrade flag (but no flag is needed if we're upgrading from empty) - if (!isEmptyVersion && !upgrade) { - *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION - << " of mongo config metadata is required, " << "current version is " - << versionInfo->getCurrentVersion() << ", " - << "need to run mongos with --upgrade"; + return false; + } - return false; + // Contact the config servers to make sure all are online - otherwise we wait a long time + // for locks. + if (!_checkConfigServersAlive(catalogManager->connectionString(), errMsg)) { + if (isEmptyVersion) { + *errMsg = stream() << "all config servers must be reachable for initial" + << " config database creation" << causedBy(errMsg); + } else { + *errMsg = stream() << "all config servers must be reachable for config upgrade" + << causedBy(errMsg); } - // Contact the config servers to make sure all are online - otherwise we wait a long time - // for locks. 
- if (!_checkConfigServersAlive(catalogManager->connectionString(), errMsg)) { + return false; + } - if (isEmptyVersion) { - *errMsg = stream() << "all config servers must be reachable for initial" - << " config database creation" << causedBy(errMsg); - } - else { - *errMsg = stream() << "all config servers must be reachable for config upgrade" + // Check whether or not the balancer is online, if it is online we will not upgrade + // (but we will initialize the config server) + if (!isEmptyVersion) { + auto balSettingsResult = catalogManager->getGlobalSettings(SettingsType::BalancerDocKey); + if (balSettingsResult.isOK()) { + SettingsType balSettings = balSettingsResult.getValue(); + if (!balSettings.getBalancerStopped()) { + *errMsg = stream() << "balancer must be stopped for config upgrade" << causedBy(errMsg); } - - return false; } + } - // Check whether or not the balancer is online, if it is online we will not upgrade - // (but we will initialize the config server) - if (!isEmptyVersion) { - auto balSettingsResult = - catalogManager->getGlobalSettings(SettingsType::BalancerDocKey); - if (balSettingsResult.isOK()) { - SettingsType balSettings = balSettingsResult.getValue(); - if (!balSettings.getBalancerStopped()) { - *errMsg = stream() << "balancer must be stopped for config upgrade" - << causedBy(errMsg); - } - } - } + // + // Acquire a lock for the upgrade process. + // + // We want to ensure that only a single mongo process is upgrading the config server at a + // time. + // + + string whyMessage(stream() << "upgrading config database to new format v" + << CURRENT_CONFIG_VERSION); + auto lockTimeout = stdx::chrono::milliseconds(20 * 60 * 1000); + auto scopedDistLock = + catalogManager->getDistLockManager()->lock("configUpgrade", whyMessage, lockTimeout); + if (!scopedDistLock.isOK()) { + *errMsg = scopedDistLock.getStatus().toString(); + return false; + } - // - // Acquire a lock for the upgrade process. - // - // We want to ensure that only a single mongo process is upgrading the config server at a - // time. - // + // + // Double-check compatibility inside the upgrade lock + // Another process may have won the lock earlier and done the upgrade for us, check + // if this is the case. + // + + getConfigStatus = getConfigVersion(catalogManager, versionInfo); + if (!getConfigStatus.isOK()) { + *errMsg = stream() << "could not reload config version for upgrade" + << causedBy(getConfigStatus); + return false; + } - string whyMessage(stream() << "upgrading config database to new format v" - << CURRENT_CONFIG_VERSION); - auto lockTimeout = stdx::chrono::milliseconds(20 * 60 * 1000); - auto scopedDistLock = catalogManager->getDistLockManager()->lock("configUpgrade", - whyMessage, - lockTimeout); - if (!scopedDistLock.isOK()) { - *errMsg = scopedDistLock.getStatus().toString(); - return false; - } + versionInfo->cloneTo(initialVersionInfo); - // - // Double-check compatibility inside the upgrade lock - // Another process may have won the lock earlier and done the upgrade for us, check - // if this is the case. 
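The acquisition above relies on scoped ownership rather than an explicit unlock; reduced to its essentials (the why-message text is paraphrased here):

    auto lockTimeout = stdx::chrono::milliseconds(20 * 60 * 1000);  // 20 minutes
    auto scopedDistLock = catalogManager->getDistLockManager()->lock(
        "configUpgrade", "upgrading config database to new format", lockTimeout);
    if (!scopedDistLock.isOK()) {
        return false;  // another process holds the lock, possibly doing the same upgrade
    }
    // The lock is released when the scoped lock held inside scopedDistLock is destroyed.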
- // + comp = isConfigVersionCompatible(*versionInfo, errMsg); - getConfigStatus = getConfigVersion(catalogManager, versionInfo); - if (!getConfigStatus.isOK()) { - *errMsg = stream() << "could not reload config version for upgrade" - << causedBy(getConfigStatus); - return false; - } + if (comp == VersionStatus_Incompatible) + return false; + if (comp == VersionStatus_Compatible) + return true; - versionInfo->cloneTo(initialVersionInfo); + invariant(comp == VersionStatus_NeedUpgrade); - comp = isConfigVersionCompatible(*versionInfo, errMsg); + // + // Run through the upgrade steps necessary to bring our config version to the current + // version + // - if (comp == VersionStatus_Incompatible) return false; - if (comp == VersionStatus_Compatible) return true; + log() << "starting upgrade of config server from v" << versionInfo->getCurrentVersion() + << " to v" << CURRENT_CONFIG_VERSION; - invariant(comp == VersionStatus_NeedUpgrade); + ConfigUpgradeRegistry registry(createRegistry()); + + while (versionInfo->getCurrentVersion() < CURRENT_CONFIG_VERSION) { + int fromVersion = versionInfo->getCurrentVersion(); // - // Run through the upgrade steps necessary to bring our config version to the current - // version + // Run the next upgrade process and replace versionInfo with the result of the + // upgrade. // - log() << "starting upgrade of config server from v" << versionInfo->getCurrentVersion() - << " to v" << CURRENT_CONFIG_VERSION; - - ConfigUpgradeRegistry registry(createRegistry()); - - while (versionInfo->getCurrentVersion() < CURRENT_CONFIG_VERSION) { - int fromVersion = versionInfo->getCurrentVersion(); - - // - // Run the next upgrade process and replace versionInfo with the result of the - // upgrade. - // - - if (!_nextUpgrade(catalogManager, registry, *versionInfo, versionInfo, errMsg)) { - return false; - } - - // Ensure we're making progress here - if (versionInfo->getCurrentVersion() <= fromVersion) { + if (!_nextUpgrade(catalogManager, registry, *versionInfo, versionInfo, errMsg)) { + return false; + } - *errMsg = stream() << "bad v" << fromVersion << " config version upgrade, " - << "version did not increment and is now " - << versionInfo->getCurrentVersion(); + // Ensure we're making progress here + if (versionInfo->getCurrentVersion() <= fromVersion) { + *errMsg = stream() << "bad v" << fromVersion << " config version upgrade, " + << "version did not increment and is now " + << versionInfo->getCurrentVersion(); - return false; - } + return false; } + } - invariant(versionInfo->getCurrentVersion() == CURRENT_CONFIG_VERSION); + invariant(versionInfo->getCurrentVersion() == CURRENT_CONFIG_VERSION); - log() << "upgrade of config server to v" << versionInfo->getCurrentVersion() - << " successful"; + log() << "upgrade of config server to v" << versionInfo->getCurrentVersion() << " successful"; - return true; - } + return true; +} -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/config_upgrade.h b/src/mongo/s/catalog/legacy/config_upgrade.h index cd37a9cb634..7caf9d5a177 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade.h +++ b/src/mongo/s/catalog/legacy/config_upgrade.h @@ -32,118 +32,118 @@ namespace mongo { - class CatalogManager; - class Status; - class VersionType; +class CatalogManager; +class Status; +class VersionType; + +/** + * UPGRADE HISTORY + * + * The enum below documents the version changes to *both* the config server data layout + * and the versioning protocol between clients (i.e. 
the set of calls between mongos and + * mongod). + * + * Friendly notice: + * + * EVERY CHANGE EITHER IN CONFIG LAYOUT AND IN S/D PROTOCOL MUST BE RECORDED HERE BY AN INCREASE + * IN THE VERSION AND BY TAKING THE FOLLOWING STEPS. (IF YOU DON'T UNDERSTAND THESE STEPS, YOU + * SHOULD PROBABLY NOT BE UPGRADING THE VERSIONS BY YOURSELF.) + * + * + A new entry in the UpgradeHistory enum is created + * + The CURRENT_CONFIG_VERSION below is incremented to that version + * + There should be a determination if the MIN_COMPATIBLE_CONFIG_VERSION should be increased or + * not. This means determining if, by introducing the changes to layout and/or protocol, the + * new mongos/d can co-exist in a cluster with the old ones. + * + If layout changes are involved, there should be a corresponding layout upgrade routine. See + * for instance config_upgrade_vX_to_vY.cpp. + * + Again, if a layout change occurs, the base upgrade method, config_upgrade_v0_to_vX.cpp must + * be upgraded. This means that all new clusters will start at the newest versions. + * + */ +enum UpgradeHistory { /** - * UPGRADE HISTORY - * - * The enum below documents the version changes to *both* the config server data layout - * and the versioning protocol between clients (i.e. the set of calls between mongos and - * mongod). - * - * Friendly notice: + * The empty version, reported when there is no config server data + */ + UpgradeHistory_EmptyVersion = 0, + + /** + * The unreported version older mongoses used before config.version collection existed * - * EVERY CHANGE EITHER IN CONFIG LAYOUT AND IN S/D PROTOCOL MUST BE RECORDED HERE BY AN INCREASE - * IN THE VERSION AND BY TAKING THE FOLLOWING STEPS. (IF YOU DON'T UNDERSTAND THESE STEPS, YOU - * SHOULD PROBABLY NOT BE UPGRADING THE VERSIONS BY YOURSELF.) + * If there is a config.shards/databases/collections collection but no config.version + * collection, version 1 is assumed + */ + UpgradeHistory_UnreportedVersion = 1, + + /** + * NOTE: We skip version 2 here since it is very old and we shouldn't see it in the wild. * - * + A new entry in the UpgradeHistory enum is created - * + The CURRENT_CONFIG_VERSION below is incremented to that version - * + There should be a determination if the MIN_COMPATIBLE_CONFIG_VERSION should be increased or - * not. This means determining if, by introducing the changes to layout and/or protocol, the - * new mongos/d can co-exist in a cluster with the old ones. - * + If layout changes are involved, there should be a corresponding layout upgrade routine. See - * for instance config_upgrade_vX_to_vY.cpp. - * + Again, if a layout change occurs, the base upgrade method, config_upgrade_v0_to_vX.cpp must - * be upgraded. This means that all new clusters will start at the newest versions. + * Do not skip upgrade versions in the future. + */ + + /** + * Base version used by pre-2.4 mongoses with no collection epochs. + */ + UpgradeHistory_NoEpochVersion = 3, + + /** + * Version upgrade which added collection epochs to all sharded collections and + * chunks. * + * Also: + * + Version document in config.version now of the form: + * { minVersion : X, currentVersion : Y, clusterId : OID(...) 
} + * + Mongos pings include a "mongoVersion" field indicating the mongos version + * + Mongos pings include a "configVersion" field indicating the current config version + * + Mongos explicitly ignores any collection with a "primary" field */ - enum UpgradeHistory { - - /** - * The empty version, reported when there is no config server data - */ - UpgradeHistory_EmptyVersion = 0, - - /** - * The unreported version older mongoses used before config.version collection existed - * - * If there is a config.shards/databases/collections collection but no config.version - * collection, version 1 is assumed - */ - UpgradeHistory_UnreportedVersion = 1, - - /** - * NOTE: We skip version 2 here since it is very old and we shouldn't see it in the wild. - * - * Do not skip upgrade versions in the future. - */ - - /** - * Base version used by pre-2.4 mongoses with no collection epochs. - */ - UpgradeHistory_NoEpochVersion = 3, - - /** - * Version upgrade which added collection epochs to all sharded collections and - * chunks. - * - * Also: - * + Version document in config.version now of the form: - * { minVersion : X, currentVersion : Y, clusterId : OID(...) } - * + Mongos pings include a "mongoVersion" field indicating the mongos version - * + Mongos pings include a "configVersion" field indicating the current config version - * + Mongos explicitly ignores any collection with a "primary" field - */ - UpgradeHistory_MandatoryEpochVersion = 4, - - /** - * Version upgrade with the following changes: - * - * + Dropping a collection from mongos now waits for the chunks to be removed from the - * config server before contacting each shard. Because of this, mongos should be - * upgraded first before mongod or never drop collections during upgrade. - */ - UpgradeHistory_DummyBumpPre2_6 = 5, - - /** - * Version upgrade with the following changes: - * - * + "_secondaryThrottle" field for config.settings now accepts write concern - * specifications. - * + config.locks { ts: 1 } index is no longer unique. - */ - UpgradeHistory_DummyBumpPre2_8 = 6, // Note: 2.8 is also known as 3.0. - - UpgradeHistory_DummyBumpPre3_0 = 7, - }; - - // Earliest version we're compatible with - const int MIN_COMPATIBLE_CONFIG_VERSION = UpgradeHistory_DummyBumpPre2_8; - - // Latest version we know how to communicate with - const int CURRENT_CONFIG_VERSION = UpgradeHistory_DummyBumpPre3_0; + UpgradeHistory_MandatoryEpochVersion = 4, /** - * Returns the config version of the cluster pointed at by the connection string. + * Version upgrade with the following changes: * - * @return OK if version found successfully, error status if something bad happened. + * + Dropping a collection from mongos now waits for the chunks to be removed from the + * config server before contacting each shard. Because of this, mongos should be + * upgraded first before mongod or never drop collections during upgrade. */ - Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo); + UpgradeHistory_DummyBumpPre2_6 = 5, /** - * Checks the config version and ensures it's the latest version, otherwise tries to update. + * Version upgrade with the following changes: * - * @return true if the config version is now compatible. - * @return initial and finalVersionInfo indicating the start and end versions of the upgrade. - * These are the same if no upgrade occurred. + * + "_secondaryThrottle" field for config.settings now accepts write concern + * specifications. + * + config.locks { ts: 1 } index is no longer unique. 
*/ - bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager, - bool upgrade, - VersionType* initialVersionInfo, - VersionType* finalVersionInfo, - std::string* errMsg); + UpgradeHistory_DummyBumpPre2_8 = 6, // Note: 2.8 is also known as 3.0. + + UpgradeHistory_DummyBumpPre3_0 = 7, +}; + +// Earliest version we're compatible with +const int MIN_COMPATIBLE_CONFIG_VERSION = UpgradeHistory_DummyBumpPre2_8; + +// Latest version we know how to communicate with +const int CURRENT_CONFIG_VERSION = UpgradeHistory_DummyBumpPre3_0; + +/** + * Returns the config version of the cluster pointed at by the connection string. + * + * @return OK if version found successfully, error status if something bad happened. + */ +Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo); + +/** + * Checks the config version and ensures it's the latest version, otherwise tries to update. + * + * @return true if the config version is now compatible. + * @return initial and finalVersionInfo indicating the start and end versions of the upgrade. + * These are the same if no upgrade occurred. + */ +bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager, + bool upgrade, + VersionType* initialVersionInfo, + VersionType* finalVersionInfo, + std::string* errMsg); -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp b/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp index dfde57d6774..41bc88eeb41 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp +++ b/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp @@ -45,81 +45,76 @@ namespace mongo { - using std::endl; - using std::string; - using std::unique_ptr; +using std::endl; +using std::string; +using std::unique_ptr; - using mongoutils::str::stream; +using mongoutils::str::stream; - // Custom field used in upgrade state to determine if/where we failed on last upgrade - const BSONField<bool> inCriticalSectionField("inCriticalSection", false); +// Custom field used in upgrade state to determine if/where we failed on last upgrade +const BSONField<bool> inCriticalSectionField("inCriticalSection", false); - Status preUpgradeCheck(CatalogManager* catalogManager, - const VersionType& lastVersionInfo, - string minMongosVersion) { - - if (lastVersionInfo.isUpgradeIdSet() && lastVersionInfo.getUpgradeId().isSet()) { - // - // Another upgrade failed, so cleanup may be necessary - // - - BSONObj lastUpgradeState = lastVersionInfo.getUpgradeState(); +Status preUpgradeCheck(CatalogManager* catalogManager, + const VersionType& lastVersionInfo, + string minMongosVersion) { + if (lastVersionInfo.isUpgradeIdSet() && lastVersionInfo.getUpgradeId().isSet()) { + // + // Another upgrade failed, so cleanup may be necessary + // - bool inCriticalSection; - string errMsg; - if (!FieldParser::extract(lastUpgradeState, - inCriticalSectionField, - &inCriticalSection, - &errMsg)) { - return Status(ErrorCodes::FailedToParse, causedBy(errMsg)); - } + BSONObj lastUpgradeState = lastVersionInfo.getUpgradeState(); - if (inCriticalSection) { - // Note: custom message must be supplied by caller - return Status(ErrorCodes::ManualInterventionRequired, ""); - } + bool inCriticalSection; + string errMsg; + if (!FieldParser::extract( + lastUpgradeState, inCriticalSectionField, &inCriticalSection, &errMsg)) { + return Status(ErrorCodes::FailedToParse, causedBy(errMsg)); } - // - // Check the versions of other mongo processes in the cluster before upgrade. 
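// (The upgradeState subdocument parsed above has, per the inCriticalSectionField
// definition earlier in this file, the rough shape { inCriticalSection: <bool> },
// and the field defaults to false when it is absent.)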
- // We can't upgrade if there are active pre-v2.4 processes in the cluster - // - return checkClusterMongoVersions(catalogManager, string(minMongosVersion)); - } - - Status commitConfigUpgrade(CatalogManager* catalogManager, - int currentVersion, - int minCompatibleVersion, - int newVersion) { - - // Note: DO NOT CLEAR the config version unless bumping the minCompatibleVersion, - // we want to save the excludes that were set. - - BSONObjBuilder setObj; - setObj << VersionType::minCompatibleVersion(minCompatibleVersion); - setObj << VersionType::currentVersion(newVersion); - - BSONObjBuilder unsetObj; - unsetObj.append(VersionType::upgradeId(), 1); - unsetObj.append(VersionType::upgradeState(), 1); - unsetObj.append("version", 1); // remove deprecated field, no longer supported >= v3.0. - - Status result = catalogManager->update( - VersionType::ConfigNS, - BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), - BSON("$set" << setObj.done() << "$unset" << unsetObj.done()), - false, - false, - NULL); - if (!result.isOK()) { - return Status(result.code(), - str::stream() << "could not write new version info " - << " and exit critical upgrade section: " - << result.reason()); + if (inCriticalSection) { + // Note: custom message must be supplied by caller + return Status(ErrorCodes::ManualInterventionRequired, ""); } + } - return result; + // + // Check the versions of other mongo processes in the cluster before upgrade. + // We can't upgrade if there are active pre-v2.4 processes in the cluster + // + return checkClusterMongoVersions(catalogManager, string(minMongosVersion)); +} + +Status commitConfigUpgrade(CatalogManager* catalogManager, + int currentVersion, + int minCompatibleVersion, + int newVersion) { + // Note: DO NOT CLEAR the config version unless bumping the minCompatibleVersion, + // we want to save the excludes that were set. + + BSONObjBuilder setObj; + setObj << VersionType::minCompatibleVersion(minCompatibleVersion); + setObj << VersionType::currentVersion(newVersion); + + BSONObjBuilder unsetObj; + unsetObj.append(VersionType::upgradeId(), 1); + unsetObj.append(VersionType::upgradeState(), 1); + unsetObj.append("version", 1); // remove deprecated field, no longer supported >= v3.0. + + Status result = + catalogManager->update(VersionType::ConfigNS, + BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), + BSON("$set" << setObj.done() << "$unset" << unsetObj.done()), + false, + false, + NULL); + if (!result.isOK()) { + return Status(result.code(), + str::stream() << "could not write new version info " + << " and exit critical upgrade section: " << result.reason()); } -} // namespace mongo + return result; +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/config_upgrade_helpers.h b/src/mongo/s/catalog/legacy/config_upgrade_helpers.h index 10d03676ea8..7411737e262 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade_helpers.h +++ b/src/mongo/s/catalog/legacy/config_upgrade_helpers.h @@ -39,32 +39,32 @@ namespace mongo { - class CatalogManager; - class ConnectionString; - class OID; - class Status; - class VersionType; +class CatalogManager; +class ConnectionString; +class OID; +class Status; +class VersionType; - /** - * Checks whether an unsuccessful upgrade was performed last time and also checks whether - * the mongos in the current cluster have the mimimum version required. Returns not ok if - * the check failed and the upgrade should not proceed. 
- * - * Note: There is also a special case for ManualInterventionRequired error where the - * message will be empty. - */ - Status preUpgradeCheck(CatalogManager* catalogManager, - const VersionType& lastVersionInfo, - std::string minMongosVersion); +/** + * Checks whether an unsuccessful upgrade was performed last time and also checks whether + * the mongos in the current cluster have the mimimum version required. Returns not ok if + * the check failed and the upgrade should not proceed. + * + * Note: There is also a special case for ManualInterventionRequired error where the + * message will be empty. + */ +Status preUpgradeCheck(CatalogManager* catalogManager, + const VersionType& lastVersionInfo, + std::string minMongosVersion); - /** - * Informs the config server that the upgrade task was completed by bumping the version. - * This also clears all upgrade state effectively leaving the critical section if the - * upgrade process did enter it. - */ - Status commitConfigUpgrade(CatalogManager* catalogManager, - int currentVersion, - int minCompatibleVersion, - int newVersion); +/** + * Informs the config server that the upgrade task was completed by bumping the version. + * This also clears all upgrade state effectively leaving the critical section if the + * upgrade process did enter it. + */ +Status commitConfigUpgrade(CatalogManager* catalogManager, + int currentVersion, + int minCompatibleVersion, + int newVersion); -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp b/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp index 787d0114e73..f086fabc521 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp +++ b/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp @@ -39,59 +39,57 @@ namespace mongo { - using std::string; - using mongo::str::stream; - - - /** - * Upgrade v0 to v7 described here - * - * This upgrade takes the config server from empty to an initial version. - */ - bool doUpgradeV0ToV7(CatalogManager* catalogManager, - const VersionType& lastVersionInfo, - string* errMsg) { - - string dummy; - if (!errMsg) errMsg = &dummy; - - verify(lastVersionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion); - - // - // Even though the initial config write is a single-document update, that single document - // is on multiple config servers and requests can interleave. The upgrade lock prevents - // this. - // - - log() << "writing initial config version at v" << CURRENT_CONFIG_VERSION; - - OID newClusterId = OID::gen(); - - VersionType versionInfo; - - // Upgrade to new version - versionInfo.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION); - versionInfo.setCurrentVersion(CURRENT_CONFIG_VERSION); - versionInfo.setClusterId(newClusterId); - - verify(versionInfo.isValid(NULL)); - - // If the cluster has not previously been initialized, we need to set the version before - // using so subsequent mongoses use the config data the same way. This requires all three - // config servers online initially. - Status result = catalogManager->update(VersionType::ConfigNS, - BSON("_id" << 1), - versionInfo.toBSON(), - true, // upsert - false, // multi - NULL); - if (!result.isOK()) { - *errMsg = stream() << "error writing initial config version: " - << result.reason(); - return false; - } - - return true; +using std::string; +using mongo::str::stream; + + +/** + * Upgrade v0 to v7 described here + * + * This upgrade takes the config server from empty to an initial version. 
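(Concretely: given the MIN_COMPATIBLE_CONFIG_VERSION and CURRENT_CONFIG_VERSION constants above, a fresh cluster ends up with a config.version document of roughly the form { _id : 1, minCompatibleVersion : 6, currentVersion : 7, clusterId : <newly generated OID> }, which is why brand-new clusters begin at the newest version and never replay the intermediate upgrades.)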
+ */ +bool doUpgradeV0ToV7(CatalogManager* catalogManager, + const VersionType& lastVersionInfo, + string* errMsg) { + string dummy; + if (!errMsg) + errMsg = &dummy; + + verify(lastVersionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion); + + // + // Even though the initial config write is a single-document update, that single document + // is on multiple config servers and requests can interleave. The upgrade lock prevents + // this. + // + + log() << "writing initial config version at v" << CURRENT_CONFIG_VERSION; + + OID newClusterId = OID::gen(); + + VersionType versionInfo; + + // Upgrade to new version + versionInfo.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION); + versionInfo.setCurrentVersion(CURRENT_CONFIG_VERSION); + versionInfo.setClusterId(newClusterId); + + verify(versionInfo.isValid(NULL)); + + // If the cluster has not previously been initialized, we need to set the version before + // using so subsequent mongoses use the config data the same way. This requires all three + // config servers online initially. + Status result = catalogManager->update(VersionType::ConfigNS, + BSON("_id" << 1), + versionInfo.toBSON(), + true, // upsert + false, // multi + NULL); + if (!result.isOK()) { + *errMsg = stream() << "error writing initial config version: " << result.reason(); + return false; } + return true; +} } diff --git a/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp b/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp index 6f841b0dc87..430d2eedb6e 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp +++ b/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp @@ -40,74 +40,73 @@ namespace mongo { - using std::list; - using std::string; - using std::vector; - - static const char* minMongoProcessVersion = "3.0"; - - static const char* cannotCleanupMessage = - "\n\n" - "******\n" - "Cannot upgrade config database from v6 to v7 because a previous upgrade\n" - "failed in the critical section. Manual intervention is required to re-sync\n" - "the config servers.\n" - "******\n"; - - /** - * Upgrades v6 to v7. - */ - bool doUpgradeV6ToV7(CatalogManager* catalogManager, - const VersionType& lastVersionInfo, - string* errMsg) { - - string dummy; - if (!errMsg) errMsg = &dummy; - - invariant(lastVersionInfo.getCurrentVersion() == UpgradeHistory_DummyBumpPre2_8); - - Status result = preUpgradeCheck(catalogManager, lastVersionInfo, minMongoProcessVersion); - if (!result.isOK()) { - if (result.code() == ErrorCodes::ManualInterventionRequired) { - *errMsg = cannotCleanupMessage; - } - else { - *errMsg = result.toString(); - } - - return false; - } +using std::list; +using std::string; +using std::vector; + +static const char* minMongoProcessVersion = "3.0"; + +static const char* cannotCleanupMessage = + "\n\n" + "******\n" + "Cannot upgrade config database from v6 to v7 because a previous upgrade\n" + "failed in the critical section. Manual intervention is required to re-sync\n" + "the config servers.\n" + "******\n"; - // This is not needed because we are not actually going to make any modifications - // on the other collections in the config server for this particular upgrade. - // startConfigUpgrade(configLoc.toString(), - // lastVersionInfo.getCurrentVersion(), - // OID::gen()); - - // If we actually need to modify something in the config servers these need to follow - // after calling startConfigUpgrade(...): - // - // 1. Acquire necessary locks. - // 2. Make a backup of the collections we are about to modify. - // 3. 
Perform the upgrade process on the backup collection. - // 4. Verify that no changes were made to the collections since the backup was performed. - // 5. Call enterConfigUpgradeCriticalSection(configLoc.toString(), - // lastVersionInfo.getCurrentVersion()). - // 6. Rename the backup collection to the name of the original collection with - // dropTarget set to true. - - // We're only after the version bump in commitConfigUpgrade here since we never - // get into the critical section. - Status commitStatus = commitConfigUpgrade(catalogManager, - lastVersionInfo.getCurrentVersion(), - MIN_COMPATIBLE_CONFIG_VERSION, - CURRENT_CONFIG_VERSION); - - if (!commitStatus.isOK()) { - *errMsg = commitStatus.toString(); - return false; +/** + * Upgrades v6 to v7. + */ +bool doUpgradeV6ToV7(CatalogManager* catalogManager, + const VersionType& lastVersionInfo, + string* errMsg) { + string dummy; + if (!errMsg) + errMsg = &dummy; + + invariant(lastVersionInfo.getCurrentVersion() == UpgradeHistory_DummyBumpPre2_8); + + Status result = preUpgradeCheck(catalogManager, lastVersionInfo, minMongoProcessVersion); + if (!result.isOK()) { + if (result.code() == ErrorCodes::ManualInterventionRequired) { + *errMsg = cannotCleanupMessage; + } else { + *errMsg = result.toString(); } - return true; + return false; } + + // This is not needed because we are not actually going to make any modifications + // on the other collections in the config server for this particular upgrade. + // startConfigUpgrade(configLoc.toString(), + // lastVersionInfo.getCurrentVersion(), + // OID::gen()); + + // If we actually need to modify something in the config servers these need to follow + // after calling startConfigUpgrade(...): + // + // 1. Acquire necessary locks. + // 2. Make a backup of the collections we are about to modify. + // 3. Perform the upgrade process on the backup collection. + // 4. Verify that no changes were made to the collections since the backup was performed. + // 5. Call enterConfigUpgradeCriticalSection(configLoc.toString(), + // lastVersionInfo.getCurrentVersion()). + // 6. Rename the backup collection to the name of the original collection with + // dropTarget set to true. + + // We're only after the version bump in commitConfigUpgrade here since we never + // get into the critical section. 
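// Sketch: with currentVersion == 6 at this point, the commit below reduces to a
// single update on config.version, roughly
//     query : { _id : 1, currentVersion : 6 }
//     update: { $set   : { minCompatibleVersion : 6, currentVersion : 7 },
//               $unset : { upgradeId : 1, upgradeState : 1, version : 1 } }
// (see commitConfigUpgrade in config_upgrade_helpers.cpp above for the helper
// that assembles it).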
+ Status commitStatus = commitConfigUpgrade(catalogManager, + lastVersionInfo.getCurrentVersion(), + MIN_COMPATIBLE_CONFIG_VERSION, + CURRENT_CONFIG_VERSION); + + if (!commitStatus.isOK()) { + *errMsg = commitStatus.toString(); + return false; + } + + return true; +} } diff --git a/src/mongo/s/catalog/legacy/distlock.cpp b/src/mongo/s/catalog/legacy/distlock.cpp index 22c0a4e6128..b0d30b28967 100644 --- a/src/mongo/s/catalog/legacy/distlock.cpp +++ b/src/mongo/s/catalog/legacy/distlock.cpp @@ -43,796 +43,760 @@ namespace mongo { - using std::endl; - using std::list; - using std::set; - using std::string; - using std::stringstream; - using std::unique_ptr; - using std::vector; - - LabeledLevel DistributedLock::logLvl( 1 ); - DistributedLock::LastPings DistributedLock::lastPings; - - ThreadLocalValue<string> distLockIds(""); - - /* ================== - * Module initialization - */ - - static SimpleMutex _cachedProcessMutex; - static string* _cachedProcessString = NULL; - - static void initModule() { - stdx::lock_guard<SimpleMutex> lk(_cachedProcessMutex); - if (_cachedProcessString) { - // someone got the lock before us - return; - } - - // cache process string - stringstream ss; - ss << getHostName() << ":" << serverGlobalParams.port << ":" << time(0) << ":" << rand(); - _cachedProcessString = new string( ss.str() ); - } - - /* =================== */ +using std::endl; +using std::list; +using std::set; +using std::string; +using std::stringstream; +using std::unique_ptr; +using std::vector; - string getDistLockProcess() { - if (!_cachedProcessString) - initModule(); - verify( _cachedProcessString ); - return *_cachedProcessString; - } +LabeledLevel DistributedLock::logLvl(1); +DistributedLock::LastPings DistributedLock::lastPings; - string getDistLockId() { - string s = distLockIds.get(); - if ( s.empty() ) { - stringstream ss; - ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand(); - s = ss.str(); - distLockIds.set( s ); - } - return s; - } +ThreadLocalValue<string> distLockIds(""); - LockException::LockException(StringData msg, int code): LockException(msg, code, OID()) { - } +/* ================== + * Module initialization + */ - LockException::LockException(StringData msg, int code, DistLockHandle lockID): - DBException(msg.toString(), code), - _mustUnlockID(lockID) { - } +static SimpleMutex _cachedProcessMutex; +static string* _cachedProcessString = NULL; - DistLockHandle LockException::getMustUnlockID() const { - return _mustUnlockID; +static void initModule() { + stdx::lock_guard<SimpleMutex> lk(_cachedProcessMutex); + if (_cachedProcessString) { + // someone got the lock before us + return; } - /** - * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is - * specified (time between pings) - */ - DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess ) - : _conn(conn), _name(name), - _processId( asProcess ? getDistLockId() : getDistLockProcess() ), - _lockTimeout( lockTimeout == 0 ? 
LOCK_TIMEOUT : lockTimeout ), - _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ), - _lockPing( _maxClockSkew ) - { - LOG( logLvl ) << "created new distributed lock for " << name << " on " << conn - << " ( lock timeout : " << _lockTimeout - << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl; - - - } + // cache process string + stringstream ss; + ss << getHostName() << ":" << serverGlobalParams.port << ":" << time(0) << ":" << rand(); + _cachedProcessString = new string(ss.str()); +} - DistLockPingInfo DistributedLock::LastPings::getLastPing(const ConnectionString& conn, - const string& lockName) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _lastPings[std::make_pair(conn.toString(), lockName)]; - } +/* =================== */ - void DistributedLock::LastPings::setLastPing(const ConnectionString& conn, - const string& lockName, - const DistLockPingInfo& pd) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _lastPings[std::make_pair(conn.toString(), lockName)] = pd; - } +string getDistLockProcess() { + if (!_cachedProcessString) + initModule(); + verify(_cachedProcessString); + return *_cachedProcessString; +} - Date_t DistributedLock::getRemoteTime() const { - return DistributedLock::remoteTime(_conn, _maxNetSkew); +string getDistLockId() { + string s = distLockIds.get(); + if (s.empty()) { + stringstream ss; + ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand(); + s = ss.str(); + distLockIds.set(s); } + return s; +} - bool DistributedLock::isRemoteTimeSkewed() const { - return !DistributedLock::checkSkew(_conn, - NUM_LOCK_SKEW_CHECKS, - _maxClockSkew, - _maxNetSkew); - } +LockException::LockException(StringData msg, int code) : LockException(msg, code, OID()) {} - const ConnectionString& DistributedLock::getRemoteConnection() const { - return _conn; - } +LockException::LockException(StringData msg, int code, DistLockHandle lockID) + : DBException(msg.toString(), code), _mustUnlockID(lockID) {} - const string& DistributedLock::getProcessId() const { - return _processId; - } - - /** - * Returns the remote time as reported by the cluster or server. The maximum difference between the reported time - * and the actual time on the remote server (at the completion of the function) is the maxNetSkew - */ - Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) { +DistLockHandle LockException::getMustUnlockID() const { + return _mustUnlockID; +} - ConnectionString server( *cluster.getServers().begin() ); +/** + * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is + * specified (time between pings) + */ +DistributedLock::DistributedLock(const ConnectionString& conn, + const string& name, + unsigned long long lockTimeout, + bool asProcess) + : _conn(conn), + _name(name), + _processId(asProcess ? getDistLockId() : getDistLockProcess()), + _lockTimeout(lockTimeout == 0 ? 
LOCK_TIMEOUT : lockTimeout), + _maxClockSkew(_lockTimeout / LOCK_SKEW_FACTOR), + _maxNetSkew(_maxClockSkew), + _lockPing(_maxClockSkew) { + LOG(logLvl) << "created new distributed lock for " << name << " on " << conn + << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing + << ", process : " << asProcess << " )" << endl; +} - // Get result and delay if successful, errMsg if not - bool success = false; - BSONObj result; - string errMsg; - Milliseconds delay{0}; +DistLockPingInfo DistributedLock::LastPings::getLastPing(const ConnectionString& conn, + const string& lockName) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _lastPings[std::make_pair(conn.toString(), lockName)]; +} - unique_ptr<ScopedDbConnection> connPtr; - try { - connPtr.reset( new ScopedDbConnection( server.toString() ) ); - ScopedDbConnection& conn = *connPtr; +void DistributedLock::LastPings::setLastPing(const ConnectionString& conn, + const string& lockName, + const DistLockPingInfo& pd) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _lastPings[std::make_pair(conn.toString(), lockName)] = pd; +} - Date_t then = jsTime(); - success = conn->runCommand( string( "admin" ), BSON( "serverStatus" << 1 ), result ); - delay = jsTime() - then; +Date_t DistributedLock::getRemoteTime() const { + return DistributedLock::remoteTime(_conn, _maxNetSkew); +} - if ( !success ) errMsg = result.toString(); - conn.done(); - } - catch ( const DBException& ex ) { +bool DistributedLock::isRemoteTimeSkewed() const { + return !DistributedLock::checkSkew(_conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew); +} - if ( connPtr && connPtr->get()->isFailed() ) { - // Return to the pool so the pool knows about the failure - connPtr->done(); - } +const ConnectionString& DistributedLock::getRemoteConnection() const { + return _conn; +} - success = false; - errMsg = ex.toString(); - } - - if( !success ) { - throw TimeNotFoundException( str::stream() << "could not get status from server " - << server.toString() << " in cluster " - << cluster.toString() << " to check time" - << causedBy( errMsg ), - 13647 ); - } +const string& DistributedLock::getProcessId() const { + return _processId; +} - // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote - // time value can be off by if we assume a response in the middle of the delay. - if (delay > Milliseconds(maxNetSkew * 2)) { - throw TimeNotFoundException( str::stream() - << "server " << server.toString() << " in cluster " - << cluster.toString() - << " did not respond within max network delay of " - << maxNetSkew << "ms", - 13648 ); +/** + * Returns the remote time as reported by the cluster or server. 
The maximum difference between the reported time + * and the actual time on the remote server (at the completion of the function) is the maxNetSkew + */ +Date_t DistributedLock::remoteTime(const ConnectionString& cluster, unsigned long long maxNetSkew) { + ConnectionString server(*cluster.getServers().begin()); + + // Get result and delay if successful, errMsg if not + bool success = false; + BSONObj result; + string errMsg; + Milliseconds delay{0}; + + unique_ptr<ScopedDbConnection> connPtr; + try { + connPtr.reset(new ScopedDbConnection(server.toString())); + ScopedDbConnection& conn = *connPtr; + + Date_t then = jsTime(); + success = conn->runCommand(string("admin"), BSON("serverStatus" << 1), result); + delay = jsTime() - then; + + if (!success) + errMsg = result.toString(); + conn.done(); + } catch (const DBException& ex) { + if (connPtr && connPtr->get()->isFailed()) { + // Return to the pool so the pool knows about the failure + connPtr->done(); } - return result["localTime"].Date() - (delay / 2); + success = false; + errMsg = ex.toString(); } - bool DistributedLock::checkSkew( const ConnectionString& cluster, unsigned skewChecks, unsigned long long maxClockSkew, unsigned long long maxNetSkew ) { - - vector<HostAndPort> servers = cluster.getServers(); + if (!success) { + throw TimeNotFoundException(str::stream() << "could not get status from server " + << server.toString() << " in cluster " + << cluster.toString() << " to check time" + << causedBy(errMsg), + 13647); + } - if(servers.size() < 1) return true; + // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote + // time value can be off by if we assume a response in the middle of the delay. + if (delay > Milliseconds(maxNetSkew * 2)) { + throw TimeNotFoundException( + str::stream() << "server " << server.toString() << " in cluster " << cluster.toString() + << " did not respond within max network delay of " << maxNetSkew << "ms", + 13648); + } - vector<long long> avgSkews; + return result["localTime"].Date() - (delay / 2); +} - for(unsigned i = 0; i < skewChecks; i++) { +bool DistributedLock::checkSkew(const ConnectionString& cluster, + unsigned skewChecks, + unsigned long long maxClockSkew, + unsigned long long maxNetSkew) { + vector<HostAndPort> servers = cluster.getServers(); - // Find the average skew for each server - unsigned s = 0; - for(vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si,s++) { + if (servers.size() < 1) + return true; - if(i == 0) avgSkews.push_back(0); + vector<long long> avgSkews; - // Could check if this is self, but shouldn't matter since local network connection should be fast. - ConnectionString server( *si ); + for (unsigned i = 0; i < skewChecks; i++) { + // Find the average skew for each server + unsigned s = 0; + for (vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si, s++) { + if (i == 0) + avgSkews.push_back(0); - vector<long long> skew; + // Could check if this is self, but shouldn't matter since local network connection should be fast. 
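// A note on the arithmetic in remoteTime() above, which the skew samples
// gathered in this loop lean on: the serverStatus round trip takes 'delay' ms,
// and the reported localTime was sampled somewhere inside that window, so
// adjusting by delay / 2 aligns the remote clock reading under a
// middle-of-the-delay assumption, leaving a worst-case error of delay / 2.
// Rejecting delay > 2 * maxNetSkew therefore bounds each sample's error by
// maxNetSkew.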
+ ConnectionString server(*si); - BSONObj result; + vector<long long> skew; - Date_t remote = remoteTime( server, maxNetSkew ); - Date_t local = jsTime(); + BSONObj result; - // Remote time can be delayed by at most MAX_NET_SKEW + Date_t remote = remoteTime(server, maxNetSkew); + Date_t local = jsTime(); - // Skew is how much time we'd have to add to local to get to remote - avgSkews[s] += (remote - local).count(); + // Remote time can be delayed by at most MAX_NET_SKEW - LOG( logLvl + 1 ) << "skew from remote server " << server << " found: " - << (remote - local).count(); + // Skew is how much time we'd have to add to local to get to remote + avgSkews[s] += (remote - local).count(); - } + LOG(logLvl + 1) << "skew from remote server " << server + << " found: " << (remote - local).count(); } + } - // Analyze skews - - long long serverMaxSkew = 0; - long long serverMinSkew = 0; + // Analyze skews - for(unsigned s = 0; s < avgSkews.size(); s++) { + long long serverMaxSkew = 0; + long long serverMinSkew = 0; - long long avgSkew = (avgSkews[s] /= skewChecks); + for (unsigned s = 0; s < avgSkews.size(); s++) { + long long avgSkew = (avgSkews[s] /= skewChecks); - // Keep track of max and min skews - if(s == 0) { + // Keep track of max and min skews + if (s == 0) { + serverMaxSkew = avgSkew; + serverMinSkew = avgSkew; + } else { + if (avgSkew > serverMaxSkew) serverMaxSkew = avgSkew; + if (avgSkew < serverMinSkew) serverMinSkew = avgSkew; - } - else { - if(avgSkew > serverMaxSkew) - serverMaxSkew = avgSkew; - if(avgSkew < serverMinSkew) - serverMinSkew = avgSkew; - } - } + } - long long totalSkew = serverMaxSkew - serverMinSkew; - - // Make sure our max skew is not more than our pre-set limit - if(totalSkew > (long long) maxClockSkew) { - LOG( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is out of " << maxClockSkew << "ms bounds." << endl; - return false; - } + long long totalSkew = serverMaxSkew - serverMinSkew; - LOG( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is in " << maxClockSkew << "ms bounds." << endl; - return true; + // Make sure our max skew is not more than our pre-set limit + if (totalSkew > (long long)maxClockSkew) { + LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster + << " is out of " << maxClockSkew << "ms bounds." << endl; + return false; } - Status DistributedLock::checkStatus(double timeout) { - - BSONObj lockObj; - try { - ScopedDbConnection conn(_conn.toString(), timeout ); - lockObj = conn->findOne( LocksType::ConfigNS, - BSON( LocksType::name(_name) ) ).getOwned(); - conn.done(); - } - catch ( DBException& e ) { - return e.toStatus(); - } + LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster + << " is in " << maxClockSkew << "ms bounds." 
<< endl; + return true; +} - if ( lockObj.isEmpty() ) { - return Status(ErrorCodes::LockFailed, - str::stream() << "no lock for " << _name << " exists in the locks collection"); - } +Status DistributedLock::checkStatus(double timeout) { + BSONObj lockObj; + try { + ScopedDbConnection conn(_conn.toString(), timeout); + lockObj = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned(); + conn.done(); + } catch (DBException& e) { + return e.toStatus(); + } - if ( lockObj[LocksType::state()].numberInt() < 2 ) { - return Status(ErrorCodes::LockFailed, - str::stream() << "lock " << _name << " current state is not held (" - << lockObj[LocksType::state()].numberInt() << ")"); - } + if (lockObj.isEmpty()) { + return Status(ErrorCodes::LockFailed, + str::stream() << "no lock for " << _name + << " exists in the locks collection"); + } - if ( lockObj[LocksType::process()].String() != _processId ) { - return Status(ErrorCodes::LockFailed, - str::stream() << "lock " << _name << " is currently being held by " - << "another process (" - << lockObj[LocksType::process()].String() << ")"); - } + if (lockObj[LocksType::state()].numberInt() < 2) { + return Status(ErrorCodes::LockFailed, + str::stream() << "lock " << _name << " current state is not held (" + << lockObj[LocksType::state()].numberInt() << ")"); + } - return Status::OK(); + if (lockObj[LocksType::process()].String() != _processId) { + return Status(ErrorCodes::LockFailed, + str::stream() << "lock " << _name << " is currently being held by " + << "another process (" << lockObj[LocksType::process()].String() + << ")"); } - static void logErrMsgOrWarn(StringData messagePrefix, - StringData lockName, - StringData errMsg, - StringData altErrMsg) { + return Status::OK(); +} - if (errMsg.empty()) { - LOG(DistributedLock::logLvl - 1) << messagePrefix << " '" << lockName << "' " << - altErrMsg << std::endl; - } - else { - warning() << messagePrefix << " '" << lockName << "' " << causedBy(errMsg.toString()); - } +static void logErrMsgOrWarn(StringData messagePrefix, + StringData lockName, + StringData errMsg, + StringData altErrMsg) { + if (errMsg.empty()) { + LOG(DistributedLock::logLvl - 1) << messagePrefix << " '" << lockName << "' " << altErrMsg + << std::endl; + } else { + warning() << messagePrefix << " '" << lockName << "' " << causedBy(errMsg.toString()); } +} - // Semantics of this method are basically that if the lock cannot be acquired, returns false, - // can be retried. If the lock should not be tried again (some unexpected error), - // a LockException is thrown. - bool DistributedLock::lock_try(const string& why, BSONObj* other, double timeout) { - // This should always be true, if not, we are using the lock incorrectly. - verify( _name != "" ); +// Semantics of this method are basically that if the lock cannot be acquired, returns false, +// can be retried. If the lock should not be tried again (some unexpected error), +// a LockException is thrown. +bool DistributedLock::lock_try(const string& why, BSONObj* other, double timeout) { + // This should always be true, if not, we are using the lock incorrectly. 
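// Overview of the lock document states manipulated below (as used throughout
// this file): state 0 = unlocked, 1 = provisionally taken while the update
// propagates, 2 = finalized/held. Acquisition drives 0 -> 1 on the config
// servers and then 1 -> 2 to finalize; forcing drives a stale lock back to 0,
// but only after its holder's ping has stayed unchanged for the takeover
// window.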
+ verify(_name != ""); - LOG( logLvl ) << "trying to acquire new distributed lock for " << _name << " on " << _conn - << " ( lock timeout : " << _lockTimeout - << ", ping interval : " << _lockPing << ", process : " << _processId << " )" - << endl; + LOG(logLvl) << "trying to acquire new distributed lock for " << _name << " on " << _conn + << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing + << ", process : " << _processId << " )" << endl; - // write to dummy if 'other' is null - BSONObj dummyOther; - if ( other == NULL ) - other = &dummyOther; + // write to dummy if 'other' is null + BSONObj dummyOther; + if (other == NULL) + other = &dummyOther; - ScopedDbConnection conn(_conn.toString(), timeout ); + ScopedDbConnection conn(_conn.toString(), timeout); - BSONObjBuilder queryBuilder; - queryBuilder.append( LocksType::name() , _name ); - queryBuilder.append( LocksType::state() , 0 ); + BSONObjBuilder queryBuilder; + queryBuilder.append(LocksType::name(), _name); + queryBuilder.append(LocksType::state(), 0); - { - // make sure its there so we can use simple update logic below - BSONObj o = conn->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) ).getOwned(); + { + // make sure its there so we can use simple update logic below + BSONObj o = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned(); - // Case 1: No locks - if ( o.isEmpty() ) { - try { - LOG( logLvl ) << "inserting initial doc in " << LocksType::ConfigNS << " for lock " << _name << endl; - conn->insert( LocksType::ConfigNS, - BSON( LocksType::name(_name) - << LocksType::state(0) - << LocksType::who("") - << LocksType::lockID(OID()) )); - } - catch ( UserException& e ) { - warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl; - } + // Case 1: No locks + if (o.isEmpty()) { + try { + LOG(logLvl) << "inserting initial doc in " << LocksType::ConfigNS << " for lock " + << _name << endl; + conn->insert(LocksType::ConfigNS, + BSON(LocksType::name(_name) << LocksType::state(0) + << LocksType::who("") + << LocksType::lockID(OID()))); + } catch (UserException& e) { + warning() << "could not insert initial doc for distributed lock " << _name + << causedBy(e) << endl; } + } - // Case 2: A set lock that we might be able to force - else if ( o[LocksType::state()].numberInt() > 0 ) { - - string lockName = o[LocksType::name()].String() + string("/") + o[LocksType::process()].String(); - - BSONObj lastPing = conn->findOne( LockpingsType::ConfigNS, o[LocksType::process()].wrap( LockpingsType::process() ) ); - if ( lastPing.isEmpty() ) { - LOG( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl; - // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot. 
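// (The config.lockpings documents consulted here are keyed by process and
// carry that process's last ping time; when none exists yet, the code below
// substitutes a document whose ping is Date_t(), i.e. the epoch, and lets the
// normal staleness bookkeeping take it from there.)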
- lastPing = BSON( LockpingsType::process(o[LocksType::process()].String()) << - LockpingsType::ping(Date_t()) ); - } - - unsigned long long elapsed = 0; - unsigned long long takeover = _lockTimeout; - DistLockPingInfo lastPingEntry = getLastPing(); + // Case 2: A set lock that we might be able to force + else if (o[LocksType::state()].numberInt() > 0) { + string lockName = + o[LocksType::name()].String() + string("/") + o[LocksType::process()].String(); + + BSONObj lastPing = conn->findOne( + LockpingsType::ConfigNS, o[LocksType::process()].wrap(LockpingsType::process())); + if (lastPing.isEmpty()) { + LOG(logLvl) << "empty ping found for process in lock '" << lockName << "'" << endl; + // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot. + lastPing = BSON(LockpingsType::process(o[LocksType::process()].String()) + << LockpingsType::ping(Date_t())); + } - LOG(logLvl) << "checking last ping for lock '" << lockName - << "' against process " << lastPingEntry.processId - << " and ping " << lastPingEntry.lastPing; + unsigned long long elapsed = 0; + unsigned long long takeover = _lockTimeout; + DistLockPingInfo lastPingEntry = getLastPing(); - try { + LOG(logLvl) << "checking last ping for lock '" << lockName << "' against process " + << lastPingEntry.processId << " and ping " << lastPingEntry.lastPing; - Date_t remote = remoteTime( _conn ); - - auto pingDocProcessId = lastPing[LockpingsType::process()].String(); - auto pingDocPingValue = lastPing[LockpingsType::ping()].Date(); - - // Timeout the elapsed time using comparisons of remote clock - // For non-finalized locks, timeout 15 minutes since last seen (ts) - // For finalized locks, timeout 15 minutes since last ping - bool recPingChange = - o[LocksType::state()].numberInt() == 2 && - (lastPingEntry.processId != pingDocProcessId || - lastPingEntry.lastPing != pingDocPingValue); - bool recTSChange = lastPingEntry.lockSessionId != o[LocksType::lockID()].OID(); - - if (recPingChange || recTSChange) { - // If the ping has changed since we last checked, mark the current date and time - setLastPing(DistLockPingInfo(pingDocProcessId, - pingDocPingValue, - remote, - o[LocksType::lockID()].OID(), - OID())); - } - else { - - // GOTCHA! Due to network issues, it is possible that the current time - // is less than the remote time. We *have* to check this here, otherwise - // we overflow and our lock breaks. - if (lastPingEntry.configLocalTime >= remote) - elapsed = 0; - else - elapsed = (remote - lastPingEntry.configLocalTime).count(); - } + try { + Date_t remote = remoteTime(_conn); + + auto pingDocProcessId = lastPing[LockpingsType::process()].String(); + auto pingDocPingValue = lastPing[LockpingsType::ping()].Date(); + + // Timeout the elapsed time using comparisons of remote clock + // For non-finalized locks, timeout 15 minutes since last seen (ts) + // For finalized locks, timeout 15 minutes since last ping + bool recPingChange = o[LocksType::state()].numberInt() == 2 && + (lastPingEntry.processId != pingDocProcessId || + lastPingEntry.lastPing != pingDocPingValue); + bool recTSChange = lastPingEntry.lockSessionId != o[LocksType::lockID()].OID(); + + if (recPingChange || recTSChange) { + // If the ping has changed since we last checked, mark the current date and time + setLastPing(DistLockPingInfo(pingDocProcessId, + pingDocPingValue, + remote, + o[LocksType::lockID()].OID(), + OID())); + } else { + // GOTCHA! 
Due to network issues, it is possible that the current time + // is less than the remote time. We *have* to check this here, otherwise + // we overflow and our lock breaks. + if (lastPingEntry.configLocalTime >= remote) + elapsed = 0; + else + elapsed = (remote - lastPingEntry.configLocalTime).count(); } - catch( LockException& e ) { - - // Remote server cannot be found / is not responsive - warning() << "Could not get remote time from " << _conn << causedBy( e ); - // If our config server is having issues, forget all the pings until we can see it again - resetLastPing(); + } catch (LockException& e) { + // Remote server cannot be found / is not responsive + warning() << "Could not get remote time from " << _conn << causedBy(e); + // If our config server is having issues, forget all the pings until we can see it again + resetLastPing(); + } - } + if (elapsed <= takeover) { + LOG(1) << "could not force lock '" << lockName << "' because elapsed time " + << elapsed << " <= takeover time " << takeover; + *other = o; + other->getOwned(); + conn.done(); + return false; + } - if (elapsed <= takeover) { - LOG(1) << "could not force lock '" << lockName - << "' because elapsed time " << elapsed - << " <= takeover time " << takeover; - *other = o; other->getOwned(); conn.done(); - return false; - } + LOG(0) << "forcing lock '" << lockName << "' because elapsed time " << elapsed + << " > takeover time " << takeover; - LOG(0) << "forcing lock '" << lockName - << "' because elapsed time " << elapsed - << " > takeover time " << takeover; - - if( elapsed > takeover ) { - - // Lock may forced, reset our timer if succeeds or fails - // Ensures that another timeout must happen if something borks up here, and resets our pristine - // ping state if acquired. - resetLastPing(); - - try { - - // Check the clock skew again. If we check this before we get a lock - // and after the lock times out, we can be pretty sure the time is - // increasing at the same rate on all servers and therefore our - // timeout is accurate - if (isRemoteTimeSkewed()) { - string msg(str::stream() << "remote time in cluster " - << _conn.toString() - << " is now skewed, cannot force lock."); - throw LockException(msg, ErrorCodes::DistributedClockSkewed); - } - - // Make sure we break the lock with the correct "ts" (OID) value, otherwise - // we can overwrite a new lock inserted in the meantime. - conn->update( LocksType::ConfigNS, - BSON( LocksType::name(_name) << - LocksType::state(o[LocksType::state()].numberInt()) << - LocksType::lockID(o[LocksType::lockID()].OID()) ), - BSON( "$set" << BSON( LocksType::state(0) ) ) ); - - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - // TODO: Clean up all the extra code to exit this method, probably with a refactor - if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { - logErrMsgOrWarn("Could not force lock", lockName, errMsg, "(another force won"); - *other = o; other->getOwned(); conn.done(); - return false; - } + if (elapsed > takeover) { + // Lock may forced, reset our timer if succeeds or fails + // Ensures that another timeout must happen if something borks up here, and resets our pristine + // ping state if acquired. + resetLastPing(); + try { + // Check the clock skew again. 
If we check this before we get a lock + // and after the lock times out, we can be pretty sure the time is + // increasing at the same rate on all servers and therefore our + // timeout is accurate + if (isRemoteTimeSkewed()) { + string msg(str::stream() << "remote time in cluster " << _conn.toString() + << " is now skewed, cannot force lock."); + throw LockException(msg, ErrorCodes::DistributedClockSkewed); } - catch( UpdateNotTheSame& ) { - // Ok to continue since we know we forced at least one lock document, and all lock docs - // are required for a lock to be held. - warning() << "lock forcing " << lockName << " inconsistent" << endl; - } - catch (const LockException& ) { - // Let the exception go up and don't repackage the exception. - throw; - } - catch( std::exception& e ) { + + // Make sure we break the lock with the correct "ts" (OID) value, otherwise + // we can overwrite a new lock inserted in the meantime. + conn->update(LocksType::ConfigNS, + BSON(LocksType::name(_name) + << LocksType::state(o[LocksType::state()].numberInt()) + << LocksType::lockID(o[LocksType::lockID()].OID())), + BSON("$set" << BSON(LocksType::state(0)))); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + // TODO: Clean up all the extra code to exit this method, probably with a refactor + if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { + logErrMsgOrWarn( + "Could not force lock", lockName, errMsg, "(another force won"); + *other = o; + other->getOwned(); conn.done(); - string msg(str::stream() << "exception forcing distributed lock " - << lockName << causedBy(e)); - throw LockException(msg, 13660); + return false; } + } catch (UpdateNotTheSame&) { + // Ok to continue since we know we forced at least one lock document, and all lock docs + // are required for a lock to be held. + warning() << "lock forcing " << lockName << " inconsistent" << endl; + } catch (const LockException&) { + // Let the exception go up and don't repackage the exception. + throw; + } catch (std::exception& e) { + conn.done(); + string msg(str::stream() << "exception forcing distributed lock " << lockName + << causedBy(e)); + throw LockException(msg, 13660); } - else { - // Not strictly necessary, but helpful for small timeouts where thread - // scheduling is significant. This ensures that two attempts are still - // required for a force if not acquired, and resets our state if we - // are acquired. - resetLastPing(); - - // Test that the lock is held by trying to update the finalized state of the lock to the same state - // if it does not update or does not update on all servers, we can't re-enter. - try { - - // Test the lock with the correct "ts" (OID) value - conn->update( LocksType::ConfigNS, - BSON( LocksType::name(_name) << - LocksType::state(2) << - LocksType::lockID(o[LocksType::lockID()].OID()) ), - BSON( "$set" << BSON( LocksType::state(2) ) ) ); - - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - // TODO: Clean up all the extra code to exit this method, probably with a refactor - if ( ! errMsg.empty() || ! err["n"].type() || err["n"].numberInt() < 1 ) { - logErrMsgOrWarn("Could not re-enter lock", lockName, errMsg, "(not sure lock is held"); - *other = o; other->getOwned(); conn.done(); - return false; - } - } - catch( UpdateNotTheSame& ) { - // NOT ok to continue since our lock isn't held by all servers, so isn't valid. 
- warning() << "inconsistent state re-entering lock, lock " << lockName << " not held" << endl; - *other = o; other->getOwned(); conn.done(); - return false; - } - catch( std::exception& e ) { + } else { + // Not strictly necessary, but helpful for small timeouts where thread + // scheduling is significant. This ensures that two attempts are still + // required for a force if not acquired, and resets our state if we + // are acquired. + resetLastPing(); + + // Test that the lock is held by trying to update the finalized state of the lock to the same state + // if it does not update or does not update on all servers, we can't re-enter. + try { + // Test the lock with the correct "ts" (OID) value + conn->update(LocksType::ConfigNS, + BSON(LocksType::name(_name) + << LocksType::state(2) + << LocksType::lockID(o[LocksType::lockID()].OID())), + BSON("$set" << BSON(LocksType::state(2)))); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + // TODO: Clean up all the extra code to exit this method, probably with a refactor + if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { + logErrMsgOrWarn( + "Could not re-enter lock", lockName, errMsg, "(not sure lock is held"); + *other = o; + other->getOwned(); conn.done(); - string msg(str::stream() << "exception re-entering distributed lock " - << lockName << causedBy(e)); - throw LockException(msg, 13660); + return false; } - LOG( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl; - *other = o.getOwned(); + } catch (UpdateNotTheSame&) { + // NOT ok to continue since our lock isn't held by all servers, so isn't valid. + warning() << "inconsistent state re-entering lock, lock " << lockName + << " not held" << endl; + *other = o; + other->getOwned(); conn.done(); - return true; - + return false; + } catch (std::exception& e) { + conn.done(); + string msg(str::stream() << "exception re-entering distributed lock " + << lockName << causedBy(e)); + throw LockException(msg, 13660); } - LOG( logLvl - 1 ) << "lock '" << lockName << "' successfully forced" << endl; - - // We don't need the ts value in the query, since we will only ever replace locks with state=0. - } - // Case 3: We have an expired lock - else if ( o[LocksType::lockID()].type() ) { - queryBuilder.append( o[LocksType::lockID()] ); + LOG(logLvl - 1) << "re-entered distributed lock '" << lockName << "'" << endl; + *other = o.getOwned(); + conn.done(); + return true; } - } - // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open - // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock. - resetLastPing(); + LOG(logLvl - 1) << "lock '" << lockName << "' successfully forced" << endl; - bool gotLock = false; - BSONObj currLock; + // We don't need the ts value in the query, since we will only ever replace locks with state=0. + } + // Case 3: We have an expired lock + else if (o[LocksType::lockID()].type()) { + queryBuilder.append(o[LocksType::lockID()]); + } + } - BSONObj lockDetails = BSON( LocksType::state(1) - << LocksType::who(getDistLockId()) - << LocksType::process(_processId) - << LocksType::when(jsTime()) - << LocksType::why(why) - << LocksType::lockID(OID::gen()) ); - BSONObj whatIWant = BSON( "$set" << lockDetails ); + // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open + // and no locks need to be forced. 
If anything goes wrong, we don't want to remember an old lock. + resetLastPing(); - BSONObj query = queryBuilder.obj(); + bool gotLock = false; + BSONObj currLock; - string lockName = _name + string("/") + _processId; + BSONObj lockDetails = + BSON(LocksType::state(1) << LocksType::who(getDistLockId()) + << LocksType::process(_processId) << LocksType::when(jsTime()) + << LocksType::why(why) << LocksType::lockID(OID::gen())); + BSONObj whatIWant = BSON("$set" << lockDetails); - try { + BSONObj query = queryBuilder.obj(); - // Main codepath to acquire lock + string lockName = _name + string("/") + _processId; - LOG( logLvl ) << "about to acquire distributed lock '" << lockName << "'"; + try { + // Main codepath to acquire lock - LOG( logLvl + 1 ) << "trying to acquire lock " << query.toString( false, true ) - << " with details " << lockDetails.toString( false, true ) << endl; + LOG(logLvl) << "about to acquire distributed lock '" << lockName << "'"; - conn->update( LocksType::ConfigNS , query , whatIWant ); + LOG(logLvl + 1) << "trying to acquire lock " << query.toString(false, true) + << " with details " << lockDetails.toString(false, true) << endl; - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); + conn->update(LocksType::ConfigNS, query, whatIWant); - currLock = conn->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) ); + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); - if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { - logErrMsgOrWarn("could not acquire lock", lockName, errMsg, "(another update won)"); - *other = currLock; - other->getOwned(); - gotLock = false; - } - else { - gotLock = true; - } + currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); + if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { + logErrMsgOrWarn("could not acquire lock", lockName, errMsg, "(another update won)"); + *other = currLock; + other->getOwned(); + gotLock = false; + } else { + gotLock = true; } - catch ( UpdateNotTheSame& up ) { - - // this means our update got through on some, but not others - warning() << "distributed lock '" << lockName << " did not propagate properly." << causedBy( up ) << endl; - - // Overall protection derives from: - // All unlocking updates use the ts value when setting state to 0 - // This ensures that during locking, we can override all smaller ts locks with - // our own safe ts value and not be unlocked afterward. - for ( unsigned i = 0; i < up.size(); i++ ) { - ScopedDbConnection indDB(up[i].first); - BSONObj indUpdate; + } catch (UpdateNotTheSame& up) { + // this means our update got through on some, but not others + warning() << "distributed lock '" << lockName << " did not propagate properly." + << causedBy(up) << endl; - try { + // Overall protection derives from: + // All unlocking updates use the ts value when setting state to 0 + // This ensures that during locking, we can override all smaller ts locks with + // our own safe ts value and not be unlocked afterward. + for (unsigned i = 0; i < up.size(); i++) { + ScopedDbConnection indDB(up[i].first); + BSONObj indUpdate; - indUpdate = indDB->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) ); - - // If we override this lock in any way, grab and protect it. - // We assume/ensure that if a process does not have all lock documents, it is no longer - // holding the lock. 
- // Note - finalized locks may compete too, but we know they've won already if competing - // in this round. Cleanup of crashes during finalizing may take a few tries. - if( indUpdate[LocksType::lockID()] < lockDetails[LocksType::lockID()] || indUpdate[LocksType::state()].numberInt() == 0 ) { - - BSONObj grabQuery = BSON( LocksType::name(_name) - << LocksType::lockID(indUpdate[LocksType::lockID()].OID()) ); - - // Change ts so we won't be forced, state so we won't be relocked - BSONObj grabChanges = BSON( LocksType::lockID(lockDetails[LocksType::lockID()].OID()) - << LocksType::state(1) ); - - // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other - // process grabbed the lock (which will change the ts), but the lock will be set until forcing - indDB->update( LocksType::ConfigNS, grabQuery, BSON( "$set" << grabChanges ) ); - - indUpdate = indDB->findOne( LocksType::ConfigNS, BSON( LocksType::name(_name) ) ); - - // The tournament was interfered and it is not safe to proceed further. - // One case this could happen is when the LockPinger processes old - // entries from addUnlockOID. See SERVER-10688 for more detailed - // description of race. - if ( indUpdate[LocksType::state()].numberInt() <= 0 ) { - LOG( logLvl - 1 ) << "lock tournament interrupted, " - << "so no lock was taken; " - << "new state of lock: " << indUpdate << endl; - - // We now break and set our currLock lockID value to zero, so that - // we know that we did not acquire the lock below. Later code will - // cleanup failed entries. - currLock = BSON(LocksType::lockID(OID())); - indDB.done(); - break; - } + try { + indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); + + // If we override this lock in any way, grab and protect it. + // We assume/ensure that if a process does not have all lock documents, it is no longer + // holding the lock. + // Note - finalized locks may compete too, but we know they've won already if competing + // in this round. Cleanup of crashes during finalizing may take a few tries. + if (indUpdate[LocksType::lockID()] < lockDetails[LocksType::lockID()] || + indUpdate[LocksType::state()].numberInt() == 0) { + BSONObj grabQuery = + BSON(LocksType::name(_name) + << LocksType::lockID(indUpdate[LocksType::lockID()].OID())); + + // Change ts so we won't be forced, state so we won't be relocked + BSONObj grabChanges = + BSON(LocksType::lockID(lockDetails[LocksType::lockID()].OID()) + << LocksType::state(1)); + + // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other + // process grabbed the lock (which will change the ts), but the lock will be set until forcing + indDB->update(LocksType::ConfigNS, grabQuery, BSON("$set" << grabChanges)); + + indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); + + // The tournament was interfered and it is not safe to proceed further. + // One case this could happen is when the LockPinger processes old + // entries from addUnlockOID. See SERVER-10688 for more detailed + // description of race. + if (indUpdate[LocksType::state()].numberInt() <= 0) { + LOG(logLvl - 1) << "lock tournament interrupted, " + << "so no lock was taken; " + << "new state of lock: " << indUpdate << endl; + + // We now break and set our currLock lockID value to zero, so that + // we know that we did not acquire the lock below. Later code will + // cleanup failed entries. 
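// Tournament invariant exploited in this branch and below: every contender
// stamps its own ts (a freshly generated OID), conflicting servers are
// resolved toward the largest ts, and a contender only wins if the final read
// on every config server still shows its own ts. A state <= 0 here means an
// unlock raced the tournament, so this attempt must bail out rather than
// proceed.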
+                        currLock = BSON(LocksType::lockID(OID()));
+                        indDB.done();
+                        break;
                    }
-                    // else our lock is the same, in which case we're safe, or it's a bigger lock,
-                    // in which case we won't need to protect anything since we won't have the lock.
-
-                }
-                catch( std::exception& e ) {
-                    conn.done();
-                    string msg(str::stream() << "distributed lock " << lockName
-                               << " had errors communicating with individual server "
-                               << up[1].first << causedBy(e));
-                    throw LockException(msg, 13661);
-                }
-
-                verify( !indUpdate.isEmpty() );
-
-                // Find max TS value
-                if ( currLock.isEmpty() || currLock[LocksType::lockID()] < indUpdate[LocksType::lockID()] ) {
-                    currLock = indUpdate.getOwned();
                }
+                // else our lock is the same, in which case we're safe, or it's a bigger lock,
+                // in which case we won't need to protect anything since we won't have the lock.

-                indDB.done();
-
+            } catch (std::exception& e) {
+                conn.done();
+                string msg(str::stream() << "distributed lock " << lockName
+                                         << " had errors communicating with individual server "
+                                         << up[i].first << causedBy(e));
+                throw LockException(msg, 13661);
            }

-            // Locks on all servers are now set and safe until forcing
+            verify(!indUpdate.isEmpty());

-            if ( currLock[LocksType::lockID()] == lockDetails[LocksType::lockID()] ) {
-                LOG( logLvl - 1 ) << "lock update won, completing lock propagation for '" << lockName << "'" << endl;
-                gotLock = true;
+            // Find max TS value
+            if (currLock.isEmpty() ||
+                currLock[LocksType::lockID()] < indUpdate[LocksType::lockID()]) {
+                currLock = indUpdate.getOwned();
            }
-            else {
-                LOG( logLvl - 1 ) << "lock update lost, lock '" << lockName << "' not propagated." << endl;
-                gotLock = false;
-            }
-        }
-        catch( std::exception& e ) {
-            conn.done();
-            string msg(str::stream() << "exception creating distributed lock "
-                       << lockName << causedBy(e));
-            throw LockException(msg, 13663 );
-        }

-        // Complete lock propagation
-        if( gotLock ) {
+            indDB.done();
+        }

-            // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
-            // least 15 minutes.  Sets the state = 2, so that future clients can determine that the lock is truly set.
-            // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that
-            // indicates the lock is active, but this means the process creating/destroying them must explicitly poll
-            // when something goes wrong.
-            try {
+        // Locks on all servers are now set and safe until forcing

-                BSONObjBuilder finalLockDetails;
-                BSONObjIterator bi( lockDetails );
-                while( bi.more() ) {
-                    BSONElement el = bi.next();
-                    if( (string) ( el.fieldName() ) == LocksType::state() )
-                        finalLockDetails.append( LocksType::state(), 2 );
-                    else finalLockDetails.append( el );
-                }
+        if (currLock[LocksType::lockID()] == lockDetails[LocksType::lockID()]) {
+            LOG(logLvl - 1) << "lock update won, completing lock propagation for '" << lockName
+                            << "'" << endl;
+            gotLock = true;
+        } else {
+            LOG(logLvl - 1) << "lock update lost, lock '" << lockName << "' not propagated."
+                            << endl;
+            gotLock = false;
+        }
+    } catch (std::exception& e) {
+        conn.done();
+        string msg(str::stream() << "exception creating distributed lock " << lockName
+                                 << causedBy(e));
+        throw LockException(msg, 13663);
+    }

-            conn->update( LocksType::ConfigNS , BSON( LocksType::name(_name) ) , BSON( "$set" << finalLockDetails.obj() ) );
+    // Complete lock propagation
+    if (gotLock) {
+        // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
+        // least 15 minutes.
Sets the state = 2, so that future clients can determine that the lock is truly set. + // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that + // indicates the lock is active, but this means the process creating/destroying them must explicitly poll + // when something goes wrong. + try { + BSONObjBuilder finalLockDetails; + BSONObjIterator bi(lockDetails); + while (bi.more()) { + BSONElement el = bi.next(); + if ((string)(el.fieldName()) == LocksType::state()) + finalLockDetails.append(LocksType::state(), 2); + else + finalLockDetails.append(el); + } - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); + conn->update(LocksType::ConfigNS, + BSON(LocksType::name(_name)), + BSON("$set" << finalLockDetails.obj())); - currLock = conn->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) ); + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); - if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { - warning() << "could not finalize winning lock " << lockName - << ( !errMsg.empty() ? causedBy( errMsg ) : " (did not update lock) " ) << endl; - gotLock = false; - } - else { - // SUCCESS! - gotLock = true; - } + currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); - } - catch( std::exception& e ) { - conn.done(); - string msg(str::stream() << "exception finalizing winning lock" << causedBy(e)); - // Inform caller about the potential orphan lock. - throw LockException(msg, - 13662, - lockDetails[LocksType::lockID()].OID()); + if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { + warning() << "could not finalize winning lock " << lockName + << (!errMsg.empty() ? causedBy(errMsg) : " (did not update lock) ") + << endl; + gotLock = false; + } else { + // SUCCESS! + gotLock = true; } + } catch (std::exception& e) { + conn.done(); + string msg(str::stream() << "exception finalizing winning lock" << causedBy(e)); + // Inform caller about the potential orphan lock. + throw LockException(msg, 13662, lockDetails[LocksType::lockID()].OID()); } - - *other = currLock; - other->getOwned(); - - // Log our lock results - if(gotLock) - LOG( logLvl - 1 ) << "distributed lock '" << lockName << - "' acquired, ts : " << currLock[LocksType::lockID()].OID() << endl; - else - LOG( logLvl - 1 ) << "distributed lock '" << lockName << "' was not acquired." << endl; - - conn.done(); - - return gotLock; } - // This function *must not* throw exceptions, since it can be used in destructors - failure - // results in queuing and trying again later. - bool DistributedLock::unlock(const DistLockHandle& lockID) { + *other = currLock; + other->getOwned(); - verify(_name != ""); - string lockName = _name + string("/") + _processId; + // Log our lock results + if (gotLock) + LOG(logLvl - 1) << "distributed lock '" << lockName + << "' acquired, ts : " << currLock[LocksType::lockID()].OID() << endl; + else + LOG(logLvl - 1) << "distributed lock '" << lockName << "' was not acquired." << endl; - const int maxAttempts = 3; - int attempted = 0; + conn.done(); - while ( ++attempted <= maxAttempts ) { + return gotLock; +} - // Awkward, but necessary since the constructor itself throws exceptions - unique_ptr<ScopedDbConnection> connPtr; +// This function *must not* throw exceptions, since it can be used in destructors - failure +// results in queuing and trying again later. 
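+// (Illustrative cross-reference, not part of the original comment: in this patch a
+// false return is handled by LegacyDistLockManager::unlock, which hands the lock id to
+// LegacyDistLockPinger::addUnlockOID so the ping thread keeps retrying the unlock.)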
+bool DistributedLock::unlock(const DistLockHandle& lockID) { + verify(_name != ""); + string lockName = _name + string("/") + _processId; - try { + const int maxAttempts = 3; + int attempted = 0; - connPtr.reset( new ScopedDbConnection( _conn.toString() ) ); - ScopedDbConnection& conn = *connPtr; + while (++attempted <= maxAttempts) { + // Awkward, but necessary since the constructor itself throws exceptions + unique_ptr<ScopedDbConnection> connPtr; - // Use ts when updating lock, so that new locks can be sure they won't get trampled. - conn->update(LocksType::ConfigNS, - BSON(LocksType::name(_name) - << LocksType::lockID(lockID)), - BSON("$set" << BSON(LocksType::state(0)))); + try { + connPtr.reset(new ScopedDbConnection(_conn.toString())); + ScopedDbConnection& conn = *connPtr; - // Check that the lock was actually unlocked... if not, try again - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); + // Use ts when updating lock, so that new locks can be sure they won't get trampled. + conn->update(LocksType::ConfigNS, + BSON(LocksType::name(_name) << LocksType::lockID(lockID)), + BSON("$set" << BSON(LocksType::state(0)))); - if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ){ - warning() << "distributed lock unlock update failed, retrying " - << ( errMsg.empty() ? causedBy( "( update not registered )" ) : causedBy( errMsg ) ) << endl; - conn.done(); - continue; - } + // Check that the lock was actually unlocked... if not, try again + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); - LOG(0) << "distributed lock '" << lockName << "' unlocked. "; + if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { + warning() << "distributed lock unlock update failed, retrying " + << (errMsg.empty() ? causedBy("( update not registered )") + : causedBy(errMsg)) << endl; conn.done(); - return true; - } - catch (const UpdateNotTheSame&) { - LOG(0) << "distributed lock '" << lockName << "' unlocked (messily). "; - // This isn't a connection problem, so don't throw away the conn - connPtr->done(); - return true; + continue; } - catch ( std::exception& e) { - warning() << "distributed lock '" << lockName << "' failed unlock attempt." - << causedBy( e ) << endl; - // TODO: If our lock timeout is small, sleeping this long may be unsafe. - if( attempted != maxAttempts) sleepsecs(1 << attempted); - } + LOG(0) << "distributed lock '" << lockName << "' unlocked. "; + conn.done(); + return true; + } catch (const UpdateNotTheSame&) { + LOG(0) << "distributed lock '" << lockName << "' unlocked (messily). "; + // This isn't a connection problem, so don't throw away the conn + connPtr->done(); + return true; + } catch (std::exception& e) { + warning() << "distributed lock '" << lockName << "' failed unlock attempt." + << causedBy(e) << endl; + + // TODO: If our lock timeout is small, sleeping this long may be unsafe. 
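+            // Illustrative arithmetic, not part of the original comment: with
+            // maxAttempts = 3, the backoff below sleeps 1 << attempted seconds, i.e.
+            // 2s after the first failed attempt and 4s after the second; there is no
+            // sleep after the last attempt, so unlock() blocks for at most ~6s before
+            // giving up and returning false.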
+ if (attempted != maxAttempts) + sleepsecs(1 << attempted); } - - return false; } + return false; +} } diff --git a/src/mongo/s/catalog/legacy/distlock.h b/src/mongo/s/catalog/legacy/distlock.h index b569080147e..b267be42a04 100644 --- a/src/mongo/s/catalog/legacy/distlock.h +++ b/src/mongo/s/catalog/legacy/distlock.h @@ -37,199 +37,198 @@ namespace mongo { - namespace { - - enum TimeConstants { - LOCK_TIMEOUT = 15 * 60 * 1000, - LOCK_SKEW_FACTOR = 30, - LOCK_PING = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, - MAX_LOCK_NET_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, - MAX_LOCK_CLOCK_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, - NUM_LOCK_SKEW_CHECKS = 3, - }; - - // The maximum clock skew we need to handle between config servers is - // 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW. - - // Net effect of *this* clock being slow is effectively a multiplier on the max net skew - // and a linear increase or decrease of the max clock skew. - } +namespace { + +enum TimeConstants { + LOCK_TIMEOUT = 15 * 60 * 1000, + LOCK_SKEW_FACTOR = 30, + LOCK_PING = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, + MAX_LOCK_NET_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, + MAX_LOCK_CLOCK_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, + NUM_LOCK_SKEW_CHECKS = 3, +}; + +// The maximum clock skew we need to handle between config servers is +// 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW. + +// Net effect of *this* clock being slow is effectively a multiplier on the max net skew +// and a linear increase or decrease of the max clock skew. +} + +/** + * Exception class to encapsulate exceptions while managing distributed locks + */ +class LockException : public DBException { +public: + LockException(StringData msg, int code); /** - * Exception class to encapsulate exceptions while managing distributed locks + * Use this to signal that a lock with lockID needs to be unlocked. For example, in cases + * where the final lock acquisition was not propagated properly to all config servers. */ - class LockException : public DBException { - public: - LockException(StringData msg, int code); + LockException(StringData msg, int code, DistLockHandle lockID); - /** - * Use this to signal that a lock with lockID needs to be unlocked. For example, in cases - * where the final lock acquisition was not propagated properly to all config servers. - */ - LockException(StringData msg, int code, DistLockHandle lockID); + /** + * Returns the OID of the lock that needs to be unlocked. + */ + DistLockHandle getMustUnlockID() const; - /** - * Returns the OID of the lock that needs to be unlocked. - */ - DistLockHandle getMustUnlockID() const; + virtual ~LockException() = default; - virtual ~LockException() = default; +private: + // The identifier of a lock that needs to be unlocked. + DistLockHandle _mustUnlockID; +}; - private: - // The identifier of a lock that needs to be unlocked. - DistLockHandle _mustUnlockID; - }; +/** + * Indicates an error in retrieving time values from remote servers. + */ +class TimeNotFoundException : public LockException { +public: + TimeNotFoundException(const char* msg, int code) : LockException(msg, code) {} + TimeNotFoundException(const std::string& msg, int code) : LockException(msg, code) {} + virtual ~TimeNotFoundException() = default; +}; + +/** + * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task + * must be identified by a unique name across the system (e.g., "balancer"). A lock is taken + * by writing a document in the configdb's locks collection with that name. 
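+ *
+ * For illustration only (the concrete field names are an assumption drawn from the
+ * LocksType accessors used in this patch, not spelled out in the original comment),
+ * a taken lock document looks roughly like:
+ *   { _id: "balancer", state: 2, who: "...", process: "...",
+ *     when: <Date>, why: "doing balance round", ts: ObjectId("...") }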
+ *
+ * To be maintained, each taken lock needs to be revalidated ("pinged") within a
+ * pre-established amount of time. This class does this maintenance automatically once a
+ * DistributedLock object is constructed. The ping procedure records the local time to
+ * the ping document, but that time is untrusted and is only used as a point of reference
+ * of whether the ping was refreshed or not. Ultimately, the clock of a configdb is the source
+ * of truth when determining whether a ping is still fresh or not. This is achieved by
+ * (1) remembering the ping document time along with config server time when unable to
+ * take a lock, and (2) ensuring all config servers report similar times and have similar
+ * time rates (the difference in times must start and stay small).
+ *
+ * Lock states include:
+ * 0: unlocked
+ * 1: about to be locked
+ * 2: locked
+ *
+ * Valid state transitions:
+ * 0 -> 1
+ * 1 -> 2
+ * 2 -> 0
+ *
+ * Note that at any point in time, a lock can be force unlocked if the ping for the lock
+ * becomes too stale.
+ */
+class DistributedLock {
+public:
+    static logger::LabeledLevel logLvl;

-    /**
-     * Indicates an error in retrieving time values from remote servers.
-     */
-    class TimeNotFoundException : public LockException {
+    class LastPings {
     public:
-        TimeNotFoundException( const char * msg , int code ) : LockException( msg, code ) {}
-        TimeNotFoundException( const std::string& msg, int code ) : LockException( msg, code ) {}
-        virtual ~TimeNotFoundException() = default;
+        DistLockPingInfo getLastPing(const ConnectionString& conn, const std::string& lockName);
+        void setLastPing(const ConnectionString& conn,
+                         const std::string& lockName,
+                         const DistLockPingInfo& pd);
+
+        stdx::mutex _mutex;
+        std::map<std::pair<std::string, std::string>, DistLockPingInfo> _lastPings;
     };

+    static LastPings lastPings;
+
     /**
-     * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task
-     * must be identified by a unique name across the system (e.g., "balancer"). A lock is taken
-     * by writing a document in the configdb's locks collection with that name.
+     * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
+     * Construction does trigger a lock "pinging" mechanism, though.
      *
-     * To be maintained, each taken lock needs to be revalidated ("pinged") within a
-     * pre-established amount of time. This class does this maintenance automatically once a
-     * DistributedLock object was constructed. The ping procedure records the local time to
-     * the ping document, but that time is untrusted and is only used as a point of reference
-     * of whether the ping was refreshed or not. Ultimately, the clock a configdb is the source
-     * of truth when determining whether a ping is still fresh or not. This is achieved by
-     * (1) remembering the ping document time along with config server time when unable to
-     * take a lock, and (2) ensuring all config servers report similar times and have similar
-     * time rates (the difference in times must start and stay small).
+     * @param conn address of config(s) server(s)
+     * @param name identifier for the lock
+     * @param lockTimeout how long can the lock go "unpinged" before a new attempt to lock steals it (in minutes).
      *
-     * Lock states include:
-     * 0: unlocked
-     * 1: about to be locked
-     * 2: locked
-     *
-     * Valid state transitions:
-     * 0 -> 1
-     * 1 -> 2
-     * 2 -> 0
+     */
+    DistributedLock(const ConnectionString& conn,
+                    const std::string& name,
+                    unsigned long long lockTimeout = 0,
+                    bool asProcess = false);
+    ~DistributedLock(){};
+
+    /**
+     * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
+     * consider using the dist_lock_try construct to acquire this lock in an exception safe way.
      *
-     * Note that at any point in time, a lock can be force unlocked if the ping for the lock
-     * becomes too stale.
+     * @param why human readable description of why the lock is being taken (used to log)
+     * @param other configdb's lock document that is currently holding the lock, if lock is taken, or our own lock
+     * details if not
+     * @return true if it managed to grab the lock
      */
-    class DistributedLock {
-    public:
+    bool lock_try(const std::string& why, BSONObj* other = 0, double timeout = 0.0);

-        static logger::LabeledLevel logLvl;
-
-        class LastPings {
-        public:
-            DistLockPingInfo getLastPing(const ConnectionString& conn,
-                                         const std::string& lockName);
-            void setLastPing(const ConnectionString& conn,
-                             const std::string& lockName,
-                             const DistLockPingInfo& pd);
-
-            stdx::mutex _mutex;
-            std::map< std::pair<std::string, std::string>, DistLockPingInfo > _lastPings;
-        };
-
-        static LastPings lastPings;
-
-        /**
-         * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
-         * Construction does trigger a lock "pinging" mechanism, though.
-         *
-         * @param conn address of config(s) server(s)
-         * @param name identifier for the lock
-         * @param lockTimeout how long can the log go "unpinged" before a new attempt to lock steals it (in minutes).
-         * @param lockPing how long to wait between lock pings
-         * @param legacy use legacy logic
-         *
-         */
-        DistributedLock( const ConnectionString& conn , const std::string& name , unsigned long long lockTimeout = 0, bool asProcess = false );
-        ~DistributedLock(){};
-
-        /**
-         * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
-         * consider using the dist_lock_try construct to acquire this lock in an exception safe way.
-         *
-         * @param why human readable description of why the lock is being taken (used to log)
-         * @param other configdb's lock document that is currently holding the lock, if lock is taken, or our own lock
-         * details if not
-         * @return true if it managed to grab the lock
-         */
-        bool lock_try(const std::string& why, BSONObj* other = 0, double timeout = 0.0);
-
-        /**
-         * Returns OK if this lock is held (but does not guarantee that this owns it) and
-         * it was possible to confirm that, within 'timeout' seconds, if provided, with the
-         * config servers.
-         */
-        Status checkStatus(double timeout);
-
-        /**
-         * Releases a previously taken lock. Returns true on success.
-         */
-        bool unlock(const OID& lockID);
-
-        Date_t getRemoteTime() const;
-
-        bool isRemoteTimeSkewed() const;
-
-        const std::string& getProcessId() const;
-
-        const ConnectionString& getRemoteConnection() const;
-
-        /**
-         * Checks the skew among a cluster of servers and returns true if the min and max clock
-         * times among the servers are within maxClockSkew.
- */ - static bool checkSkew( const ConnectionString& cluster, - unsigned skewChecks = NUM_LOCK_SKEW_CHECKS, - unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW, - unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW ); - - /** - * Get the remote time from a server or cluster - */ - static Date_t remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW ); - - /** - * Namespace for lock pings - */ - static const std::string lockPingNS; - - /** - * Namespace for locks - */ - static const std::string locksNS; - - const ConnectionString _conn; - const std::string _name; - const std::string _processId; - - // Timeout for lock, usually LOCK_TIMEOUT - const unsigned long long _lockTimeout; - const unsigned long long _maxClockSkew; - const unsigned long long _maxNetSkew; - const unsigned long long _lockPing; - - private: - - void resetLastPing() { - lastPings.setLastPing(_conn, _name, DistLockPingInfo()); - } - - void setLastPing(const DistLockPingInfo& pd) { - lastPings.setLastPing(_conn, _name, pd); - } - - DistLockPingInfo getLastPing() { - return lastPings.getLastPing(_conn, _name); - } - }; + /** + * Returns OK if this lock is held (but does not guarantee that this owns it) and + * it was possible to confirm that, within 'timeout' seconds, if provided, with the + * config servers. + */ + Status checkStatus(double timeout); -} + /** + * Releases a previously taken lock. Returns true on success. + */ + bool unlock(const OID& lockID); + + Date_t getRemoteTime() const; + + bool isRemoteTimeSkewed() const; + + const std::string& getProcessId() const; + + const ConnectionString& getRemoteConnection() const; + + /** + * Checks the skew among a cluster of servers and returns true if the min and max clock + * times among the servers are within maxClockSkew. 
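+     *
+     * Illustrative arithmetic, not part of the original comment: with the defaults above,
+     * LOCK_TIMEOUT is 15 * 60 * 1000 = 900000 ms, so MAX_LOCK_NET_SKEW and
+     * MAX_LOCK_CLOCK_SKEW are each 900000 / 30 = 30000 ms, and the largest total skew
+     * tolerated between config servers is 2 * 30000 + 30000 = 90000 ms (90 seconds).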
+ */ + static bool checkSkew(const ConnectionString& cluster, + unsigned skewChecks = NUM_LOCK_SKEW_CHECKS, + unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW, + unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW); + + /** + * Get the remote time from a server or cluster + */ + static Date_t remoteTime(const ConnectionString& cluster, + unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW); + + /** + * Namespace for lock pings + */ + static const std::string lockPingNS; + + /** + * Namespace for locks + */ + static const std::string locksNS; + + const ConnectionString _conn; + const std::string _name; + const std::string _processId; + + // Timeout for lock, usually LOCK_TIMEOUT + const unsigned long long _lockTimeout; + const unsigned long long _maxClockSkew; + const unsigned long long _maxNetSkew; + const unsigned long long _lockPing; +private: + void resetLastPing() { + lastPings.setLastPing(_conn, _name, DistLockPingInfo()); + } + + void setLastPing(const DistLockPingInfo& pd) { + lastPings.setLastPing(_conn, _name, pd); + } + + DistLockPingInfo getLastPing() { + return lastPings.getLastPing(_conn, _name); + } +}; +} diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp index 18cc7d537cf..e375c792f28 100644 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp +++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp @@ -39,187 +39,176 @@ namespace mongo { - using std::string; - using std::unique_ptr; - using stdx::chrono::milliseconds; +using std::string; +using std::unique_ptr; +using stdx::chrono::milliseconds; namespace { - const stdx::chrono::seconds kDefaultSocketTimeout(30); - const milliseconds kDefaultPingInterval(30 * 1000); -} // unnamed namespace - - LegacyDistLockManager::LegacyDistLockManager(ConnectionString configServer): - _configServer(std::move(configServer)), - _isStopped(false), - _pingerEnabled(true) { - } +const stdx::chrono::seconds kDefaultSocketTimeout(30); +const milliseconds kDefaultPingInterval(30 * 1000); +} // unnamed namespace - void LegacyDistLockManager::startUp() { - stdx::lock_guard<stdx::mutex> sl(_mutex); - invariant(!_pinger); - _pinger = stdx::make_unique<LegacyDistLockPinger>(); - } +LegacyDistLockManager::LegacyDistLockManager(ConnectionString configServer) + : _configServer(std::move(configServer)), _isStopped(false), _pingerEnabled(true) {} - void LegacyDistLockManager::shutDown() { - stdx::unique_lock<stdx::mutex> sl(_mutex); - _isStopped = true; +void LegacyDistLockManager::startUp() { + stdx::lock_guard<stdx::mutex> sl(_mutex); + invariant(!_pinger); + _pinger = stdx::make_unique<LegacyDistLockPinger>(); +} - while (!_lockMap.empty()) { - _noLocksCV.wait(sl); - } +void LegacyDistLockManager::shutDown() { + stdx::unique_lock<stdx::mutex> sl(_mutex); + _isStopped = true; - if (_pinger) { - _pinger->shutdown(); - } + while (!_lockMap.empty()) { + _noLocksCV.wait(sl); } - StatusWith<DistLockManager::ScopedDistLock> LegacyDistLockManager::lock( - StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { - - auto distLock = stdx::make_unique<DistributedLock>(_configServer, name.toString()); + if (_pinger) { + _pinger->shutdown(); + } +} - { - stdx::lock_guard<stdx::mutex> sl(_mutex); +StatusWith<DistLockManager::ScopedDistLock> LegacyDistLockManager::lock( + StringData name, StringData whyMessage, milliseconds waitFor, milliseconds lockTryInterval) { + auto distLock = stdx::make_unique<DistributedLock>(_configServer, 
name.toString()); - if (_isStopped) { - return Status(ErrorCodes::LockBusy, "legacy distlock manager is stopped"); - } + { + stdx::lock_guard<stdx::mutex> sl(_mutex); - if (_pingerEnabled) { - auto pingStatus = _pinger->startPing(*(distLock.get()), kDefaultPingInterval); - if (!pingStatus.isOK()) { - return pingStatus; - } - } + if (_isStopped) { + return Status(ErrorCodes::LockBusy, "legacy distlock manager is stopped"); } - auto lastStatus = Status(ErrorCodes::LockBusy, - str::stream() << "timed out waiting for " << name); - - Timer timer; - Timer msgTimer; - while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) { - bool acquired = false; - BSONObj lockDoc; - try { - acquired = distLock->lock_try(whyMessage.toString(), - &lockDoc, - kDefaultSocketTimeout.count()); - - if (!acquired) { - lastStatus = Status(ErrorCodes::LockBusy, - str::stream() << "Lock for " << whyMessage - << " is taken."); - } + if (_pingerEnabled) { + auto pingStatus = _pinger->startPing(*(distLock.get()), kDefaultPingInterval); + if (!pingStatus.isOK()) { + return pingStatus; } - catch (const LockException& lockExcep) { - OID needUnlockID(lockExcep.getMustUnlockID()); - if (needUnlockID.isSet()) { - _pinger->addUnlockOID(needUnlockID); - } + } + } - lastStatus = lockExcep.toStatus(); + auto lastStatus = + Status(ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name); + + Timer timer; + Timer msgTimer; + while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) { + bool acquired = false; + BSONObj lockDoc; + try { + acquired = + distLock->lock_try(whyMessage.toString(), &lockDoc, kDefaultSocketTimeout.count()); + + if (!acquired) { + lastStatus = Status(ErrorCodes::LockBusy, + str::stream() << "Lock for " << whyMessage << " is taken."); } - catch (...) { - lastStatus = exceptionToStatus(); + } catch (const LockException& lockExcep) { + OID needUnlockID(lockExcep.getMustUnlockID()); + if (needUnlockID.isSet()) { + _pinger->addUnlockOID(needUnlockID); } - if (acquired) { - verify(!lockDoc.isEmpty()); + lastStatus = lockExcep.toStatus(); + } catch (...) 
{ + lastStatus = exceptionToStatus(); + } - LocksType lock; - string errMsg; - if (!lock.parseBSON(lockDoc, &errMsg)) { - return StatusWith<ScopedDistLock>( - ErrorCodes::UnsupportedFormat, - str::stream() << "error while parsing lock document: " << errMsg); - } + if (acquired) { + verify(!lockDoc.isEmpty()); - dassert(lock.isLockIDSet()); + LocksType lock; + string errMsg; + if (!lock.parseBSON(lockDoc, &errMsg)) { + return StatusWith<ScopedDistLock>( + ErrorCodes::UnsupportedFormat, + str::stream() << "error while parsing lock document: " << errMsg); + } - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _lockMap.insert(std::make_pair(lock.getLockID(), std::move(distLock))); - } + dassert(lock.isLockIDSet()); - return ScopedDistLock(lock.getLockID(), this); + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _lockMap.insert(std::make_pair(lock.getLockID(), std::move(distLock))); } - if (waitFor == milliseconds::zero()) break; + return ScopedDistLock(lock.getLockID(), this); + } - if (lastStatus != ErrorCodes::LockBusy) { - return lastStatus; - } + if (waitFor == milliseconds::zero()) + break; - // Periodically message for debugging reasons - if (msgTimer.seconds() > 10) { - log() << "waited " << timer.seconds() << "s for distributed lock " << name - << " for " << whyMessage << ": " << lastStatus.toString(); + if (lastStatus != ErrorCodes::LockBusy) { + return lastStatus; + } - msgTimer.reset(); - } + // Periodically message for debugging reasons + if (msgTimer.seconds() > 10) { + log() << "waited " << timer.seconds() << "s for distributed lock " << name << " for " + << whyMessage << ": " << lastStatus.toString(); - milliseconds timeRemaining = - std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis())); - sleepFor(std::min(lockTryInterval, timeRemaining)); + msgTimer.reset(); } - return lastStatus; + milliseconds timeRemaining = + std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis())); + sleepFor(std::min(lockTryInterval, timeRemaining)); } - void LegacyDistLockManager::unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT { - unique_ptr<DistributedLock> distLock; + return lastStatus; +} - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - auto iter = _lockMap.find(lockHandle); - invariant(iter != _lockMap.end()); +void LegacyDistLockManager::unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT { + unique_ptr<DistributedLock> distLock; - distLock = std::move(iter->second); - _lockMap.erase(iter); - } + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + auto iter = _lockMap.find(lockHandle); + invariant(iter != _lockMap.end()); - if (!distLock->unlock(lockHandle)) { - _pinger->addUnlockOID(lockHandle); - } + distLock = std::move(iter->second); + _lockMap.erase(iter); + } - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (_lockMap.empty()) { - _noLocksCV.notify_all(); - } - } + if (!distLock->unlock(lockHandle)) { + _pinger->addUnlockOID(lockHandle); } - Status LegacyDistLockManager::checkStatus(const DistLockHandle& lockHandle) { - // Note: this should not happen when locks are managed through ScopedDistLock. 
- if (_pinger->willUnlockOID(lockHandle)) { - return Status(ErrorCodes::LockFailed, - str::stream() << "lock " << lockHandle << " is not held and " - << "is currently being scheduled for lazy unlock"); + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + if (_lockMap.empty()) { + _noLocksCV.notify_all(); } + } +} - DistributedLock* distLock = nullptr; +Status LegacyDistLockManager::checkStatus(const DistLockHandle& lockHandle) { + // Note: this should not happen when locks are managed through ScopedDistLock. + if (_pinger->willUnlockOID(lockHandle)) { + return Status(ErrorCodes::LockFailed, + str::stream() << "lock " << lockHandle << " is not held and " + << "is currently being scheduled for lazy unlock"); + } - { - // Assumption: lockHandles are never shared across threads. - stdx::lock_guard<stdx::mutex> sl(_mutex); - auto iter = _lockMap.find(lockHandle); - invariant(iter != _lockMap.end()); + DistributedLock* distLock = nullptr; - distLock = iter->second.get(); - } + { + // Assumption: lockHandles are never shared across threads. + stdx::lock_guard<stdx::mutex> sl(_mutex); + auto iter = _lockMap.find(lockHandle); + invariant(iter != _lockMap.end()); - return distLock->checkStatus(kDefaultSocketTimeout.count()); + distLock = iter->second.get(); } - void LegacyDistLockManager::enablePinger(bool enable) { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _pingerEnabled = enable; - } + return distLock->checkStatus(kDefaultSocketTimeout.count()); +} +void LegacyDistLockManager::enablePinger(bool enable) { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _pingerEnabled = enable; +} } diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h index 50a6088c333..238989a3cdb 100644 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h +++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h @@ -41,46 +41,43 @@ namespace mongo { - class DistributedLock; +class DistributedLock; - class LegacyDistLockManager: public DistLockManager { - public: - explicit LegacyDistLockManager(ConnectionString configServer); +class LegacyDistLockManager : public DistLockManager { +public: + explicit LegacyDistLockManager(ConnectionString configServer); - virtual ~LegacyDistLockManager() = default; + virtual ~LegacyDistLockManager() = default; - virtual void startUp() override; - virtual void shutDown() override; + virtual void startUp() override; + virtual void shutDown() override; - virtual StatusWith<DistLockManager::ScopedDistLock> lock( - StringData name, - StringData whyMessage, - stdx::chrono::milliseconds waitFor, - stdx::chrono::milliseconds lockTryInterval) override; + virtual StatusWith<DistLockManager::ScopedDistLock> lock( + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor, + stdx::chrono::milliseconds lockTryInterval) override; - // For testing only. - void enablePinger(bool enable); + // For testing only. 
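+    // (Illustrative note, not part of the original comment: lock() only calls
+    // LegacyDistLockPinger::startPing while the pinger is enabled, so tests can call
+    // enablePinger(false) to keep lock acquisition from spawning ping threads.)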
+ void enablePinger(bool enable); - protected: +protected: + virtual void unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT override; - virtual void unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT override; + virtual Status checkStatus(const DistLockHandle& lockHandle) override; - virtual Status checkStatus(const DistLockHandle& lockHandle) override; +private: + const ConnectionString _configServer; - private: + stdx::mutex _mutex; + stdx::condition_variable _noLocksCV; + std::map<DistLockHandle, std::unique_ptr<DistributedLock>> _lockMap; - const ConnectionString _configServer; + std::unique_ptr<LegacyDistLockPinger> _pinger; - stdx::mutex _mutex; - stdx::condition_variable _noLocksCV; - std::map<DistLockHandle, std::unique_ptr<DistributedLock>> _lockMap; - - std::unique_ptr<LegacyDistLockPinger> _pinger; - - bool _isStopped; - - // For testing only. - bool _pingerEnabled; - }; + bool _isStopped; + // For testing only. + bool _pingerEnabled; +}; } diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp index 5e5f03b0864..ef5299fd558 100644 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp +++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp @@ -42,318 +42,304 @@ namespace mongo { - using std::string; +using std::string; namespace { - string pingThreadId(const ConnectionString& conn, const string& processId) { - return conn.toString() + "/" + processId; - } +string pingThreadId(const ConnectionString& conn, const string& processId) { + return conn.toString() + "/" + processId; +} } - void LegacyDistLockPinger::_distLockPingThread(ConnectionString addr, - const string& process, - Milliseconds sleepTime) { - setThreadName("LockPinger"); - - string pingId = pingThreadId(addr, process); - - LOG(0) << "creating distributed lock ping thread for " << addr - << " and process " << process << " (sleeping for " << sleepTime.count() << "ms)"; - - static int loops = 0; - Date_t lastPingTime = jsTime(); - while (!shouldStopPinging(addr, process)) { +void LegacyDistLockPinger::_distLockPingThread(ConnectionString addr, + const string& process, + Milliseconds sleepTime) { + setThreadName("LockPinger"); - LOG(3) << "distributed lock pinger '" << pingId << "' about to ping."; + string pingId = pingThreadId(addr, process); - Date_t pingTime; + LOG(0) << "creating distributed lock ping thread for " << addr << " and process " << process + << " (sleeping for " << sleepTime.count() << "ms)"; - try { - ScopedDbConnection conn(addr.toString(), 30.0); + static int loops = 0; + Date_t lastPingTime = jsTime(); + while (!shouldStopPinging(addr, process)) { + LOG(3) << "distributed lock pinger '" << pingId << "' about to ping."; - pingTime = jsTime(); - const auto elapsed = pingTime - lastPingTime; - if (elapsed > 10 * sleepTime) { - warning() << "Lock pinger for addr: " << addr - << ", proc: " << process - << " was inactive for " << elapsed; - } + Date_t pingTime; - lastPingTime = pingTime; + try { + ScopedDbConnection conn(addr.toString(), 30.0); - // Refresh the entry corresponding to this process in the lockpings collection. 
- conn->update(LockpingsType::ConfigNS, - BSON(LockpingsType::process(process)), - BSON("$set" << BSON(LockpingsType::ping(pingTime))), - true); + pingTime = jsTime(); + const auto elapsed = pingTime - lastPingTime; + if (elapsed > 10 * sleepTime) { + warning() << "Lock pinger for addr: " << addr << ", proc: " << process + << " was inactive for " << elapsed; + } - string err = conn->getLastError(); - if (!err.empty()) { - warning() << "pinging failed for distributed lock pinger '" << pingId << "'." - << causedBy(err); - conn.done(); + lastPingTime = pingTime; - if (!shouldStopPinging(addr, process)) { - waitTillNextPingTime(sleepTime); - } - continue; - } + // Refresh the entry corresponding to this process in the lockpings collection. + conn->update(LockpingsType::ConfigNS, + BSON(LockpingsType::process(process)), + BSON("$set" << BSON(LockpingsType::ping(pingTime))), + true); - // Remove really old entries from the lockpings collection if they're not - // holding a lock. This may happen if an instance of a process was taken down - // and no new instance came up to replace it for a quite a while. - // NOTE this is NOT the same as the standard take-over mechanism, which forces - // the lock entry. - BSONObj fieldsToReturn = BSON(LocksType::state() << 1 - << LocksType::process() << 1); - auto activeLocks = - conn->query(LocksType::ConfigNS, - BSON(LocksType::state() << NE << LocksType::UNLOCKED)); - - uassert(16060, - str::stream() << "cannot query locks collection on config server " - << conn.getHost(), - activeLocks.get()); - - std::set<string> pids; - while (activeLocks->more()) { - BSONObj lock = activeLocks->nextSafe(); - - if (!lock[LocksType::process()].eoo()) { - pids.insert(lock[LocksType::process()].str()); - } - else { - warning() << "found incorrect lock document during lock ping cleanup: " - << lock.toString(); - } - } + string err = conn->getLastError(); + if (!err.empty()) { + warning() << "pinging failed for distributed lock pinger '" << pingId << "'." + << causedBy(err); + conn.done(); - // This can potentially delete ping entries that are actually active (if the clock - // of another pinger is too skewed). This is still fine as the lock logic only - // checks if there is a change in the ping document and the document going away - // is a valid change. - Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24}; - conn->remove(LockpingsType::ConfigNS, - BSON(LockpingsType::process() << NIN << pids - << LockpingsType::ping() << LT << fourDays)); - err = conn->getLastError(); - - if (!err.empty()) { - warning() << "ping cleanup for distributed lock pinger '" << pingId - << " failed." << causedBy(err); - conn.done(); - - if (!shouldStopPinging(addr, process)) { - waitTillNextPingTime(sleepTime); - } - continue; + if (!shouldStopPinging(addr, process)) { + waitTillNextPingTime(sleepTime); } + continue; + } - LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr - << " pinged successfully at " << pingTime - << " by distributed lock pinger '" << pingId - << "', sleeping for " << sleepTime.count() << "ms"; - - // Remove old locks, if possible - // Make sure no one else is adding to this list at the same time - stdx::lock_guard<stdx::mutex> lk(_mutex); - - int numOldLocks = _unlockList.size(); - if (numOldLocks > 0) { - LOG(0) << "trying to delete " << _unlockList.size() - << " old lock entries for process " << process; + // Remove really old entries from the lockpings collection if they're not + // holding a lock. 
This may happen if an instance of a process was taken down
+            // and no new instance came up to replace it for quite a while.
+            // NOTE this is NOT the same as the standard take-over mechanism, which forces
+            // the lock entry.
+            BSONObj fieldsToReturn = BSON(LocksType::state() << 1 << LocksType::process() << 1);
+            auto activeLocks = conn->query(LocksType::ConfigNS,
+                                           BSON(LocksType::state() << NE << LocksType::UNLOCKED));
+
+            uassert(16060,
+                    str::stream() << "cannot query locks collection on config server "
+                                  << conn.getHost(),
+                    activeLocks.get());
+
+            std::set<string> pids;
+            while (activeLocks->more()) {
+                BSONObj lock = activeLocks->nextSafe();
+
+                if (!lock[LocksType::process()].eoo()) {
+                    pids.insert(lock[LocksType::process()].str());
+                } else {
+                    warning() << "found incorrect lock document during lock ping cleanup: "
+                              << lock.toString();
+                }
+            }

-                // This can potentially delete ping entries that are actually active (if the clock
-                // of another pinger is too skewed). This is still fine as the lock logic only
-                // checks if there is a change in the ping document and the document going away
-                // is a valid change.
-                Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24};
-                conn->remove(LockpingsType::ConfigNS,
-                             BSON(LockpingsType::process() << NIN << pids
-                                  << LockpingsType::ping() << LT << fourDays));
-                err = conn->getLastError();
-
-                if (!err.empty()) {
-                    warning() << "ping cleanup for distributed lock pinger '" << pingId
-                              << " failed." << causedBy(err);
-                    conn.done();
-
-                    if (!shouldStopPinging(addr, process)) {
-                        waitTillNextPingTime(sleepTime);
-                    }
-                    continue;
+            // This can potentially delete ping entries that are actually active (if the clock
+            // of another pinger is too skewed). This is still fine as the lock logic only
+            // checks if there is a change in the ping document and the document going away
+            // is a valid change.
+            Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24};
+            conn->remove(LockpingsType::ConfigNS,
+                         BSON(LockpingsType::process() << NIN << pids << LockpingsType::ping() << LT
+                                                       << fourDays));
+            err = conn->getLastError();
+
+            if (!err.empty()) {
+                warning() << "ping cleanup for distributed lock pinger '" << pingId << "' failed."
+                          << causedBy(err);
+                conn.done();
+                if (!shouldStopPinging(addr, process)) {
+                    waitTillNextPingTime(sleepTime);
                }
+                continue;
+            }

-                LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr
-                                                   << " pinged successfully at " << pingTime
-                                                   << " by distributed lock pinger '" << pingId
-                                                   << "', sleeping for " << sleepTime.count() << "ms";
-
-                // Remove old locks, if possible
-                // Make sure no one else is adding to this list at the same time
-                stdx::lock_guard<stdx::mutex> lk(_mutex);
-
-                int numOldLocks = _unlockList.size();
-                if (numOldLocks > 0) {
-                    LOG(0) << "trying to delete " << _unlockList.size()
-                           << " old lock entries for process " << process;
+            LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr << " pinged successfully at "
+                                               << pingTime << " by distributed lock pinger '"
+                                               << pingId << "', sleeping for " << sleepTime.count()
+                                               << "ms";

+            // Remove old locks, if possible
+            // Make sure no one else is adding to this list at the same time
+            stdx::lock_guard<stdx::mutex> lk(_mutex);
+            int numOldLocks = _unlockList.size();
+            if (numOldLocks > 0) {
+                LOG(0) << "trying to delete " << _unlockList.size()
+                       << " old lock entries for process " << process;
            }

-                bool removed = false;
-                for (auto iter = _unlockList.begin(); iter != _unlockList.end();
-                     iter = (removed ?
_unlockList.erase(iter) : ++iter)) { + removed = false; + try { + // Got DistLockHandle from lock, so we don't need to specify _id again + conn->update(LocksType::ConfigNS, + BSON(LocksType::lockID(*iter)), + BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); + + // Either the update went through or it didn't, + // either way we're done trying to unlock. + LOG(0) << "handled late remove of old distributed lock with ts " << *iter; + removed = true; + } catch (UpdateNotTheSame&) { + LOG(0) << "partially removed old distributed lock with ts " << *iter; + removed = true; + } catch (std::exception& e) { + warning() << "could not remove old distributed lock with ts " << *iter + << causedBy(e); + } } - if (!shouldStopPinging(addr, process)) { - waitTillNextPingTime(sleepTime); + if (numOldLocks > 0 && _unlockList.size() > 0) { + LOG(0) << "not all old lock entries could be removed for process " << process; } - } - warning() << "removing distributed lock ping thread '" << pingId << "'"; + conn.done(); - if (shouldStopPinging(addr, process)) { - acknowledgeStopPing(addr, process); + } catch (std::exception& e) { + warning() << "distributed lock pinger '" << pingId + << "' detected an exception while pinging." << causedBy(e); } - } - void LegacyDistLockPinger::distLockPingThread(ConnectionString addr, - long long clockSkew, - const std::string& processId, - stdx::chrono::milliseconds sleepTime) { - try { - jsTimeVirtualThreadSkew(clockSkew); - _distLockPingThread(addr, processId, sleepTime); - } - catch (std::exception& e) { - error() << "unexpected error while running distributed lock pinger for " << addr - << ", process " << processId << causedBy(e); - } - catch (...) { - error() << "unknown error while running distributed lock pinger for " << addr - << ", process " << processId; + if (!shouldStopPinging(addr, process)) { + waitTillNextPingTime(sleepTime); } } - Status LegacyDistLockPinger::startPing(const DistributedLock& lock, - stdx::chrono::milliseconds sleepTime) { - const ConnectionString& conn = lock.getRemoteConnection(); - const string& processId = lock.getProcessId(); - string pingID = pingThreadId(conn, processId); + warning() << "removing distributed lock ping thread '" << pingId << "'"; - { - // Make sure we don't start multiple threads for a process id. - stdx::lock_guard<stdx::mutex> lk(_mutex); + if (shouldStopPinging(addr, process)) { + acknowledgeStopPing(addr, process); + } +} - if (_inShutdown) { - return Status(ErrorCodes::ShutdownInProgress, - "shutting down, will not start ping"); - } +void LegacyDistLockPinger::distLockPingThread(ConnectionString addr, + long long clockSkew, + const std::string& processId, + stdx::chrono::milliseconds sleepTime) { + try { + jsTimeVirtualThreadSkew(clockSkew); + _distLockPingThread(addr, processId, sleepTime); + } catch (std::exception& e) { + error() << "unexpected error while running distributed lock pinger for " << addr + << ", process " << processId << causedBy(e); + } catch (...) { + error() << "unknown error while running distributed lock pinger for " << addr + << ", process " << processId; + } +} - // Ignore if we already have a pinging thread for this process. 
- if (_seen.count(pingID) > 0) { - return Status::OK(); - } +Status LegacyDistLockPinger::startPing(const DistributedLock& lock, + stdx::chrono::milliseconds sleepTime) { + const ConnectionString& conn = lock.getRemoteConnection(); + const string& processId = lock.getProcessId(); + string pingID = pingThreadId(conn, processId); - // Check the config server clock skew. - if (lock.isRemoteTimeSkewed()) { - return Status(ErrorCodes::DistributedClockSkewed, - str::stream() << "clock skew of the cluster " << conn.toString() - << " is too far out of bounds " - << "to allow distributed locking."); - } + { + // Make sure we don't start multiple threads for a process id. + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_inShutdown) { + return Status(ErrorCodes::ShutdownInProgress, "shutting down, will not start ping"); } - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - stdx::thread thread(stdx::bind(&LegacyDistLockPinger::distLockPingThread, - this, - conn, - getJSTimeVirtualThreadSkew(), - processId, - sleepTime)); - _pingThreads.insert(std::make_pair(pingID, std::move(thread))); - - _seen.insert(pingID); + // Ignore if we already have a pinging thread for this process. + if (_seen.count(pingID) > 0) { + return Status::OK(); } - return Status::OK(); + // Check the config server clock skew. + if (lock.isRemoteTimeSkewed()) { + return Status(ErrorCodes::DistributedClockSkewed, + str::stream() << "clock skew of the cluster " << conn.toString() + << " is too far out of bounds " + << "to allow distributed locking."); + } } - void LegacyDistLockPinger::addUnlockOID(const DistLockHandle& lockID) { - // Modifying the lock from some other thread + { stdx::lock_guard<stdx::mutex> lk(_mutex); - _unlockList.push_back(lockID); + stdx::thread thread(stdx::bind(&LegacyDistLockPinger::distLockPingThread, + this, + conn, + getJSTimeVirtualThreadSkew(), + processId, + sleepTime)); + _pingThreads.insert(std::make_pair(pingID, std::move(thread))); + + _seen.insert(pingID); } - bool LegacyDistLockPinger::willUnlockOID(const DistLockHandle& lockID) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return find(_unlockList.begin(), _unlockList.end(), lockID) != _unlockList.end(); - } + return Status::OK(); +} - void LegacyDistLockPinger::stopPing(const ConnectionString& conn, const string& processId) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void LegacyDistLockPinger::addUnlockOID(const DistLockHandle& lockID) { + // Modifying the lock from some other thread + stdx::lock_guard<stdx::mutex> lk(_mutex); + _unlockList.push_back(lockID); +} - string pingId = pingThreadId(conn, processId); +bool LegacyDistLockPinger::willUnlockOID(const DistLockHandle& lockID) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return find(_unlockList.begin(), _unlockList.end(), lockID) != _unlockList.end(); +} - verify(_seen.count(pingId) > 0); - _kill.insert(pingId); - _pingStoppedCV.notify_all(); - } - } +void LegacyDistLockPinger::stopPing(const ConnectionString& conn, const string& processId) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); - void LegacyDistLockPinger::shutdown() { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = true; - _pingStoppedCV.notify_all(); - } + string pingId = pingThreadId(conn, processId); - // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read - // _pingThreads since it cannot be modified once _shutdown is true. 
- for (auto& idToThread : _pingThreads) { - if (idToThread.second.joinable()) { - idToThread.second.join(); - } - } + verify(_seen.count(pingId) > 0); + _kill.insert(pingId); + _pingStoppedCV.notify_all(); } +} - bool LegacyDistLockPinger::shouldStopPinging(const ConnectionString& conn, - const string& processId) { - if (inShutdown()) { - return true; - } - +void LegacyDistLockPinger::shutdown() { + { stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + _pingStoppedCV.notify_all(); + } - if (_inShutdown) { - return true; + // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read + // _pingThreads since it cannot be modified once _shutdown is true. + for (auto& idToThread : _pingThreads) { + if (idToThread.second.joinable()) { + idToThread.second.join(); } + } +} - return _kill.count(pingThreadId(conn, processId)) > 0; +bool LegacyDistLockPinger::shouldStopPinging(const ConnectionString& conn, + const string& processId) { + if (inShutdown()) { + return true; } - void LegacyDistLockPinger::acknowledgeStopPing(const ConnectionString& addr, - const string& processId) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); - string pingId = pingThreadId(addr, processId); + if (_inShutdown) { + return true; + } - _kill.erase(pingId); - _seen.erase(pingId); - } + return _kill.count(pingThreadId(conn, processId)) > 0; +} - try { - ScopedDbConnection conn(addr.toString(), 30.0); - conn->remove(LockpingsType::ConfigNS, BSON(LockpingsType::process(processId))); - } - catch (const DBException& ex) { - warning() << "Error encountered while stopping ping on " << processId - << causedBy(ex); - } +void LegacyDistLockPinger::acknowledgeStopPing(const ConnectionString& addr, + const string& processId) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + string pingId = pingThreadId(addr, processId); + + _kill.erase(pingId); + _seen.erase(pingId); } - void LegacyDistLockPinger::waitTillNextPingTime(stdx::chrono::milliseconds duration) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _pingStoppedCV.wait_for(lk, duration); + try { + ScopedDbConnection conn(addr.toString(), 30.0); + conn->remove(LockpingsType::ConfigNS, BSON(LockpingsType::process(processId))); + } catch (const DBException& ex) { + warning() << "Error encountered while stopping ping on " << processId << causedBy(ex); } } + +void LegacyDistLockPinger::waitTillNextPingTime(stdx::chrono::milliseconds duration) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _pingStoppedCV.wait_for(lk, duration); +} +} diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h index 777cb39314b..41874a59d13 100644 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h +++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h @@ -43,98 +43,97 @@ namespace mongo { - class DistributedLock; - - class LegacyDistLockPinger { - public: - LegacyDistLockPinger() = default; - - /** - * Starts pinging the process id for the given lock. - */ - Status startPing(const DistributedLock& lock, Milliseconds sleepTime); - - /** - * Adds a distributed lock that has the given id to the unlock list. The unlock list - * contains the list of locks that this pinger will repeatedly attempt to unlock until - * it succeeds. - */ - void addUnlockOID(const DistLockHandle& lockID); - - /** - * Returns true if the given lock id is currently in the unlock queue. 
-         */
-        bool willUnlockOID(const DistLockHandle& lockID);
-
-        /**
-         * For testing only: non-blocking call to stop pinging the given process id.
-         */
-        void stopPing(const ConnectionString& conn, const std::string& processId);
-
-        /**
-         * Kills all ping threads and wait for them to cleanup.
-         */
-        void shutdown();
-
-    private:
-        /**
-         * Helper method for calling _distLockPingThread.
-         */
-        void distLockPingThread(ConnectionString addr,
-                                long long clockSkew,
-                                const std::string& processId,
-                                Milliseconds sleepTime);
-
-        /**
-         * Function for repeatedly pinging the process id. Also attempts to unlock all the
-         * locks in the unlock list.
-         */
-        void _distLockPingThread(ConnectionString addr,
-                                 const std::string& process,
-                                 Milliseconds sleepTime);
-
-        /**
-         * Returns true if a request has been made to stop pinging the give process id.
-         */
-        bool shouldStopPinging(const ConnectionString& conn, const std::string& processId);
-
-        /**
-         * Acknowledge the stop ping request and performs the necessary cleanup.
-         */
-        void acknowledgeStopPing(const ConnectionString& conn, const std::string& processId);
-
-        /**
-         * Blocks until duration has elapsed or if the ping thread is interrupted.
-         */
-        void waitTillNextPingTime(Milliseconds duration);
-
-        //
-        // All member variables are labeled with one of the following codes indicating the
-        // synchronization rules for accessing them.
-        //
-        // (M) Must hold _mutex for access.
-
-        stdx::mutex _mutex;
-
-        // Triggered everytime a pinger thread is stopped.
-        stdx::condition_variable _pingStoppedCV; // (M)
-
-        // pingID -> thread
-        // This can contain multiple elements in tests, but in tne normal case, this will
-        // contain only a single element.
-        // Note: can be safely read when _inShutdown is true.
-        std::map<std::string, stdx::thread> _pingThreads; // (M*)
-
-        // Contains the list of process id to stopPing.
-        std::set<std::string> _kill; // (M)
-
-        // Contains all of the process id to ping.
-        std::set<std::string> _seen; // (M)
-
-        // Contains all lock ids to keeping on retrying to unlock until success.
-        std::list<DistLockHandle> _unlockList; // (M)
-
-        bool _inShutdown = false; // (M)
-    };
-
+class DistributedLock;
+
+class LegacyDistLockPinger {
+public:
+    LegacyDistLockPinger() = default;
+
+    /**
+     * Starts pinging the process id for the given lock.
+     */
+    Status startPing(const DistributedLock& lock, Milliseconds sleepTime);
+
+    /**
+     * Adds a distributed lock that has the given id to the unlock list. The unlock list
+     * contains the list of locks that this pinger will repeatedly attempt to unlock until
+     * it succeeds.
+     */
+    void addUnlockOID(const DistLockHandle& lockID);
+
+    /**
+     * Returns true if the given lock id is currently in the unlock queue.
+     */
+    bool willUnlockOID(const DistLockHandle& lockID);
+
+    /**
+     * For testing only: non-blocking call to stop pinging the given process id.
+     */
+    void stopPing(const ConnectionString& conn, const std::string& processId);
+
+    /**
+     * Kills all ping threads and waits for them to clean up.
+     */
+    void shutdown();
+
+private:
+    /**
+     * Helper method for calling _distLockPingThread.
+     */
+    void distLockPingThread(ConnectionString addr,
+                            long long clockSkew,
+                            const std::string& processId,
+                            Milliseconds sleepTime);
+
+    /**
+     * Function for repeatedly pinging the process id. Also attempts to unlock all the
+     * locks in the unlock list.
+     */
+    void _distLockPingThread(ConnectionString addr,
+                             const std::string& process,
+                             Milliseconds sleepTime);
+
+    /**
+     * Returns true if a request has been made to stop pinging the given process id.
+     */
+    bool shouldStopPinging(const ConnectionString& conn, const std::string& processId);
+
+    /**
+     * Acknowledges the stop ping request and performs the necessary cleanup.
+     */
+    void acknowledgeStopPing(const ConnectionString& conn, const std::string& processId);
+
+    /**
+     * Blocks until duration has elapsed or the ping thread is interrupted.
+     */
+    void waitTillNextPingTime(Milliseconds duration);
+
+    //
+    // All member variables are labeled with one of the following codes indicating the
+    // synchronization rules for accessing them.
+    //
+    // (M) Must hold _mutex for access.
+
+    stdx::mutex _mutex;
+
+    // Triggered every time a pinger thread is stopped.
+    stdx::condition_variable _pingStoppedCV;  // (M)
+
+    // pingID -> thread
+    // This can contain multiple elements in tests, but in the normal case, this will
+    // contain only a single element.
+    // Note: can be safely read when _inShutdown is true.
+    std::map<std::string, stdx::thread> _pingThreads;  // (M*)
+
+    // Contains the list of process ids passed to stopPing.
+    std::set<std::string> _kill;  // (M)
+
+    // Contains all of the process ids to ping.
+    std::set<std::string> _seen;  // (M)
+
+    // Contains all lock ids to keep retrying to unlock until success.
+    std::list<DistLockHandle> _unlockList;  // (M)
+
+    bool _inShutdown = false;  // (M)
+};
}
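
A minimal usage sketch of the DistLockManager interface shown above (hypothetical caller
code, not part of this changeset; the lock name, why-message, and timing values are
placeholders, and error handling is trimmed):

    // Try for up to 30 seconds, retrying every 500ms, to take the "balancer" lock.
    auto scopedDistLock = distLockManager->lock("balancer",
                                                "doing balance round",
                                                stdx::chrono::milliseconds(30 * 1000),
                                                stdx::chrono::milliseconds(500));
    if (!scopedDistLock.isOK()) {
        return scopedDistLock.getStatus();  // e.g. LockBusy if another process holds it
    }
    // Work under the lock; when the returned ScopedDistLock goes out of scope the
    // manager's unlock path runs, and a failed config write queues the lock with the
    // pinger for lazy unlock.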