Diffstat (limited to 'src/mongo/s/catalog/legacy')
-rw-r--r--  src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp    2507
-rw-r--r--  src/mongo/s/catalog/legacy/catalog_manager_legacy.h        245
-rw-r--r--  src/mongo/s/catalog/legacy/cluster_client_internal.cpp     293
-rw-r--r--  src/mongo/s/catalog/legacy/cluster_client_internal.h        43
-rw-r--r--  src/mongo/s/catalog/legacy/config_coordinator.cpp          637
-rw-r--r--  src/mongo/s/catalog/legacy/config_coordinator.h             33
-rw-r--r--  src/mongo/s/catalog/legacy/config_upgrade.cpp              789
-rw-r--r--  src/mongo/s/catalog/legacy/config_upgrade.h                200
-rw-r--r--  src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp      127
-rw-r--r--  src/mongo/s/catalog/legacy/config_upgrade_helpers.h         52
-rw-r--r--  src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp     104
-rw-r--r--  src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp     131
-rw-r--r--  src/mongo/s/catalog/legacy/distlock.cpp                   1262
-rw-r--r--  src/mongo/s/catalog/legacy/distlock.h                      353
-rw-r--r--  src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp    261
-rw-r--r--  src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h       57
-rw-r--r--  src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp     482
-rw-r--r--  src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h       187
18 files changed, 3775 insertions, 3988 deletions
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index ab274c579fb..c25618e31e8 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -80,1657 +80,1570 @@
namespace mongo {
- using std::map;
- using std::pair;
- using std::set;
- using std::string;
- using std::vector;
- using str::stream;
+using std::map;
+using std::pair;
+using std::set;
+using std::string;
+using std::vector;
+using str::stream;
namespace {
- bool validConfigWC(const BSONObj& writeConcern) {
- BSONElement elem(writeConcern["w"]);
- if (elem.eoo()) {
- return true;
- }
+bool validConfigWC(const BSONObj& writeConcern) {
+ BSONElement elem(writeConcern["w"]);
+ if (elem.eoo()) {
+ return true;
+ }
- if (elem.isNumber() && elem.numberInt() <= 1) {
- return true;
- }
+ if (elem.isNumber() && elem.numberInt() <= 1) {
+ return true;
+ }
- if (elem.type() == String && elem.str() == "majority") {
- return true;
- }
+ if (elem.type() == String && elem.str() == "majority") {
+ return true;
+ }
- return false;
+ return false;
+}
+
+void toBatchError(const Status& status, BatchedCommandResponse* response) {
+ response->clear();
+ response->setErrCode(status.code());
+ response->setErrMessage(status.reason());
+ response->setOk(false);
+
+ dassert(response->isValid(NULL));
+}
+
+StatusWith<string> isValidShard(const string& name,
+ const ConnectionString& shardConnectionString,
+ ScopedDbConnection& conn) {
+ if (conn->type() == ConnectionString::SYNC) {
+ return Status(ErrorCodes::BadValue,
+ "can't use sync cluster as a shard; for a replica set, "
+ "you have to use <setname>/<server1>,<server2>,...");
}
- void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setErrCode(status.code());
- response->setErrMessage(status.reason());
- response->setOk(false);
+ BSONObj resIsMongos;
+ // (ok == 0) implies that it is a mongos
+ if (conn->runCommand("admin", BSON("isdbgrid" << 1), resIsMongos)) {
+ return Status(ErrorCodes::BadValue, "can't add a mongos process as a shard");
+ }
- dassert(response->isValid(NULL));
+ BSONObj resIsMaster;
+ if (!conn->runCommand("admin", BSON("isMaster" << 1), resIsMaster)) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "failed running isMaster: " << resIsMaster);
}
- StatusWith<string> isValidShard(const string& name,
- const ConnectionString& shardConnectionString,
- ScopedDbConnection& conn) {
- if (conn->type() == ConnectionString::SYNC) {
- return Status(ErrorCodes::BadValue,
- "can't use sync cluster as a shard; for a replica set, "
- "you have to use <setname>/<server1>,<server2>,...");
- }
+ // if the shard has only one host, make sure it is not part of a replica set
+ string setName = resIsMaster["setName"].str();
+ string commandSetName = shardConnectionString.getSetName();
+ if (commandSetName.empty() && !setName.empty()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "host is part of set " << setName << "; "
+ << "use replica set url format "
+ << "<setname>/<server1>,<server2>, ...");
+ }
- BSONObj resIsMongos;
- // (ok == 0) implies that it is a mongos
- if (conn->runCommand("admin", BSON("isdbgrid" << 1), resIsMongos)) {
- return Status(ErrorCodes::BadValue,
- "can't add a mongos process as a shard");
- }
+ if (!commandSetName.empty() && setName.empty()) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "host did not return a set name; "
+ << "is the replica set still initializing? " << resIsMaster);
+ }
- BSONObj resIsMaster;
- if (!conn->runCommand("admin", BSON("isMaster" << 1), resIsMaster)) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "failed running isMaster: " << resIsMaster);
- }
+ // if the shard is part of replica set, make sure it is the right one
+ if (!commandSetName.empty() && (commandSetName != setName)) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "host is part of a different set: " << setName);
+ }
- // if the shard has only one host, make sure it is not part of a replica set
- string setName = resIsMaster["setName"].str();
- string commandSetName = shardConnectionString.getSetName();
- if (commandSetName.empty() && !setName.empty()) {
+ if (setName.empty()) {
+ // check this isn't a --configsvr
+ BSONObj res;
+ bool ok = conn->runCommand("admin", BSON("replSetGetStatus" << 1), res);
+ if (!ok && res["info"].type() == String && res["info"].String() == "configsvr") {
return Status(ErrorCodes::BadValue,
- str::stream() << "host is part of set " << setName << "; "
- << "use replica set url format "
- << "<setname>/<server1>,<server2>, ...");
- }
-
- if (!commandSetName.empty() && setName.empty()) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "host did not return a set name; "
- << "is the replica set still initializing? "
- << resIsMaster);
+ "the specified mongod is a --configsvr and "
+ "should thus not be a shard server");
}
+ }
- // if the shard is part of replica set, make sure it is the right one
- if (!commandSetName.empty() && (commandSetName != setName)) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "host is part of a different set: " << setName);
+ // if the shard is part of a replica set,
+ // make sure all the hosts mentioned in 'shardConnectionString' are part of
+ // the set. It is fine if not all members of the set are present in 'shardConnectionString'.
+ bool foundAll = true;
+ string offendingHost;
+ if (!commandSetName.empty()) {
+ set<string> hostSet;
+ BSONObjIterator iter(resIsMaster["hosts"].Obj());
+ while (iter.more()) {
+ hostSet.insert(iter.next().String()); // host:port
+ }
+ if (resIsMaster["passives"].isABSONObj()) {
+ BSONObjIterator piter(resIsMaster["passives"].Obj());
+ while (piter.more()) {
+ hostSet.insert(piter.next().String()); // host:port
+ }
}
-
- if (setName.empty()) {
- // check this isn't a --configsvr
- BSONObj res;
- bool ok = conn->runCommand("admin",
- BSON("replSetGetStatus" << 1),
- res);
- if(!ok && res["info"].type() == String && res["info"].String() == "configsvr") {
- return Status(ErrorCodes::BadValue,
- "the specified mongod is a --configsvr and "
- "should thus not be a shard server");
+ if (resIsMaster["arbiters"].isABSONObj()) {
+ BSONObjIterator piter(resIsMaster["arbiters"].Obj());
+ while (piter.more()) {
+ hostSet.insert(piter.next().String()); // host:port
}
}
- // if the shard is part of a replica set,
- // make sure all the hosts mentioned in 'shardConnectionString' are part of
- // the set. It is fine if not all members of the set are present in 'shardConnectionString'.
- bool foundAll = true;
- string offendingHost;
- if (!commandSetName.empty()) {
- set<string> hostSet;
- BSONObjIterator iter(resIsMaster["hosts"].Obj());
- while (iter.more()) {
- hostSet.insert(iter.next().String()); // host:port
- }
- if (resIsMaster["passives"].isABSONObj()) {
- BSONObjIterator piter(resIsMaster["passives"].Obj());
- while (piter.more()) {
- hostSet.insert(piter.next().String()); // host:port
- }
- }
- if (resIsMaster["arbiters"].isABSONObj()) {
- BSONObjIterator piter(resIsMaster["arbiters"].Obj());
- while (piter.more()) {
- hostSet.insert(piter.next().String()); // host:port
- }
+ vector<HostAndPort> hosts = shardConnectionString.getServers();
+ for (size_t i = 0; i < hosts.size(); i++) {
+ if (!hosts[i].hasPort()) {
+ hosts[i] = HostAndPort(hosts[i].host(), hosts[i].port());
}
-
- vector<HostAndPort> hosts = shardConnectionString.getServers();
- for (size_t i = 0; i < hosts.size(); i++) {
- if (!hosts[i].hasPort()) {
- hosts[i] = HostAndPort(hosts[i].host(), hosts[i].port());
- }
- string host = hosts[i].toString(); // host:port
- if (hostSet.find(host) == hostSet.end()) {
- offendingHost = host;
- foundAll = false;
- break;
- }
+ string host = hosts[i].toString(); // host:port
+ if (hostSet.find(host) == hostSet.end()) {
+ offendingHost = host;
+ foundAll = false;
+ break;
}
}
- if (!foundAll) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "in seed list " << shardConnectionString.toString()
- << ", host " << offendingHost
- << " does not belong to replica set " << setName);
- }
-
- string shardName(name);
- // shard name defaults to the name of the replica set
- if (name.empty() && !setName.empty()) {
- shardName = setName;
- }
-
- // disallow adding shard replica set with name 'config'
- if (shardName == "config") {
- return Status(ErrorCodes::BadValue,
- "use of shard replica set with name 'config' is not allowed");
- }
+ }
+ if (!foundAll) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "in seed list " << shardConnectionString.toString()
+ << ", host " << offendingHost
+ << " does not belong to replica set " << setName);
+ }
- return shardName;
+ string shardName(name);
+ // shard name defaults to the name of the replica set
+ if (name.empty() && !setName.empty()) {
+ shardName = setName;
}
- // In order to be accepted as a new shard, that mongod must not have
- // any database name that exists already in any other shards.
- // If that test passes, the new shard's databases are going to be entered as
- // non-sharded db's whose primary is the newly added shard.
- StatusWith<vector<string>> getDBNames(const ConnectionString& shardConnectionString,
- ScopedDbConnection& conn) {
- vector<string> dbNames;
+ // disallow adding shard replica set with name 'config'
+ if (shardName == "config") {
+ return Status(ErrorCodes::BadValue,
+ "use of shard replica set with name 'config' is not allowed");
+ }
- BSONObj resListDB;
- if (!conn->runCommand("admin", BSON("listDatabases" << 1), resListDB)) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "failed listing "
- << shardConnectionString.toString()
- << "'s databases:" << resListDB);
- }
+ return shardName;
+}
- BSONObjIterator i(resListDB["databases"].Obj());
- while (i.more()) {
- BSONObj dbEntry = i.next().Obj();
- const string& dbName = dbEntry["name"].String();
- if (!(dbName == "local" || dbName == "admin" || dbName == "config")) {
- dbNames.push_back(dbName);
- }
- }
+// In order to be accepted as a new shard, that mongod must not have
+// any database name that exists already in any other shards.
+// If that test passes, the new shard's databases are going to be entered as
+// non-sharded db's whose primary is the newly added shard.
+StatusWith<vector<string>> getDBNames(const ConnectionString& shardConnectionString,
+ ScopedDbConnection& conn) {
+ vector<string> dbNames;
- return dbNames;
+ BSONObj resListDB;
+ if (!conn->runCommand("admin", BSON("listDatabases" << 1), resListDB)) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "failed listing " << shardConnectionString.toString()
+ << "'s databases:" << resListDB);
}
- BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) {
- BSONObjBuilder details;
- details.append("shard", shardName);
- details.append("isDraining", isDraining);
-
- return details.obj();
+ BSONObjIterator i(resListDB["databases"].Obj());
+ while (i.more()) {
+ BSONObj dbEntry = i.next().Obj();
+ const string& dbName = dbEntry["name"].String();
+ if (!(dbName == "local" || dbName == "admin" || dbName == "config")) {
+ dbNames.push_back(dbName);
+ }
}
- // Whether the logChange call should attempt to create the changelog collection
- AtomicInt32 changeLogCollectionCreated(0);
+ return dbNames;
+}
- // Whether the logAction call should attempt to create the actionlog collection
- AtomicInt32 actionLogCollectionCreated(0);
+BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) {
+ BSONObjBuilder details;
+ details.append("shard", shardName);
+ details.append("isDraining", isDraining);
-} // namespace
+ return details.obj();
+}
+// Whether the logChange call should attempt to create the changelog collection
+AtomicInt32 changeLogCollectionCreated(0);
- CatalogManagerLegacy::CatalogManagerLegacy() = default;
+// Whether the logAction call should attempt to create the actionlog collection
+AtomicInt32 actionLogCollectionCreated(0);
- CatalogManagerLegacy::~CatalogManagerLegacy() = default;
+} // namespace
- Status CatalogManagerLegacy::init(const ConnectionString& configDBCS) {
- // Initialization should not happen more than once
- invariant(!_configServerConnectionString.isValid());
- invariant(_configServers.empty());
- invariant(configDBCS.isValid());
-
- // Extract the hosts in HOST:PORT format
- set<HostAndPort> configHostsAndPortsSet;
- set<string> configHostsOnly;
- std::vector<HostAndPort> configHostAndPorts = configDBCS.getServers();
- for (size_t i = 0; i < configHostAndPorts.size(); i++) {
- // Append the default port, if not specified
- HostAndPort configHost = configHostAndPorts[i];
- if (!configHost.hasPort()) {
- configHost = HostAndPort(configHost.host(), ServerGlobalParams::ConfigServerPort);
- }
-
- // Make sure there are no duplicates
- if (!configHostsAndPortsSet.insert(configHost).second) {
- StringBuilder sb;
- sb << "Host " << configHost.toString()
- << " exists twice in the config servers listing.";
- return Status(ErrorCodes::InvalidOptions, sb.str());
- }
+CatalogManagerLegacy::CatalogManagerLegacy() = default;
- configHostsOnly.insert(configHost.host());
- }
+CatalogManagerLegacy::~CatalogManagerLegacy() = default;
- // Make sure the hosts are reachable
- for (set<string>::const_iterator i = configHostsOnly.begin();
- i != configHostsOnly.end();
- i++) {
+Status CatalogManagerLegacy::init(const ConnectionString& configDBCS) {
+ // Initialization should not happen more than once
+ invariant(!_configServerConnectionString.isValid());
+ invariant(_configServers.empty());
+ invariant(configDBCS.isValid());
- const string host = *i;
+ // Extract the hosts in HOST:PORT format
+ set<HostAndPort> configHostsAndPortsSet;
+ set<string> configHostsOnly;
+ std::vector<HostAndPort> configHostAndPorts = configDBCS.getServers();
+ for (size_t i = 0; i < configHostAndPorts.size(); i++) {
+ // Append the default port, if not specified
+ HostAndPort configHost = configHostAndPorts[i];
+ if (!configHost.hasPort()) {
+ configHost = HostAndPort(configHost.host(), ServerGlobalParams::ConfigServerPort);
+ }
- // If this is a CUSTOM connection string (for testing) don't do DNS resolution
- string errMsg;
- if (ConnectionString::parse(host, errMsg).type() == ConnectionString::CUSTOM) {
- continue;
- }
+ // Make sure there are no duplicates
+ if (!configHostsAndPortsSet.insert(configHost).second) {
+ StringBuilder sb;
+ sb << "Host " << configHost.toString()
+ << " exists twice in the config servers listing.";
- bool ok = false;
+ return Status(ErrorCodes::InvalidOptions, sb.str());
+ }
- for (int x = 10; x > 0; x--) {
- if (!hostbyname(host.c_str()).empty()) {
- ok = true;
- break;
- }
+ configHostsOnly.insert(configHost.host());
+ }
- log() << "can't resolve DNS for [" << host << "] sleeping and trying "
- << x << " more times";
- sleepsecs(10);
- }
+ // Make sure the hosts are reachable
+ for (set<string>::const_iterator i = configHostsOnly.begin(); i != configHostsOnly.end(); i++) {
+ const string host = *i;
- if (!ok) {
- return Status(ErrorCodes::HostNotFound,
- stream() << "unable to resolve DNS for host " << host);
- }
+ // If this is a CUSTOM connection string (for testing) don't do DNS resolution
+ string errMsg;
+ if (ConnectionString::parse(host, errMsg).type() == ConnectionString::CUSTOM) {
+ continue;
}
- LOG(1) << " config string : " << configDBCS.toString();
-
- // Now that the config hosts are verified, initialize the catalog manager. The code below
- // should never fail.
+ bool ok = false;
- _configServerConnectionString = configDBCS;
-
- if (_configServerConnectionString.type() == ConnectionString::MASTER) {
- _configServers.push_back(_configServerConnectionString);
- }
- else if (_configServerConnectionString.type() == ConnectionString::SYNC ||
- (_configServerConnectionString.type() == ConnectionString::SET &&
- _configServerConnectionString.getServers().size() == 1)) {
- // TODO(spencer): Remove second part of the above or statement that allows replset
- // config server strings once we've separated the legacy catalog manager from the
- // CSRS version.
- const vector<HostAndPort> configHPs = _configServerConnectionString.getServers();
- for (vector<HostAndPort>::const_iterator it = configHPs.begin();
- it != configHPs.end();
- ++it) {
-
- _configServers.push_back(ConnectionString(*it));
+ for (int x = 10; x > 0; x--) {
+ if (!hostbyname(host.c_str()).empty()) {
+ ok = true;
+ break;
}
- }
- else {
- // This is only for tests.
- invariant(_configServerConnectionString.type() == ConnectionString::CUSTOM);
- _configServers.push_back(_configServerConnectionString);
- }
- _distLockManager = stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString);
- _distLockManager->startUp();
+ log() << "can't resolve DNS for [" << host << "] sleeping and trying " << x
+ << " more times";
+ sleepsecs(10);
+ }
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = false;
- _consistentFromLastCheck = true;
+ if (!ok) {
+ return Status(ErrorCodes::HostNotFound,
+ stream() << "unable to resolve DNS for host " << host);
}
+ }
- return Status::OK();
+ LOG(1) << " config string : " << configDBCS.toString();
+
+ // Now that the config hosts are verified, initialize the catalog manager. The code below
+ // should never fail.
+
+ _configServerConnectionString = configDBCS;
+
+ if (_configServerConnectionString.type() == ConnectionString::MASTER) {
+ _configServers.push_back(_configServerConnectionString);
+ } else if (_configServerConnectionString.type() == ConnectionString::SYNC ||
+ (_configServerConnectionString.type() == ConnectionString::SET &&
+ _configServerConnectionString.getServers().size() == 1)) {
+ // TODO(spencer): Remove second part of the above or statement that allows replset
+ // config server strings once we've separated the legacy catalog manager from the
+ // CSRS version.
+ const vector<HostAndPort> configHPs = _configServerConnectionString.getServers();
+ for (vector<HostAndPort>::const_iterator it = configHPs.begin(); it != configHPs.end();
+ ++it) {
+ _configServers.push_back(ConnectionString(*it));
+ }
+ } else {
+ // This is only for tests.
+ invariant(_configServerConnectionString.type() == ConnectionString::CUSTOM);
+ _configServers.push_back(_configServerConnectionString);
}
- Status CatalogManagerLegacy::startup(bool upgrade) {
- Status status = _startConfigServerChecker();
- if (!status.isOK()) {
- return status;
- }
+ _distLockManager = stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString);
+ _distLockManager->startUp();
- status = _checkAndUpgradeConfigMetadata(upgrade);
- return status;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = false;
+ _consistentFromLastCheck = true;
}
- Status CatalogManagerLegacy::_checkAndUpgradeConfigMetadata(bool doUpgrade) {
- VersionType initVersionInfo;
- VersionType versionInfo;
- string errMsg;
+ return Status::OK();
+}
- bool upgraded = checkAndUpgradeConfigVersion(this,
- doUpgrade,
- &initVersionInfo,
- &versionInfo,
- &errMsg);
- if (!upgraded) {
- return Status(ErrorCodes::IncompatibleShardingMetadata,
- str::stream() << "error upgrading config database to v"
- << CURRENT_CONFIG_VERSION << causedBy(errMsg));
- }
-
- return Status::OK();
+Status CatalogManagerLegacy::startup(bool upgrade) {
+ Status status = _startConfigServerChecker();
+ if (!status.isOK()) {
+ return status;
}
- Status CatalogManagerLegacy::_startConfigServerChecker() {
- if (!_checkConfigServersConsistent()) {
- return Status(ErrorCodes::ConfigServersInconsistent,
- "Data inconsistency detected amongst config servers");
- }
+ status = _checkAndUpgradeConfigMetadata(upgrade);
+ return status;
+}
+
+Status CatalogManagerLegacy::_checkAndUpgradeConfigMetadata(bool doUpgrade) {
+ VersionType initVersionInfo;
+ VersionType versionInfo;
+ string errMsg;
+
+ bool upgraded =
+ checkAndUpgradeConfigVersion(this, doUpgrade, &initVersionInfo, &versionInfo, &errMsg);
+ if (!upgraded) {
+ return Status(ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "error upgrading config database to v"
+ << CURRENT_CONFIG_VERSION << causedBy(errMsg));
+ }
- stdx::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this));
- _consistencyCheckerThread.swap(t);
+ return Status::OK();
+}
- return Status::OK();
+Status CatalogManagerLegacy::_startConfigServerChecker() {
+ if (!_checkConfigServersConsistent()) {
+ return Status(ErrorCodes::ConfigServersInconsistent,
+ "Data inconsistency detected amongst config servers");
}
- ConnectionString CatalogManagerLegacy::connectionString() const {
- return _configServerConnectionString;
- }
+ stdx::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this));
+ _consistencyCheckerThread.swap(t);
- void CatalogManagerLegacy::shutDown() {
- LOG(1) << "CatalogManagerLegacy::shutDown() called.";
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _consistencyCheckerCV.notify_one();
- }
- _consistencyCheckerThread.join();
+ return Status::OK();
+}
+
+ConnectionString CatalogManagerLegacy::connectionString() const {
+ return _configServerConnectionString;
+}
- invariant(_distLockManager);
- _distLockManager->shutDown();
+void CatalogManagerLegacy::shutDown() {
+ LOG(1) << "CatalogManagerLegacy::shutDown() called.";
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _consistencyCheckerCV.notify_one();
}
+ _consistencyCheckerThread.join();
- Status CatalogManagerLegacy::enableSharding(const std::string& dbName) {
- invariant(nsIsDbOnly(dbName));
+ invariant(_distLockManager);
+ _distLockManager->shutDown();
+}
- DatabaseType db;
+Status CatalogManagerLegacy::enableSharding(const std::string& dbName) {
+ invariant(nsIsDbOnly(dbName));
- // Check for case sensitivity violations
- Status status = _checkDbDoesNotExist(dbName);
- if (status.isOK()) {
- // Database does not exist, create a new entry
- const ShardPtr primary = Shard::pick();
- if (primary) {
- log() << "Placing [" << dbName << "] on: " << primary->toString();
+ DatabaseType db;
- db.setName(dbName);
- db.setPrimary(primary->getId());
- db.setSharded(true);
- }
- else {
- return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on");
- }
- }
- else if (status.code() == ErrorCodes::NamespaceExists) {
- // Database exists, so just update it
- StatusWith<DatabaseType> dbStatus = getDatabase(dbName);
- if (!dbStatus.isOK()) {
- return dbStatus.getStatus();
- }
+ // Check for case sensitivity violations
+ Status status = _checkDbDoesNotExist(dbName);
+ if (status.isOK()) {
+ // Database does not exist, create a new entry
+ const ShardPtr primary = Shard::pick();
+ if (primary) {
+ log() << "Placing [" << dbName << "] on: " << primary->toString();
- db = dbStatus.getValue();
+ db.setName(dbName);
+ db.setPrimary(primary->getId());
db.setSharded(true);
+ } else {
+ return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on");
}
- else {
- // Some fatal error
- return status;
+ } else if (status.code() == ErrorCodes::NamespaceExists) {
+ // Database exists, so just update it
+ StatusWith<DatabaseType> dbStatus = getDatabase(dbName);
+ if (!dbStatus.isOK()) {
+ return dbStatus.getStatus();
}
- log() << "Enabling sharding for database [" << dbName << "] in config db";
-
- return updateDatabase(dbName, db);
+ db = dbStatus.getValue();
+ db.setSharded(true);
+ } else {
+ // Some fatal error
+ return status;
}
- Status CatalogManagerLegacy::shardCollection(const string& ns,
- const ShardKeyPattern& fieldsAndOrder,
- bool unique,
- vector<BSONObj>* initPoints,
- set<ShardId>* initShardIds) {
+ log() << "Enabling sharding for database [" << dbName << "] in config db";
- StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns));
- if (!status.isOK()) {
- return status.getStatus();
- }
+ return updateDatabase(dbName, db);
+}
- DatabaseType dbt = status.getValue();
- ShardId dbPrimaryShardId = dbt.getPrimary();
-
- // This is an extra safety check that the collection is not getting sharded concurrently by
- // two different mongos instances. It is not 100%-proof, but it reduces the chance that two
- // invocations of shard collection will step on each other's toes.
- {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- unsigned long long existingChunks = conn->count(ChunkType::ConfigNS,
- BSON(ChunkType::ns(ns)));
- if (existingChunks > 0) {
- conn.done();
- return Status(ErrorCodes::AlreadyInitialized,
- str::stream() << "collection " << ns << " already sharded with "
- << existingChunks << " chunks.");
- }
+Status CatalogManagerLegacy::shardCollection(const string& ns,
+ const ShardKeyPattern& fieldsAndOrder,
+ bool unique,
+ vector<BSONObj>* initPoints,
+ set<ShardId>* initShardIds) {
+ StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns));
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ DatabaseType dbt = status.getValue();
+ ShardId dbPrimaryShardId = dbt.getPrimary();
+ // This is an extra safety check that the collection is not getting sharded concurrently by
+ // two different mongos instances. It is not 100%-proof, but it reduces the chance that two
+ // invocations of shard collection will step on each other's toes.
+ {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+ unsigned long long existingChunks =
+ conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(ns)));
+ if (existingChunks > 0) {
conn.done();
+ return Status(ErrorCodes::AlreadyInitialized,
+ str::stream() << "collection " << ns << " already sharded with "
+ << existingChunks << " chunks.");
}
- log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder;
+ conn.done();
+ }
- // Record start in changelog
- BSONObjBuilder collectionDetail;
- collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
- collectionDetail.append("collection", ns);
- string dbPrimaryShardStr;
- {
- const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId);
- dbPrimaryShardStr = shard->toString();
- }
- collectionDetail.append("primary", dbPrimaryShardStr);
-
- BSONArray initialShards;
- if (initShardIds == NULL)
- initialShards = BSONArray();
- else {
- BSONArrayBuilder b;
- for (const ShardId& shardId : *initShardIds) {
- b.append(shardId);
- }
- initialShards = b.arr();
+ log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder;
+
+ // Record start in changelog
+ BSONObjBuilder collectionDetail;
+ collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
+ collectionDetail.append("collection", ns);
+ string dbPrimaryShardStr;
+ {
+ const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId);
+ dbPrimaryShardStr = shard->toString();
+ }
+ collectionDetail.append("primary", dbPrimaryShardStr);
+
+ BSONArray initialShards;
+ if (initShardIds == NULL)
+ initialShards = BSONArray();
+ else {
+ BSONArrayBuilder b;
+ for (const ShardId& shardId : *initShardIds) {
+ b.append(shardId);
}
+ initialShards = b.arr();
+ }
- collectionDetail.append("initShards", initialShards);
- collectionDetail.append("numChunks", static_cast<int>(initPoints->size() + 1));
-
- logChange(NULL, "shardCollection.start", ns, collectionDetail.obj());
-
- ChunkManagerPtr manager(new ChunkManager(ns, fieldsAndOrder, unique));
- manager->createFirstChunks(dbPrimaryShardId,
- initPoints,
- initShardIds);
- manager->loadExistingRanges(nullptr);
-
- CollectionInfo collInfo;
- collInfo.useChunkManager(manager);
- collInfo.save(ns);
- manager->reload(true);
-
- // Tell the primary mongod to refresh its data
- // TODO: Think the real fix here is for mongos to just
- // assume that all collections are sharded, when we get there
- for (int i = 0;i < 4;i++) {
- if (i == 3) {
- warning() << "too many tries updating initial version of " << ns
- << " on shard primary " << dbPrimaryShardStr
- << ", other mongoses may not see the collection as sharded immediately";
- break;
- }
+ collectionDetail.append("initShards", initialShards);
+ collectionDetail.append("numChunks", static_cast<int>(initPoints->size() + 1));
- try {
- const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId);
- ShardConnection conn(shard->getConnString(), ns);
- bool isVersionSet = conn.setVersion();
- conn.done();
- if (!isVersionSet) {
- warning() << "could not update initial version of "
- << ns << " on shard primary " << dbPrimaryShardStr;
- } else {
- break;
- }
- }
- catch (const DBException& e) {
- warning() << "could not update initial version of " << ns
- << " on shard primary " << dbPrimaryShardStr
- << causedBy(e);
- }
+ logChange(NULL, "shardCollection.start", ns, collectionDetail.obj());
- sleepsecs(i);
- }
+ ChunkManagerPtr manager(new ChunkManager(ns, fieldsAndOrder, unique));
+ manager->createFirstChunks(dbPrimaryShardId, initPoints, initShardIds);
+ manager->loadExistingRanges(nullptr);
- // Record finish in changelog
- BSONObjBuilder finishDetail;
+ CollectionInfo collInfo;
+ collInfo.useChunkManager(manager);
+ collInfo.save(ns);
+ manager->reload(true);
- finishDetail.append("version", manager->getVersion().toString());
+ // Tell the primary mongod to refresh its data
+ // TODO: Think the real fix here is for mongos to just
+ // assume that all collections are sharded, when we get there
+ for (int i = 0; i < 4; i++) {
+ if (i == 3) {
+ warning() << "too many tries updating initial version of " << ns << " on shard primary "
+ << dbPrimaryShardStr
+ << ", other mongoses may not see the collection as sharded immediately";
+ break;
+ }
- logChange(NULL, "shardCollection", ns, finishDetail.obj());
+ try {
+ const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId);
+ ShardConnection conn(shard->getConnString(), ns);
+ bool isVersionSet = conn.setVersion();
+ conn.done();
+ if (!isVersionSet) {
+ warning() << "could not update initial version of " << ns << " on shard primary "
+ << dbPrimaryShardStr;
+ } else {
+ break;
+ }
+ } catch (const DBException& e) {
+ warning() << "could not update initial version of " << ns << " on shard primary "
+ << dbPrimaryShardStr << causedBy(e);
+ }
- return Status::OK();
+ sleepsecs(i);
}
- Status CatalogManagerLegacy::createDatabase(const std::string& dbName) {
- invariant(nsIsDbOnly(dbName));
+ // Record finish in changelog
+ BSONObjBuilder finishDetail;
- // The admin and config databases should never be explicitly created. They "just exist",
- // i.e. getDatabase will always return an entry for them.
- invariant(dbName != "admin");
- invariant(dbName != "config");
+ finishDetail.append("version", manager->getVersion().toString());
- // Lock the database globally to prevent conflicts with simultaneous database creation.
- auto scopedDistLock = getDistLockManager()->lock(dbName,
- "createDatabase",
- Seconds{5},
- Milliseconds{500});
- if (!scopedDistLock.isOK()) {
- return scopedDistLock.getStatus();
- }
-
- // Check for case sensitivity violations
- auto status = _checkDbDoesNotExist(dbName);
- if (!status.isOK()) {
- return status;
- }
+ logChange(NULL, "shardCollection", ns, finishDetail.obj());
- // Database does not exist, pick a shard and create a new entry
- const ShardPtr primaryShard = Shard::pick();
- if (!primaryShard) {
- return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on");
- }
+ return Status::OK();
+}
- log() << "Placing [" << dbName << "] on: " << primaryShard->toString();
+Status CatalogManagerLegacy::createDatabase(const std::string& dbName) {
+ invariant(nsIsDbOnly(dbName));
- DatabaseType db;
- db.setName(dbName);
- db.setPrimary(primaryShard->getId());
- db.setSharded(false);
+ // The admin and config databases should never be explicitly created. They "just exist",
+ // i.e. getDatabase will always return an entry for them.
+ invariant(dbName != "admin");
+ invariant(dbName != "config");
- BatchedCommandResponse response;
- status = insert(DatabaseType::ConfigNS, db.toBSON(), &response);
- if (status.isOK()) {
- return status;
- }
+ // Lock the database globally to prevent conflicts with simultaneous database creation.
+ auto scopedDistLock =
+ getDistLockManager()->lock(dbName, "createDatabase", Seconds{5}, Milliseconds{500});
+ if (!scopedDistLock.isOK()) {
+ return scopedDistLock.getStatus();
+ }
- if (status.code() == ErrorCodes::DuplicateKey) {
- return Status(ErrorCodes::NamespaceExists, "database " + dbName + " already exists");
- }
+ // Check for case sensitivity violations
+ auto status = _checkDbDoesNotExist(dbName);
+ if (!status.isOK()) {
+ return status;
+ }
- return Status(status.code(), str::stream() << "database metadata write failed for "
- << dbName << ". Error: " << response.toBSON());
+ // Database does not exist, pick a shard and create a new entry
+ const ShardPtr primaryShard = Shard::pick();
+ if (!primaryShard) {
+ return Status(ErrorCodes::ShardNotFound, "can't find a shard to put new db on");
}
- StatusWith<string> CatalogManagerLegacy::addShard(const string& name,
- const ConnectionString& shardConnectionString,
- const long long maxSize) {
+ log() << "Placing [" << dbName << "] on: " << primaryShard->toString();
- string shardName;
- ReplicaSetMonitorPtr rsMonitor;
- vector<string> dbNames;
+ DatabaseType db;
+ db.setName(dbName);
+ db.setPrimary(primaryShard->getId());
+ db.setSharded(false);
- try {
- ScopedDbConnection newShardConn(shardConnectionString);
- newShardConn->getLastError();
-
- StatusWith<string> validShard = isValidShard(name,
- shardConnectionString,
- newShardConn);
- if (!validShard.isOK()) {
- newShardConn.done();
- return validShard.getStatus();
- }
- shardName = validShard.getValue();
+ BatchedCommandResponse response;
+ status = insert(DatabaseType::ConfigNS, db.toBSON(), &response);
+ if (status.isOK()) {
+ return status;
+ }
- StatusWith<vector<string>> shardDBNames = getDBNames(shardConnectionString,
- newShardConn);
- if (!shardDBNames.isOK()) {
- newShardConn.done();
- return shardDBNames.getStatus();
- }
- dbNames = shardDBNames.getValue();
+ if (status.code() == ErrorCodes::DuplicateKey) {
+ return Status(ErrorCodes::NamespaceExists, "database " + dbName + " already exists");
+ }
- if (newShardConn->type() == ConnectionString::SET) {
- rsMonitor = ReplicaSetMonitor::get(shardConnectionString.getSetName());
- }
+ return Status(status.code(),
+ str::stream() << "database metadata write failed for " << dbName
+ << ". Error: " << response.toBSON());
+}
+
+StatusWith<string> CatalogManagerLegacy::addShard(const string& name,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) {
+ string shardName;
+ ReplicaSetMonitorPtr rsMonitor;
+ vector<string> dbNames;
+ try {
+ ScopedDbConnection newShardConn(shardConnectionString);
+ newShardConn->getLastError();
+
+ StatusWith<string> validShard = isValidShard(name, shardConnectionString, newShardConn);
+ if (!validShard.isOK()) {
newShardConn.done();
+ return validShard.getStatus();
}
- catch (const DBException& e) {
- if (shardConnectionString.type() == ConnectionString::SET) {
- shardConnectionPool.removeHost(shardConnectionString.getSetName());
- ReplicaSetMonitor::remove(shardConnectionString.getSetName());
- }
+ shardName = validShard.getValue();
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "couldn't connect to new shard "
- << e.what());
+ StatusWith<vector<string>> shardDBNames = getDBNames(shardConnectionString, newShardConn);
+ if (!shardDBNames.isOK()) {
+ newShardConn.done();
+ return shardDBNames.getStatus();
}
+ dbNames = shardDBNames.getValue();
- // check that none of the existing shard candidate's db's exist elsewhere
- for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
- StatusWith<DatabaseType> dbt = getDatabase(*it);
- if (dbt.isOK()) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "can't add shard "
- << "'" << shardConnectionString.toString() << "'"
- << " because a local database '" << *it
- << "' exists in another "
- << dbt.getValue().getPrimary());
- }
+ if (newShardConn->type() == ConnectionString::SET) {
+ rsMonitor = ReplicaSetMonitor::get(shardConnectionString.getSetName());
}
- // if a name for a shard wasn't provided, pick one.
- if (shardName.empty()) {
- StatusWith<string> result = _getNewShardName();
- if (!result.isOK()) {
- return Status(ErrorCodes::OperationFailed,
- "error generating new shard name");
- }
- shardName = result.getValue();
+ newShardConn.done();
+ } catch (const DBException& e) {
+ if (shardConnectionString.type() == ConnectionString::SET) {
+ shardConnectionPool.removeHost(shardConnectionString.getSetName());
+ ReplicaSetMonitor::remove(shardConnectionString.getSetName());
}
- // build the ConfigDB shard document
- BSONObjBuilder b;
- b.append(ShardType::name(), shardName);
- b.append(ShardType::host(),
- rsMonitor ? rsMonitor->getServerAddress() : shardConnectionString.toString());
- if (maxSize > 0) {
- b.append(ShardType::maxSizeMB(), maxSize);
- }
- BSONObj shardDoc = b.obj();
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "couldn't connect to new shard " << e.what());
+ }
- if (isShardHost(shardConnectionString)) {
- return Status(ErrorCodes::OperationFailed, "host already used");
+ // check that none of the existing shard candidate's db's exist elsewhere
+ for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
+ StatusWith<DatabaseType> dbt = getDatabase(*it);
+ if (dbt.isOK()) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "can't add shard "
+ << "'" << shardConnectionString.toString() << "'"
+ << " because a local database '" << *it
+ << "' exists in another " << dbt.getValue().getPrimary());
}
+ }
- log() << "going to add shard: " << shardDoc;
-
- Status result = insert(ShardType::ConfigNS, shardDoc, NULL);
+ // if a name for a shard wasn't provided, pick one.
+ if (shardName.empty()) {
+ StatusWith<string> result = _getNewShardName();
if (!result.isOK()) {
- log() << "error adding shard: " << shardDoc << " err: " << result.reason();
- return result;
+ return Status(ErrorCodes::OperationFailed, "error generating new shard name");
}
+ shardName = result.getValue();
+ }
- Shard::reloadShardInfo();
-
- // add all databases of the new shard
- for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
- DatabaseType dbt;
- dbt.setName(*it);
- dbt.setPrimary(shardName);
- dbt.setSharded(false);
- Status status = updateDatabase(*it, dbt);
- if (!status.isOK()) {
- log() << "adding shard " << shardConnectionString.toString()
- << " even though could not add database " << *it;
- }
- }
+ // build the ConfigDB shard document
+ BSONObjBuilder b;
+ b.append(ShardType::name(), shardName);
+ b.append(ShardType::host(),
+ rsMonitor ? rsMonitor->getServerAddress() : shardConnectionString.toString());
+ if (maxSize > 0) {
+ b.append(ShardType::maxSizeMB(), maxSize);
+ }
+ BSONObj shardDoc = b.obj();
- // Record in changelog
- BSONObjBuilder shardDetails;
- shardDetails.append("name", shardName);
- shardDetails.append("host", shardConnectionString.toString());
+ if (isShardHost(shardConnectionString)) {
+ return Status(ErrorCodes::OperationFailed, "host already used");
+ }
- logChange(NULL, "addShard", "", shardDetails.obj());
+ log() << "going to add shard: " << shardDoc;
- return shardName;
+ Status result = insert(ShardType::ConfigNS, shardDoc, NULL);
+ if (!result.isOK()) {
+ log() << "error adding shard: " << shardDoc << " err: " << result.reason();
+ return result;
}
- StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn,
- const std::string& name) {
- ScopedDbConnection conn(_configServerConnectionString, 30);
+ Shard::reloadShardInfo();
- if (conn->count(ShardType::ConfigNS,
- BSON(ShardType::name() << NE << name
- << ShardType::draining(true)))) {
- conn.done();
- return Status(ErrorCodes::ConflictingOperationInProgress,
- "Can't have more than one draining shard at a time");
+ // add all databases of the new shard
+ for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
+ DatabaseType dbt;
+ dbt.setName(*it);
+ dbt.setPrimary(shardName);
+ dbt.setSharded(false);
+ Status status = updateDatabase(*it, dbt);
+ if (!status.isOK()) {
+ log() << "adding shard " << shardConnectionString.toString()
+ << " even though could not add database " << *it;
}
+ }
- if (conn->count(ShardType::ConfigNS,
- BSON(ShardType::name() << NE << name)) == 0) {
- conn.done();
- return Status(ErrorCodes::IllegalOperation,
- "Can't remove last shard");
- }
+ // Record in changelog
+ BSONObjBuilder shardDetails;
+ shardDetails.append("name", shardName);
+ shardDetails.append("host", shardConnectionString.toString());
- BSONObj searchDoc = BSON(ShardType::name() << name);
+ logChange(NULL, "addShard", "", shardDetails.obj());
- // Case 1: start draining chunks
- BSONObj drainingDoc = BSON(ShardType::name() << name << ShardType::draining(true));
- BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc);
- if (shardDoc.isEmpty()) {
- log() << "going to start draining shard: " << name;
- BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true)));
+ return shardName;
+}
- Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL);
- if (!status.isOK()) {
- log() << "error starting removeShard: " << name
- << "; err: " << status.reason();
- return status;
- }
+StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn,
+ const std::string& name) {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
- BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") <<
- DatabaseType::primary(name));
- log() << "primaryLocalDoc: " << primaryLocalDoc;
- if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) {
- log() << "This shard is listed as primary of local db. Removing entry.";
-
- Status status = remove(DatabaseType::ConfigNS,
- BSON(DatabaseType::name("local")),
- 0,
- NULL);
- if (!status.isOK()) {
- log() << "error removing local db: "
- << status.reason();
- return status;
- }
- }
+ if (conn->count(ShardType::ConfigNS,
+ BSON(ShardType::name() << NE << name << ShardType::draining(true)))) {
+ conn.done();
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ "Can't have more than one draining shard at a time");
+ }
- Shard::reloadShardInfo();
- conn.done();
+ if (conn->count(ShardType::ConfigNS, BSON(ShardType::name() << NE << name)) == 0) {
+ conn.done();
+ return Status(ErrorCodes::IllegalOperation, "Can't remove last shard");
+ }
+
+ BSONObj searchDoc = BSON(ShardType::name() << name);
+
+ // Case 1: start draining chunks
+ BSONObj drainingDoc = BSON(ShardType::name() << name << ShardType::draining(true));
+ BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc);
+ if (shardDoc.isEmpty()) {
+ log() << "going to start draining shard: " << name;
+ BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true)));
- // Record start in changelog
- logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true));
- return ShardDrainingStatus::STARTED;
+ Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL);
+ if (!status.isOK()) {
+ log() << "error starting removeShard: " << name << "; err: " << status.reason();
+ return status;
}
- // Case 2: all chunks drained
- BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str()));
- long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc);
- long long dbCount = conn->count(DatabaseType::ConfigNS,
- BSON(DatabaseType::name.ne("local")
- << DatabaseType::primary(name)));
- if (chunkCount == 0 && dbCount == 0) {
- log() << "going to remove shard: " << name;
- audit::logRemoveShard(ClientBasic::getCurrent(), name);
-
- Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL);
+ BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") << DatabaseType::primary(name));
+ log() << "primaryLocalDoc: " << primaryLocalDoc;
+ if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) {
+ log() << "This shard is listed as primary of local db. Removing entry.";
+
+ Status status =
+ remove(DatabaseType::ConfigNS, BSON(DatabaseType::name("local")), 0, NULL);
if (!status.isOK()) {
- log() << "Error concluding removeShard operation on: " << name
- << "; err: " << status.reason();
+ log() << "error removing local db: " << status.reason();
return status;
}
+ }
- grid.shardRegistry()->remove(name);
-
- shardConnectionPool.removeHost(name);
- ReplicaSetMonitor::remove(name);
+ Shard::reloadShardInfo();
+ conn.done();
- Shard::reloadShardInfo();
- conn.done();
+ // Record start in changelog
+ logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true));
+ return ShardDrainingStatus::STARTED;
+ }
- // Record finish in changelog
- logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false));
- return ShardDrainingStatus::COMPLETED;
+ // Case 2: all chunks drained
+ BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str()));
+ long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc);
+ long long dbCount =
+ conn->count(DatabaseType::ConfigNS,
+ BSON(DatabaseType::name.ne("local") << DatabaseType::primary(name)));
+ if (chunkCount == 0 && dbCount == 0) {
+ log() << "going to remove shard: " << name;
+ audit::logRemoveShard(ClientBasic::getCurrent(), name);
+
+ Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL);
+ if (!status.isOK()) {
+ log() << "Error concluding removeShard operation on: " << name
+ << "; err: " << status.reason();
+ return status;
}
- // case 3: draining ongoing
- return ShardDrainingStatus::ONGOING;
- }
+ grid.shardRegistry()->remove(name);
- Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) {
- fassert(28616, db.validate());
+ shardConnectionPool.removeHost(name);
+ ReplicaSetMonitor::remove(name);
- BatchedCommandResponse response;
- Status status = update(DatabaseType::ConfigNS,
- BSON(DatabaseType::name(dbName)),
- db.toBSON(),
- true, // upsert
- false, // multi
- &response);
- if (!status.isOK()) {
- return Status(status.code(),
- str::stream() << "database metadata write failed: "
- << response.toBSON() << "; status: " << status.toString());
- }
+ Shard::reloadShardInfo();
+ conn.done();
- return Status::OK();
+ // Record finish in changelog
+ logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false));
+ return ShardDrainingStatus::COMPLETED;
}
- StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& dbName) {
- invariant(nsIsDbOnly(dbName));
+ // case 3: draining ongoing
+ return ShardDrainingStatus::ONGOING;
+}
+
+Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) {
+ fassert(28616, db.validate());
+
+ BatchedCommandResponse response;
+ Status status = update(DatabaseType::ConfigNS,
+ BSON(DatabaseType::name(dbName)),
+ db.toBSON(),
+ true, // upsert
+ false, // multi
+ &response);
+ if (!status.isOK()) {
+ return Status(status.code(),
+ str::stream() << "database metadata write failed: " << response.toBSON()
+ << "; status: " << status.toString());
+ }
- // The two databases that are hosted on the config server are config and admin
- if (dbName == "config" || dbName == "admin") {
- DatabaseType dbt;
- dbt.setName(dbName);
- dbt.setSharded(false);
- dbt.setPrimary("config");
+ return Status::OK();
+}
- return dbt;
- }
+StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& dbName) {
+ invariant(nsIsDbOnly(dbName));
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
-
- BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, BSON(DatabaseType::name(dbName)));
- if (dbObj.isEmpty()) {
- conn.done();
- return Status(ErrorCodes::DatabaseNotFound,
- stream() << "database " << dbName << " not found");
- }
+ // The two databases that are hosted on the config server are config and admin
+ if (dbName == "config" || dbName == "admin") {
+ DatabaseType dbt;
+ dbt.setName(dbName);
+ dbt.setSharded(false);
+ dbt.setPrimary("config");
- conn.done();
- return DatabaseType::fromBSON(dbObj);
+ return dbt;
}
- Status CatalogManagerLegacy::updateCollection(const std::string& collNs,
- const CollectionType& coll) {
- fassert(28634, coll.validate());
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
- BatchedCommandResponse response;
- Status status = update(CollectionType::ConfigNS,
- BSON(CollectionType::fullNs(collNs)),
- coll.toBSON(),
- true, // upsert
- false, // multi
- &response);
- if (!status.isOK()) {
- return Status(status.code(),
- str::stream() << "collection metadata write failed: "
- << response.toBSON() << "; status: " << status.toString());
- }
+ BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, BSON(DatabaseType::name(dbName)));
+ if (dbObj.isEmpty()) {
+ conn.done();
+ return Status(ErrorCodes::DatabaseNotFound,
+ stream() << "database " << dbName << " not found");
+ }
- return Status::OK();
+ conn.done();
+ return DatabaseType::fromBSON(dbObj);
+}
+
+Status CatalogManagerLegacy::updateCollection(const std::string& collNs,
+ const CollectionType& coll) {
+ fassert(28634, coll.validate());
+
+ BatchedCommandResponse response;
+ Status status = update(CollectionType::ConfigNS,
+ BSON(CollectionType::fullNs(collNs)),
+ coll.toBSON(),
+ true, // upsert
+ false, // multi
+ &response);
+ if (!status.isOK()) {
+ return Status(status.code(),
+ str::stream() << "collection metadata write failed: " << response.toBSON()
+ << "; status: " << status.toString());
}
- StatusWith<CollectionType> CatalogManagerLegacy::getCollection(const std::string& collNs) {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ return Status::OK();
+}
- BSONObj collObj = conn->findOne(CollectionType::ConfigNS,
- BSON(CollectionType::fullNs(collNs)));
- if (collObj.isEmpty()) {
- conn.done();
- return Status(ErrorCodes::NamespaceNotFound,
- stream() << "collection " << collNs << " not found");
- }
+StatusWith<CollectionType> CatalogManagerLegacy::getCollection(const std::string& collNs) {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ BSONObj collObj = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::fullNs(collNs)));
+ if (collObj.isEmpty()) {
conn.done();
- return CollectionType::fromBSON(collObj);
+ return Status(ErrorCodes::NamespaceNotFound,
+ stream() << "collection " << collNs << " not found");
}
- Status CatalogManagerLegacy::getCollections(const std::string* dbName,
- std::vector<CollectionType>* collections) {
- collections->clear();
+ conn.done();
+ return CollectionType::fromBSON(collObj);
+}
- BSONObjBuilder b;
- if (dbName) {
- invariant(!dbName->empty());
- b.appendRegex(CollectionType::fullNs(),
- (string)"^" + pcrecpp::RE::QuoteMeta(*dbName) + "\\.");
- }
+Status CatalogManagerLegacy::getCollections(const std::string* dbName,
+ std::vector<CollectionType>* collections) {
+ collections->clear();
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ BSONObjBuilder b;
+ if (dbName) {
+ invariant(!dbName->empty());
+ b.appendRegex(CollectionType::fullNs(),
+ (string) "^" + pcrecpp::RE::QuoteMeta(*dbName) + "\\.");
+ }
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(CollectionType::ConfigNS,
- b.obj())));
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
- while (cursor->more()) {
- const BSONObj collObj = cursor->next();
+ std::unique_ptr<DBClientCursor> cursor(
+ _safeCursor(conn->query(CollectionType::ConfigNS, b.obj())));
- auto status = CollectionType::fromBSON(collObj);
- if (!status.isOK()) {
- conn.done();
- return status.getStatus();
- }
+ while (cursor->more()) {
+ const BSONObj collObj = cursor->next();
- collections->push_back(status.getValue());
+ auto status = CollectionType::fromBSON(collObj);
+ if (!status.isOK()) {
+ conn.done();
+ return status.getStatus();
}
- conn.done();
- return Status::OK();
+ collections->push_back(status.getValue());
}
- Status CatalogManagerLegacy::dropCollection(const std::string& collectionNs) {
- logChange(NULL, "dropCollection.start", collectionNs, BSONObj());
+ conn.done();
+ return Status::OK();
+}
- // Lock the collection globally so that split/migrate cannot run
- auto scopedDistLock = getDistLockManager()->lock(collectionNs, "drop");
- if (!scopedDistLock.isOK()) {
- return scopedDistLock.getStatus();
- }
+Status CatalogManagerLegacy::dropCollection(const std::string& collectionNs) {
+ logChange(NULL, "dropCollection.start", collectionNs, BSONObj());
- LOG(1) << "dropCollection " << collectionNs << " started";
+ // Lock the collection globally so that split/migrate cannot run
+ auto scopedDistLock = getDistLockManager()->lock(collectionNs, "drop");
+ if (!scopedDistLock.isOK()) {
+ return scopedDistLock.getStatus();
+ }
- // This cleans up the collection on all shards
- vector<ShardType> allShards;
- Status status = getAllShards(&allShards);
- if (!status.isOK()) {
- return status;
- }
+ LOG(1) << "dropCollection " << collectionNs << " started";
- LOG(1) << "dropCollection " << collectionNs << " locked";
+ // This cleans up the collection on all shards
+ vector<ShardType> allShards;
+ Status status = getAllShards(&allShards);
+ if (!status.isOK()) {
+ return status;
+ }
- map<string, BSONObj> errors;
+ LOG(1) << "dropCollection " << collectionNs << " locked";
- // Delete data from all mongods
- for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) {
- const auto shard = grid.shardRegistry()->getShard(i->getName());
- ScopedDbConnection conn(shard->getConnString());
+ map<string, BSONObj> errors;
- BSONObj info;
- if (!conn->dropCollection(collectionNs, &info)) {
- // Ignore the database not found errors
- if (info["code"].isNumber() &&
- (info["code"].Int() == ErrorCodes::NamespaceNotFound)) {
- conn.done();
- continue;
- }
+ // Delete data from all mongods
+ for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) {
+ const auto shard = grid.shardRegistry()->getShard(i->getName());
+ ScopedDbConnection conn(shard->getConnString());
- errors[shard->getConnString().toString()] = info;
+ BSONObj info;
+ if (!conn->dropCollection(collectionNs, &info)) {
+ // Ignore the database not found errors
+ if (info["code"].isNumber() && (info["code"].Int() == ErrorCodes::NamespaceNotFound)) {
+ conn.done();
+ continue;
}
- conn.done();
+ errors[shard->getConnString().toString()] = info;
}
- if (!errors.empty()) {
- StringBuilder sb;
- sb << "Dropping collection failed on the following hosts: ";
-
- for (map<string, BSONObj>::const_iterator it = errors.begin();
- it != errors.end();
- ++it) {
+ conn.done();
+ }
- if (it != errors.begin()) {
- sb << ", ";
- }
+ if (!errors.empty()) {
+ StringBuilder sb;
+ sb << "Dropping collection failed on the following hosts: ";
- sb << it->first << ": " << it->second;
+ for (map<string, BSONObj>::const_iterator it = errors.begin(); it != errors.end(); ++it) {
+ if (it != errors.begin()) {
+ sb << ", ";
}
- return Status(ErrorCodes::OperationFailed, sb.str());
+ sb << it->first << ": " << it->second;
}
- LOG(1) << "dropCollection " << collectionNs << " shard data deleted";
-
- // remove chunk data
- Status result = remove(ChunkType::ConfigNS,
- BSON(ChunkType::ns(collectionNs)),
- 0,
- NULL);
- if (!result.isOK()) {
- return result;
- }
+ return Status(ErrorCodes::OperationFailed, sb.str());
+ }
- LOG(1) << "dropCollection " << collectionNs << " chunk data deleted";
+ LOG(1) << "dropCollection " << collectionNs << " shard data deleted";
- for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) {
- const auto shard = grid.shardRegistry()->getShard(i->getName());
- ScopedDbConnection conn(shard->getConnString());
+ // remove chunk data
+ Status result = remove(ChunkType::ConfigNS, BSON(ChunkType::ns(collectionNs)), 0, NULL);
+ if (!result.isOK()) {
+ return result;
+ }
- BSONObj res;
+ LOG(1) << "dropCollection " << collectionNs << " chunk data deleted";
- // this is horrible
- // we need a special command for dropping on the d side
- // this hack works for the moment
+ for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) {
+ const auto shard = grid.shardRegistry()->getShard(i->getName());
+ ScopedDbConnection conn(shard->getConnString());
- if (!setShardVersion(conn.conn(),
- collectionNs,
- _configServerConnectionString.toString(),
- ChunkVersion(0, 0, OID()),
- NULL,
- true,
- res)) {
+ BSONObj res;
- return Status(static_cast<ErrorCodes::Error>(8071),
- str::stream() << "cleaning up after drop failed: " << res);
- }
+ // this is horrible
+ // we need a special command for dropping on the d side
+ // this hack works for the moment
- conn->simpleCommand("admin", 0, "unsetSharding");
- conn.done();
+ if (!setShardVersion(conn.conn(),
+ collectionNs,
+ _configServerConnectionString.toString(),
+ ChunkVersion(0, 0, OID()),
+ NULL,
+ true,
+ res)) {
+ return Status(static_cast<ErrorCodes::Error>(8071),
+ str::stream() << "cleaning up after drop failed: " << res);
}
- LOG(1) << "dropCollection " << collectionNs << " completed";
+ conn->simpleCommand("admin", 0, "unsetSharding");
+ conn.done();
+ }
- logChange(NULL, "dropCollection", collectionNs, BSONObj());
+ LOG(1) << "dropCollection " << collectionNs << " completed";
- return Status::OK();
- }
+ logChange(NULL, "dropCollection", collectionNs, BSONObj());
- void CatalogManagerLegacy::logAction(const ActionLogType& actionLog) {
- // Create the action log collection and ensure that it is capped. Wrap in try/catch,
- // because creating an existing collection throws.
- if (actionLogCollectionCreated.load() == 0) {
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- conn->createCollection(ActionLogType::ConfigNS, 1024 * 1024 * 2, true);
- conn.done();
+ return Status::OK();
+}
- actionLogCollectionCreated.store(1);
- }
- catch (const DBException& e) {
- // It's ok to ignore this exception
- LOG(1) << "couldn't create actionlog collection: " << e;
- }
- }
+void CatalogManagerLegacy::logAction(const ActionLogType& actionLog) {
+ // Create the action log collection and ensure that it is capped. Wrap in try/catch,
+ // because creating an existing collection throws.
+ if (actionLogCollectionCreated.load() == 0) {
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ conn->createCollection(ActionLogType::ConfigNS, 1024 * 1024 * 2, true);
+ conn.done();
- Status result = insert(ActionLogType::ConfigNS, actionLog.toBSON(), NULL);
- if (!result.isOK()) {
- log() << "error encountered while logging action: " << result;
+ actionLogCollectionCreated.store(1);
+ } catch (const DBException& e) {
+ // It's ok to ignore this exception
+ LOG(1) << "couldn't create actionlog collection: " << e;
}
}
- void CatalogManagerLegacy::logChange(OperationContext* opCtx,
- const string& what,
- const string& ns,
- const BSONObj& detail) {
-
- // Create the change log collection and ensure that it is capped. Wrap in try/catch,
- // because creating an existing collection throws.
- if (changeLogCollectionCreated.load() == 0) {
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true);
- conn.done();
+ Status result = insert(ActionLogType::ConfigNS, actionLog.toBSON(), NULL);
+ if (!result.isOK()) {
+ log() << "error encountered while logging action: " << result;
+ }
+}
+
+void CatalogManagerLegacy::logChange(OperationContext* opCtx,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
+ // Create the change log collection and ensure that it is capped. Wrap in try/catch,
+ // because creating an existing collection throws.
+ if (changeLogCollectionCreated.load() == 0) {
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true);
+ conn.done();
- changeLogCollectionCreated.store(1);
- }
- catch (const UserException& e) {
- // It's ok to ignore this exception
- LOG(1) << "couldn't create changelog collection: " << e;
- }
+ changeLogCollectionCreated.store(1);
+ } catch (const UserException& e) {
+ // It's ok to ignore this exception
+ LOG(1) << "couldn't create changelog collection: " << e;
}
+ }
- // Store this entry's ID so we can use on the exception code path too
- StringBuilder changeIdBuilder;
- changeIdBuilder << getHostNameCached() << "-" << terseCurrentTime()
- << "-" << OID::gen();
+    // Store this entry's ID so we can use it on the exception code path too
+ StringBuilder changeIdBuilder;
+ changeIdBuilder << getHostNameCached() << "-" << terseCurrentTime() << "-" << OID::gen();
- const string changeID = changeIdBuilder.str();
+ const string changeID = changeIdBuilder.str();
- Client* client;
- if (opCtx) {
- client = opCtx->getClient();
- }
- else if (haveClient()) {
- client = &cc();
- }
- else {
- client = nullptr;
- }
+ Client* client;
+ if (opCtx) {
+ client = opCtx->getClient();
+ } else if (haveClient()) {
+ client = &cc();
+ } else {
+ client = nullptr;
+ }
- // Send a copy of the message to the local log in case it doesn't manage to reach
- // config.changelog
- BSONObj msg = BSON(ChangelogType::changeID(changeID) <<
- ChangelogType::server(getHostNameCached()) <<
- ChangelogType::clientAddr((client ?
- client->clientAddress(true) : "")) <<
- ChangelogType::time(jsTime()) <<
- ChangelogType::what(what) <<
- ChangelogType::ns(ns) <<
- ChangelogType::details(detail));
+ // Send a copy of the message to the local log in case it doesn't manage to reach
+ // config.changelog
+ BSONObj msg = BSON(ChangelogType::changeID(changeID)
+ << ChangelogType::server(getHostNameCached())
+ << ChangelogType::clientAddr((client ? client->clientAddress(true) : ""))
+ << ChangelogType::time(jsTime()) << ChangelogType::what(what)
+ << ChangelogType::ns(ns) << ChangelogType::details(detail));
- log() << "about to log metadata event: " << msg;
+ log() << "about to log metadata event: " << msg;
- Status result = insert(ChangelogType::ConfigNS, msg, NULL);
- if (!result.isOK()) {
- warning() << "Error encountered while logging config change with ID "
- << changeID << ": " << result;
- }
+ Status result = insert(ChangelogType::ConfigNS, msg, NULL);
+ if (!result.isOK()) {
+ warning() << "Error encountered while logging config change with ID " << changeID << ": "
+ << result;
}
+}
- StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(const string& key) {
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- BSONObj settingsDoc = conn->findOne(SettingsType::ConfigNS,
- BSON(SettingsType::key(key)));
- conn.done();
-
- if (settingsDoc.isEmpty()) {
- return Status(ErrorCodes::NoMatchingDocument,
- str::stream() << "can't find settings document with key: " << key);
- }
+StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(const string& key) {
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+ BSONObj settingsDoc = conn->findOne(SettingsType::ConfigNS, BSON(SettingsType::key(key)));
+ conn.done();
- StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc);
- if (!settingsResult.isOK()) {
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "error while parsing settings document: "
- << settingsDoc
- << " : " << settingsResult.getStatus().toString());
- }
+ if (settingsDoc.isEmpty()) {
+ return Status(ErrorCodes::NoMatchingDocument,
+ str::stream() << "can't find settings document with key: " << key);
+ }
- const SettingsType& settings = settingsResult.getValue();
+ StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc);
+ if (!settingsResult.isOK()) {
+ return Status(ErrorCodes::FailedToParse,
+ str::stream() << "error while parsing settings document: " << settingsDoc
+ << " : " << settingsResult.getStatus().toString());
+ }
- Status validationStatus = settings.validate();
- if (!validationStatus.isOK()) {
- return validationStatus;
- }
+ const SettingsType& settings = settingsResult.getValue();
- return settingsResult;
- }
- catch (const DBException& ex) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "unable to successfully obtain "
- << "config.settings document: " << causedBy(ex));
+ Status validationStatus = settings.validate();
+ if (!validationStatus.isOK()) {
+ return validationStatus;
}
- }
- Status CatalogManagerLegacy::getDatabasesForShard(const string& shardName,
- vector<string>* dbs) {
- dbs->clear();
-
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(
- conn->query(DatabaseType::ConfigNS,
- Query(BSON(DatabaseType::primary(shardName))))));
- if (!cursor.get()) {
- conn.done();
- return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor");
- }
+ return settingsResult;
+ } catch (const DBException& ex) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "unable to successfully obtain "
+ << "config.settings document: " << causedBy(ex));
+ }
+}
- while (cursor->more()) {
- BSONObj shard = cursor->nextSafe();
- dbs->push_back(shard[DatabaseType::name()].str());
- }
+Status CatalogManagerLegacy::getDatabasesForShard(const string& shardName, vector<string>* dbs) {
+ dbs->clear();
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ std::unique_ptr<DBClientCursor> cursor(_safeCursor(
+ conn->query(DatabaseType::ConfigNS, Query(BSON(DatabaseType::primary(shardName))))));
+ if (!cursor.get()) {
conn.done();
+ return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor");
}
- catch (const DBException& ex) {
- return ex.toStatus();
+
+ while (cursor->more()) {
+ BSONObj shard = cursor->nextSafe();
+ dbs->push_back(shard[DatabaseType::name()].str());
}
- return Status::OK();
+ conn.done();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Status CatalogManagerLegacy::getChunks(const Query& query,
- int nToReturn,
- vector<ChunkType>* chunks) {
- chunks->clear();
+ return Status::OK();
+}
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(ChunkType::ConfigNS,
- query,
- nToReturn)));
- if (!cursor.get()) {
- conn.done();
- return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor");
- }
+Status CatalogManagerLegacy::getChunks(const Query& query,
+ int nToReturn,
+ vector<ChunkType>* chunks) {
+ chunks->clear();
- while (cursor->more()) {
- BSONObj chunkObj = cursor->nextSafe();
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ std::unique_ptr<DBClientCursor> cursor(
+ _safeCursor(conn->query(ChunkType::ConfigNS, query, nToReturn)));
+ if (!cursor.get()) {
+ conn.done();
+ return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor");
+ }
- StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj);
- if (!chunkRes.isOK()) {
- conn.done();
- chunks->clear();
- return {ErrorCodes::FailedToParse,
- stream() << "Failed to parse chunk with id ("
- << chunkObj[ChunkType::name()].toString() << "): "
- << chunkRes.getStatus().reason()};
- }
+ while (cursor->more()) {
+ BSONObj chunkObj = cursor->nextSafe();
- chunks->push_back(chunkRes.getValue());
+ StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj);
+ if (!chunkRes.isOK()) {
+ conn.done();
+ chunks->clear();
+ return {ErrorCodes::FailedToParse,
+ stream() << "Failed to parse chunk with id ("
+ << chunkObj[ChunkType::name()].toString()
+ << "): " << chunkRes.getStatus().reason()};
}
- conn.done();
- }
- catch (const DBException& ex) {
- return ex.toStatus();
+ chunks->push_back(chunkRes.getValue());
}
- return Status::OK();
+ conn.done();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Status CatalogManagerLegacy::getTagsForCollection(const std::string& collectionNs,
- std::vector<TagsType>* tags) {
- tags->clear();
+ return Status::OK();
+}
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(
- conn->query(TagsType::ConfigNS,
- Query(BSON(TagsType::ns(collectionNs)))
- .sort(TagsType::min()))));
- if (!cursor.get()) {
- conn.done();
- return Status(ErrorCodes::HostUnreachable, "unable to open tags cursor");
- }
+Status CatalogManagerLegacy::getTagsForCollection(const std::string& collectionNs,
+ std::vector<TagsType>* tags) {
+ tags->clear();
- while (cursor->more()) {
- BSONObj tagObj = cursor->nextSafe();
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+ std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(
+ TagsType::ConfigNS, Query(BSON(TagsType::ns(collectionNs))).sort(TagsType::min()))));
+ if (!cursor.get()) {
+ conn.done();
+ return Status(ErrorCodes::HostUnreachable, "unable to open tags cursor");
+ }
- StatusWith<TagsType> tagRes = TagsType::fromBSON(tagObj);
- if (!tagRes.isOK()) {
- conn.done();
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse tag BSONObj: "
- << tagRes.getStatus().reason());
- }
+ while (cursor->more()) {
+ BSONObj tagObj = cursor->nextSafe();
- tags->push_back(tagRes.getValue());
+ StatusWith<TagsType> tagRes = TagsType::fromBSON(tagObj);
+ if (!tagRes.isOK()) {
+ conn.done();
+ return Status(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse tag BSONObj: "
+ << tagRes.getStatus().reason());
}
- conn.done();
- }
- catch (const DBException& ex) {
- return ex.toStatus();
+ tags->push_back(tagRes.getValue());
}
- return Status::OK();
+ conn.done();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- StatusWith<string> CatalogManagerLegacy::getTagForChunk(const std::string& collectionNs,
- const ChunkType& chunk) {
- BSONObj tagDoc;
+ return Status::OK();
+}
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
+StatusWith<string> CatalogManagerLegacy::getTagForChunk(const std::string& collectionNs,
+ const ChunkType& chunk) {
+ BSONObj tagDoc;
- Query query(BSON(TagsType::ns(collectionNs) <<
- TagsType::min() << BSON("$lte" << chunk.getMin()) <<
- TagsType::max() << BSON("$gte" << chunk.getMax())));
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
- tagDoc = conn->findOne(TagsType::ConfigNS, query);
- conn.done();
- }
- catch (const DBException& ex) {
- return ex.toStatus();
- }
+ Query query(BSON(TagsType::ns(collectionNs)
+ << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
+ << BSON("$gte" << chunk.getMax())));
- if (tagDoc.isEmpty()) {
- return std::string("");
- }
+ tagDoc = conn->findOne(TagsType::ConfigNS, query);
+ conn.done();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
- auto status = TagsType::fromBSON(tagDoc);
- if (status.isOK()) {
- return status.getValue().getTag();
- }
+ if (tagDoc.isEmpty()) {
+ return std::string("");
+ }
- return status.getStatus();
+ auto status = TagsType::fromBSON(tagDoc);
+ if (status.isOK()) {
+ return status.getValue().getTag();
}
- Status CatalogManagerLegacy::getAllShards(vector<ShardType>* shards) {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(ShardType::ConfigNS,
- BSONObj())));
- while (cursor->more()) {
- BSONObj shardObj = cursor->nextSafe();
+ return status.getStatus();
+}
- StatusWith<ShardType> shardRes = ShardType::fromBSON(shardObj);
- if (!shardRes.isOK()) {
- shards->clear();
- conn.done();
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse shard with id ("
- << shardObj[ShardType::name()].toString() << "): "
- << shardRes.getStatus().reason());
- }
+Status CatalogManagerLegacy::getAllShards(vector<ShardType>* shards) {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ std::unique_ptr<DBClientCursor> cursor(
+ _safeCursor(conn->query(ShardType::ConfigNS, BSONObj())));
+ while (cursor->more()) {
+ BSONObj shardObj = cursor->nextSafe();
- shards->push_back(shardRes.getValue());
+ StatusWith<ShardType> shardRes = ShardType::fromBSON(shardObj);
+ if (!shardRes.isOK()) {
+ shards->clear();
+ conn.done();
+ return Status(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse shard with id ("
+ << shardObj[ShardType::name()].toString()
+ << "): " << shardRes.getStatus().reason());
}
- conn.done();
- return Status::OK();
+ shards->push_back(shardRes.getValue());
}
-
- bool CatalogManagerLegacy::isShardHost(const ConnectionString& connectionString) {
- return _getShardCount(BSON(ShardType::host(connectionString.toString())));
+ conn.done();
+
+ return Status::OK();
+}
+
+bool CatalogManagerLegacy::isShardHost(const ConnectionString& connectionString) {
+ return _getShardCount(BSON(ShardType::host(connectionString.toString())));
+}
+
+bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandName,
+ const string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
+ DBClientMultiCommand dispatcher;
+ RawBSONSerializable requestCmdSerial(cmdObj);
+ for (const ConnectionString& configServer : _configServers) {
+ dispatcher.addCommand(configServer, dbname, requestCmdSerial);
}
- bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandName,
- const string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- DBClientMultiCommand dispatcher;
- RawBSONSerializable requestCmdSerial(cmdObj);
- for (const ConnectionString& configServer : _configServers) {
- dispatcher.addCommand(configServer, dbname, requestCmdSerial);
- }
-
- auto scopedDistLock = getDistLockManager()->lock("authorizationData",
- commandName,
- Seconds{5});
- if (!scopedDistLock.isOK()) {
- return Command::appendCommandStatus(*result, scopedDistLock.getStatus());
- }
-
- dispatcher.sendAll();
+ auto scopedDistLock = getDistLockManager()->lock("authorizationData", commandName, Seconds{5});
+ if (!scopedDistLock.isOK()) {
+ return Command::appendCommandStatus(*result, scopedDistLock.getStatus());
+ }
- BSONObj responseObj;
+ dispatcher.sendAll();
- Status prevStatus{Status::OK()};
- Status currStatus{Status::OK()};
+ BSONObj responseObj;
- BSONObjBuilder responses;
- unsigned failedCount = 0;
- bool sameError = true;
- while (dispatcher.numPending() > 0) {
- ConnectionString host;
- RawBSONSerializable responseCmdSerial;
+ Status prevStatus{Status::OK()};
+ Status currStatus{Status::OK()};
- Status dispatchStatus = dispatcher.recvAny(&host,
- &responseCmdSerial);
+ BSONObjBuilder responses;
+ unsigned failedCount = 0;
+ bool sameError = true;
+ while (dispatcher.numPending() > 0) {
+ ConnectionString host;
+ RawBSONSerializable responseCmdSerial;
- if (!dispatchStatus.isOK()) {
- return Command::appendCommandStatus(*result, dispatchStatus);
- }
+ Status dispatchStatus = dispatcher.recvAny(&host, &responseCmdSerial);
- responseObj = responseCmdSerial.toBSON();
- responses.append(host.toString(), responseObj);
-
- currStatus = Command::getStatusFromCommandResult(responseObj);
- if (!currStatus.isOK()) {
- // same error <=> adjacent error statuses are the same
- if (failedCount > 0 && prevStatus != currStatus) {
- sameError = false;
- }
- failedCount++;
- prevStatus = currStatus;
- }
+ if (!dispatchStatus.isOK()) {
+ return Command::appendCommandStatus(*result, dispatchStatus);
}
- if (failedCount == 0) {
- result->appendElements(responseObj);
- return true;
- }
+ responseObj = responseCmdSerial.toBSON();
+ responses.append(host.toString(), responseObj);
- // if the command succeeds on at least one config server and fails on at least one,
- // manual intervention is required
- if (failedCount < _configServers.size()) {
- Status status(ErrorCodes::ManualInterventionRequired,
- str::stream() << "Config write was not consistent - "
- << "user management command failed on at least one "
- << "config server but passed on at least one other. "
- << "Manual intervention may be required. "
- << "Config responses: " << responses.obj().toString());
- return Command::appendCommandStatus(*result, status);
+ currStatus = Command::getStatusFromCommandResult(responseObj);
+ if (!currStatus.isOK()) {
+ // same error <=> adjacent error statuses are the same
+ if (failedCount > 0 && prevStatus != currStatus) {
+ sameError = false;
+ }
+ failedCount++;
+ prevStatus = currStatus;
}
+ }
- if (sameError) {
- result->appendElements(responseObj);
- return false;
- }
+ if (failedCount == 0) {
+ result->appendElements(responseObj);
+ return true;
+ }
+ // if the command succeeds on at least one config server and fails on at least one,
+ // manual intervention is required
+ if (failedCount < _configServers.size()) {
Status status(ErrorCodes::ManualInterventionRequired,
str::stream() << "Config write was not consistent - "
- << "user management command produced inconsistent results. "
+ << "user management command failed on at least one "
+ << "config server but passed on at least one other. "
<< "Manual intervention may be required. "
<< "Config responses: " << responses.obj().toString());
return Command::appendCommandStatus(*result, status);
}
- bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- try {
- // let SyncClusterConnection handle connecting to the first config server
- // that is reachable and returns data
- ScopedDbConnection conn(_configServerConnectionString, 30);
-
- BSONObj cmdResult;
- const bool ok = conn->runCommand(dbname, cmdObj, cmdResult);
- result->appendElements(cmdResult);
- conn.done();
- return ok;
- }
- catch (const DBException& ex) {
- return Command::appendCommandStatus(*result, ex.toStatus());
- }
+ if (sameError) {
+ result->appendElements(responseObj);
+ return false;
}
- Status CatalogManagerLegacy::applyChunkOpsDeprecated(const BSONArray& updateOps,
- const BSONArray& preCondition) {
- BSONObj cmd = BSON("applyOps" << updateOps <<
- "preCondition" << preCondition);
- bool ok;
+ Status status(ErrorCodes::ManualInterventionRequired,
+ str::stream() << "Config write was not consistent - "
+ << "user management command produced inconsistent results. "
+ << "Manual intervention may be required. "
+ << "Config responses: " << responses.obj().toString());
+ return Command::appendCommandStatus(*result, status);
+}
+
+bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
+ try {
+ // let SyncClusterConnection handle connecting to the first config server
+ // that is reachable and returns data
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+
BSONObj cmdResult;
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- ok = conn->runCommand("config", cmd, cmdResult);
- conn.done();
- }
- catch (const DBException& ex) {
- return ex.toStatus();
- }
+ const bool ok = conn->runCommand(dbname, cmdObj, cmdResult);
+ result->appendElements(cmdResult);
+ conn.done();
+ return ok;
+ } catch (const DBException& ex) {
+ return Command::appendCommandStatus(*result, ex.toStatus());
+ }
+}
+
+Status CatalogManagerLegacy::applyChunkOpsDeprecated(const BSONArray& updateOps,
+ const BSONArray& preCondition) {
+ BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition);
+ bool ok;
+ BSONObj cmdResult;
+ try {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+ ok = conn->runCommand("config", cmd, cmdResult);
+ conn.done();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
- if (!ok) {
- string errMsg(str::stream() << "Unable to save chunk ops. Command: "
- << cmd << ". Result: " << cmdResult);
+ if (!ok) {
+ string errMsg(str::stream() << "Unable to save chunk ops. Command: " << cmd
+ << ". Result: " << cmdResult);
- return Status(ErrorCodes::OperationFailed, errMsg);
- }
+ return Status(ErrorCodes::OperationFailed, errMsg);
+ }
- return Status::OK();
+ return Status::OK();
+}
+
+void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request,
+ BatchedCommandResponse* response) {
+ // check if config servers are consistent
+ if (!_isConsistentFromLastCheck()) {
+ toBatchError(Status(ErrorCodes::ConfigServersInconsistent,
+ "Data inconsistency detected amongst config servers"),
+ response);
+ return;
}
- void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request,
- BatchedCommandResponse* response) {
+ // We only support batch sizes of one for config writes
+ if (request.sizeWriteOps() != 1) {
+ toBatchError(Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Writes to config servers must have batch size of 1, "
+ << "found " << request.sizeWriteOps()),
+ response);
- // check if config servers are consistent
- if (!_isConsistentFromLastCheck()) {
- toBatchError(
- Status(ErrorCodes::ConfigServersInconsistent,
- "Data inconsistency detected amongst config servers"),
- response);
- return;
- }
+ return;
+ }
- // We only support batch sizes of one for config writes
- if (request.sizeWriteOps() != 1) {
- toBatchError(
- Status(ErrorCodes::InvalidOptions,
- str::stream() << "Writes to config servers must have batch size of 1, "
- << "found " << request.sizeWriteOps()),
- response);
+ // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes
+ if (request.isWriteConcernSet() && !validConfigWC(request.getWriteConcern())) {
+ toBatchError(Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Invalid write concern for write to "
+ << "config servers: " << request.getWriteConcern()),
+ response);
- return;
- }
+ return;
+ }
- // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes
- if (request.isWriteConcernSet() && !validConfigWC(request.getWriteConcern())) {
+ DBClientMultiCommand dispatcher;
+ if (_configServers.size() > 1) {
+ // We can't support no-_id upserts to multiple config servers - the _ids will differ
+ if (BatchedCommandRequest::containsNoIDUpsert(request)) {
toBatchError(
Status(ErrorCodes::InvalidOptions,
- str::stream() << "Invalid write concern for write to "
- << "config servers: " << request.getWriteConcern()),
+ str::stream() << "upserts to multiple config servers must include _id"),
response);
-
return;
}
-
- DBClientMultiCommand dispatcher;
- if (_configServers.size() > 1) {
- // We can't support no-_id upserts to multiple config servers - the _ids will differ
- if (BatchedCommandRequest::containsNoIDUpsert(request)) {
- toBatchError(
- Status(ErrorCodes::InvalidOptions,
- str::stream() << "upserts to multiple config servers must include _id"),
- response);
- return;
- }
- }
-
- ConfigCoordinator exec(&dispatcher, _configServerConnectionString);
- exec.executeBatch(request, response);
}
- Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName) const {
- ScopedDbConnection conn(_configServerConnectionString, 30);
+ ConfigCoordinator exec(&dispatcher, _configServerConnectionString);
+ exec.executeBatch(request, response);
+}
- BSONObjBuilder b;
- b.appendRegex(DatabaseType::name(),
- (string)"^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i");
+Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName) const {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
- BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, b.obj());
- conn.done();
+ BSONObjBuilder b;
+ b.appendRegex(DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i");
- // If our name is exactly the same as the name we want, try loading
- // the database again.
- if (!dbObj.isEmpty() && dbObj[DatabaseType::name()].String() == dbName) {
- return Status(ErrorCodes::NamespaceExists,
- str::stream() << "database " << dbName << " already exists");
- }
+ BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, b.obj());
+ conn.done();
- if (!dbObj.isEmpty()) {
- return Status(ErrorCodes::DatabaseDifferCase,
- str::stream() << "can't have 2 databases that just differ on case "
- << " have: " << dbObj[DatabaseType::name()].String()
- << " want to add: " << dbName);
- }
+ // If our name is exactly the same as the name we want, try loading
+ // the database again.
+ if (!dbObj.isEmpty() && dbObj[DatabaseType::name()].String() == dbName) {
+ return Status(ErrorCodes::NamespaceExists,
+ str::stream() << "database " << dbName << " already exists");
+ }
- return Status::OK();
+ if (!dbObj.isEmpty()) {
+ return Status(ErrorCodes::DatabaseDifferCase,
+ str::stream() << "can't have 2 databases that just differ on case "
+ << " have: " << dbObj[DatabaseType::name()].String()
+ << " want to add: " << dbName);
}
- StatusWith<string> CatalogManagerLegacy::_getNewShardName() const {
- BSONObj o;
- {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- o = conn->findOne(ShardType::ConfigNS,
- Query(fromjson("{" + ShardType::name() + ": /^shard/}"))
- .sort(BSON(ShardType::name() << -1 )));
- conn.done();
- }
+ return Status::OK();
+}
- int count = 0;
- if (!o.isEmpty()) {
- string last = o[ShardType::name()].String();
- std::istringstream is(last.substr(5));
- is >> count;
- count++;
- }
+StatusWith<string> CatalogManagerLegacy::_getNewShardName() const {
+ BSONObj o;
+ {
+ ScopedDbConnection conn(_configServerConnectionString, 30);
+ o = conn->findOne(ShardType::ConfigNS,
+ Query(fromjson("{" + ShardType::name() + ": /^shard/}"))
+ .sort(BSON(ShardType::name() << -1)));
+ conn.done();
+ }
- // TODO fix so that we can have more than 10000 automatically generated shard names
- if (count < 9999) {
- std::stringstream ss;
- ss << "shard" << std::setfill('0') << std::setw(4) << count;
- return ss.str();
- }
+ int count = 0;
+ if (!o.isEmpty()) {
+ string last = o[ShardType::name()].String();
+ std::istringstream is(last.substr(5));
+ is >> count;
+ count++;
+ }
- return Status(ErrorCodes::OperationFailed,
- "unable to generate new shard name");
+ // TODO fix so that we can have more than 10000 automatically generated shard names
+ if (count < 9999) {
+ std::stringstream ss;
+ ss << "shard" << std::setfill('0') << std::setw(4) << count;
+ return ss.str();
}
- size_t CatalogManagerLegacy::_getShardCount(const BSONObj& query) const {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- long long shardCount = conn->count(ShardType::ConfigNS, query);
- conn.done();
+ return Status(ErrorCodes::OperationFailed, "unable to generate new shard name");
+}
- return shardCount;
- }
+size_t CatalogManagerLegacy::_getShardCount(const BSONObj& query) const {
+ ScopedDbConnection conn(_configServerConnectionString, 30.0);
+ long long shardCount = conn->count(ShardType::ConfigNS, query);
+ conn.done();
- DistLockManager* CatalogManagerLegacy::getDistLockManager() {
- invariant(_distLockManager);
- return _distLockManager.get();
- }
+ return shardCount;
+}
- bool CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const {
- if (tries <= 0)
- return false;
+DistLockManager* CatalogManagerLegacy::getDistLockManager() {
+ invariant(_distLockManager);
+ return _distLockManager.get();
+}
- unsigned firstGood = 0;
- int up = 0;
- vector<BSONObj> res;
+bool CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const {
+ if (tries <= 0)
+ return false;
- // The last error we saw on a config server
- string errMsg;
+ unsigned firstGood = 0;
+ int up = 0;
+ vector<BSONObj> res;
- for (unsigned i = 0; i < _configServers.size(); i++) {
- BSONObj result;
- std::unique_ptr<ScopedDbConnection> conn;
-
- try {
- conn.reset(new ScopedDbConnection(_configServers[i], 30.0));
-
- if (!conn->get()->runCommand("config",
- BSON("dbhash" << 1 <<
- "collections" << BSON_ARRAY("chunks" <<
- "databases" <<
- "collections" <<
- "shards" <<
- "version")),
- result)) {
-
- errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String();
- if (!result["assertion"].eoo()) errMsg = result["assertion"].String();
-
- warning() << "couldn't check dbhash on config server " << _configServers[i]
- << causedBy(result.toString());
-
- result = BSONObj();
- }
- else {
- result = result.getOwned();
- if (up == 0)
- firstGood = i;
- up++;
- }
- conn->done();
- }
- catch (const DBException& e) {
- if (conn) {
- conn->kill();
- }
+ // The last error we saw on a config server
+ string errMsg;
- // We need to catch DBExceptions b/c sometimes we throw them
- // instead of socket exceptions when findN fails
+ for (unsigned i = 0; i < _configServers.size(); i++) {
+ BSONObj result;
+ std::unique_ptr<ScopedDbConnection> conn;
- errMsg = e.toString();
- warning() << " couldn't check dbhash on config server "
- << _configServers[i] << causedBy(e);
+ try {
+ conn.reset(new ScopedDbConnection(_configServers[i], 30.0));
+
+ if (!conn->get()->runCommand(
+ "config",
+ BSON("dbhash" << 1 << "collections" << BSON_ARRAY("chunks"
+ << "databases"
+ << "collections"
+ << "shards"
+ << "version")),
+ result)) {
+ errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String();
+ if (!result["assertion"].eoo())
+ errMsg = result["assertion"].String();
+
+ warning() << "couldn't check dbhash on config server " << _configServers[i]
+ << causedBy(result.toString());
+
+ result = BSONObj();
+ } else {
+ result = result.getOwned();
+ if (up == 0)
+ firstGood = i;
+ up++;
+ }
+ conn->done();
+ } catch (const DBException& e) {
+ if (conn) {
+ conn->kill();
}
- res.push_back(result);
- }
- if (_configServers.size() == 1)
- return true;
+ // We need to catch DBExceptions b/c sometimes we throw them
+ // instead of socket exceptions when findN fails
- if (up == 0) {
- // Use a ptr to error so if empty we won't add causedby
- error() << "no config servers successfully contacted" << causedBy(&errMsg);
- return false;
- }
- else if (up == 1) {
- warning() << "only 1 config server reachable, continuing";
- return true;
+ errMsg = e.toString();
+ warning() << " couldn't check dbhash on config server " << _configServers[i]
+ << causedBy(e);
}
+ res.push_back(result);
+ }
- BSONObj base = res[firstGood];
- for (unsigned i = firstGood+1; i < res.size(); i++) {
- if (res[i].isEmpty())
- continue;
+ if (_configServers.size() == 1)
+ return true;
- string chunksHash1 = base.getFieldDotted("collections.chunks");
- string chunksHash2 = res[i].getFieldDotted("collections.chunks");
+ if (up == 0) {
+ // Use a ptr to error so if empty we won't add causedby
+ error() << "no config servers successfully contacted" << causedBy(&errMsg);
+ return false;
+ } else if (up == 1) {
+ warning() << "only 1 config server reachable, continuing";
+ return true;
+ }
- string databaseHash1 = base.getFieldDotted("collections.databases");
- string databaseHash2 = res[i].getFieldDotted("collections.databases");
+ BSONObj base = res[firstGood];
+ for (unsigned i = firstGood + 1; i < res.size(); i++) {
+ if (res[i].isEmpty())
+ continue;
- string collectionsHash1 = base.getFieldDotted("collections.collections");
- string collectionsHash2 = res[i].getFieldDotted("collections.collections");
+ string chunksHash1 = base.getFieldDotted("collections.chunks");
+ string chunksHash2 = res[i].getFieldDotted("collections.chunks");
- string shardHash1 = base.getFieldDotted("collections.shards");
- string shardHash2 = res[i].getFieldDotted("collections.shards");
+ string databaseHash1 = base.getFieldDotted("collections.databases");
+ string databaseHash2 = res[i].getFieldDotted("collections.databases");
- string versionHash1 = base.getFieldDotted("collections.version") ;
- string versionHash2 = res[i].getFieldDotted("collections.version");
+ string collectionsHash1 = base.getFieldDotted("collections.collections");
+ string collectionsHash2 = res[i].getFieldDotted("collections.collections");
- if (chunksHash1 == chunksHash2 &&
- databaseHash1 == databaseHash2 &&
- collectionsHash1 == collectionsHash2 &&
- shardHash1 == shardHash2 &&
- versionHash1 == versionHash2) {
- continue;
- }
+ string shardHash1 = base.getFieldDotted("collections.shards");
+ string shardHash2 = res[i].getFieldDotted("collections.shards");
- warning() << "config servers " << _configServers[firstGood].toString()
- << " and " << _configServers[i].toString() << " differ";
- if (tries <= 1) {
- error() << ": " << base["collections"].Obj()
- << " vs " << res[i]["collections"].Obj();
- return false;
- }
+ string versionHash1 = base.getFieldDotted("collections.version");
+ string versionHash2 = res[i].getFieldDotted("collections.version");
- return _checkConfigServersConsistent(tries - 1);
+ if (chunksHash1 == chunksHash2 && databaseHash1 == databaseHash2 &&
+ collectionsHash1 == collectionsHash2 && shardHash1 == shardHash2 &&
+ versionHash1 == versionHash2) {
+ continue;
}
- return true;
+ warning() << "config servers " << _configServers[firstGood].toString() << " and "
+ << _configServers[i].toString() << " differ";
+ if (tries <= 1) {
+ error() << ": " << base["collections"].Obj() << " vs " << res[i]["collections"].Obj();
+ return false;
+ }
+
+ return _checkConfigServersConsistent(tries - 1);
}
- void CatalogManagerLegacy::_consistencyChecker() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_inShutdown) {
- lk.unlock();
- const bool isConsistent = _checkConfigServersConsistent();
+ return true;
+}
- lk.lock();
- _consistentFromLastCheck = isConsistent;
- if (_inShutdown) break;
- _consistencyCheckerCV.wait_for(lk, Seconds(60));
- }
- LOG(1) << "Consistency checker thread shutting down";
- }
+void CatalogManagerLegacy::_consistencyChecker() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_inShutdown) {
+ lk.unlock();
+ const bool isConsistent = _checkConfigServersConsistent();
- bool CatalogManagerLegacy::_isConsistentFromLastCheck() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _consistentFromLastCheck;
+ lk.lock();
+ _consistentFromLastCheck = isConsistent;
+ if (_inShutdown)
+ break;
+ _consistencyCheckerCV.wait_for(lk, Seconds(60));
}
+ LOG(1) << "Consistency checker thread shutting down";
+}
+
+bool CatalogManagerLegacy::_isConsistentFromLastCheck() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _consistentFromLastCheck;
+}
-} // namespace mongo
+} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index aff0ce6d185..5a762a1aebc 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -39,173 +39,170 @@
namespace mongo {
+/**
+ * Implements the catalog manager using the legacy 3-config server protocol.
+ */
+class CatalogManagerLegacy final : public CatalogManager {
+public:
+ CatalogManagerLegacy();
+ ~CatalogManagerLegacy();
+
/**
- * Implements the catalog manager using the legacy 3-config server protocol.
+ * Initializes the catalog manager with the hosts, which will be used as a configuration
+ * server. Can only be called once for the lifetime.
*/
- class CatalogManagerLegacy final : public CatalogManager {
- public:
- CatalogManagerLegacy();
- ~CatalogManagerLegacy();
+ Status init(const ConnectionString& configCS);
- /**
- * Initializes the catalog manager with the hosts, which will be used as a configuration
- * server. Can only be called once for the lifetime.
- */
- Status init(const ConnectionString& configCS);
+ Status startup(bool upgrade) override;
- Status startup(bool upgrade) override;
+ ConnectionString connectionString() const override;
- ConnectionString connectionString() const override;
+ void shutDown() override;
- void shutDown() override;
+ Status enableSharding(const std::string& dbName) override;
- Status enableSharding(const std::string& dbName) override;
+ Status shardCollection(const std::string& ns,
+ const ShardKeyPattern& fieldsAndOrder,
+ bool unique,
+ std::vector<BSONObj>* initPoints,
+ std::set<ShardId>* initShardIds) override;
- Status shardCollection(const std::string& ns,
- const ShardKeyPattern& fieldsAndOrder,
- bool unique,
- std::vector<BSONObj>* initPoints,
- std::set<ShardId>* initShardIds) override;
+ StatusWith<std::string> addShard(const std::string& name,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) override;
- StatusWith<std::string> addShard(const std::string& name,
- const ConnectionString& shardConnectionString,
- const long long maxSize) override;
+ StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
+ const std::string& name) override;
- StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
- const std::string& name) override;
+ Status createDatabase(const std::string& dbName) override;
- Status createDatabase(const std::string& dbName) override;
+ Status updateDatabase(const std::string& dbName, const DatabaseType& db) override;
- Status updateDatabase(const std::string& dbName, const DatabaseType& db) override;
+ StatusWith<DatabaseType> getDatabase(const std::string& dbName) override;
- StatusWith<DatabaseType> getDatabase(const std::string& dbName) override;
+ Status updateCollection(const std::string& collNs, const CollectionType& coll) override;
- Status updateCollection(const std::string& collNs, const CollectionType& coll) override;
+ StatusWith<CollectionType> getCollection(const std::string& collNs) override;
- StatusWith<CollectionType> getCollection(const std::string& collNs) override;
+ Status getCollections(const std::string* dbName, std::vector<CollectionType>* collections);
- Status getCollections(const std::string* dbName, std::vector<CollectionType>* collections);
+ Status dropCollection(const std::string& collectionNs);
- Status dropCollection(const std::string& collectionNs);
+ Status getDatabasesForShard(const std::string& shardName,
+ std::vector<std::string>* dbs) override;
- Status getDatabasesForShard(const std::string& shardName,
- std::vector<std::string>* dbs) override;
+ Status getChunks(const Query& query, int nToReturn, std::vector<ChunkType>* chunks) override;
- Status getChunks(const Query& query,
- int nToReturn,
- std::vector<ChunkType>* chunks) override;
+ Status getTagsForCollection(const std::string& collectionNs,
+ std::vector<TagsType>* tags) override;
- Status getTagsForCollection(const std::string& collectionNs,
- std::vector<TagsType>* tags) override;
+ StatusWith<std::string> getTagForChunk(const std::string& collectionNs,
+ const ChunkType& chunk) override;
- StatusWith<std::string> getTagForChunk(const std::string& collectionNs,
- const ChunkType& chunk) override;
+ Status getAllShards(std::vector<ShardType>* shards) override;
- Status getAllShards(std::vector<ShardType>* shards) override;
+ bool isShardHost(const ConnectionString& shardConnectionString) override;
- bool isShardHost(const ConnectionString& shardConnectionString) override;
-
- /**
- * Grabs a distributed lock and runs the command on all config servers.
- */
- bool runUserManagementWriteCommand(const std::string& commandName,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
+ /**
+ * Grabs a distributed lock and runs the command on all config servers.
+ */
+ bool runUserManagementWriteCommand(const std::string& commandName,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) override;
- bool runUserManagementReadCommand(const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
+ bool runUserManagementReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) override;
- Status applyChunkOpsDeprecated(const BSONArray& updateOps,
- const BSONArray& preCondition) override;
+ Status applyChunkOpsDeprecated(const BSONArray& updateOps,
+ const BSONArray& preCondition) override;
- void logAction(const ActionLogType& actionLog);
+ void logAction(const ActionLogType& actionLog);
- void logChange(OperationContext* txn,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ void logChange(OperationContext* txn,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
- StatusWith<SettingsType> getGlobalSettings(const std::string& key) override;
+ StatusWith<SettingsType> getGlobalSettings(const std::string& key) override;
- void writeConfigServerDirect(const BatchedCommandRequest& request,
- BatchedCommandResponse* response) override;
+ void writeConfigServerDirect(const BatchedCommandRequest& request,
+ BatchedCommandResponse* response) override;
- DistLockManager* getDistLockManager() override;
+ DistLockManager* getDistLockManager() override;
- private:
+private:
+ /**
+ * Updates the config server's metadata to the current version.
+ */
+ Status _checkAndUpgradeConfigMetadata(bool doUpgrade);
- /**
- * Updates the config server's metadata to the current version.
- */
- Status _checkAndUpgradeConfigMetadata(bool doUpgrade);
+ /**
+ * Starts the thread that periodically checks data consistency amongst the config servers.
+ * Note: this is not thread safe and can only be called once for the lifetime.
+ */
+ Status _startConfigServerChecker();
- /**
- * Starts the thread that periodically checks data consistency amongst the config servers.
- * Note: this is not thread safe and can only be called once for the lifetime.
- */
- Status _startConfigServerChecker();
+ /**
+ * Direct network check to see if a particular database does not already exist with the
+ * same name or different case.
+ */
+ Status _checkDbDoesNotExist(const std::string& dbName) const;
- /**
- * Direct network check to see if a particular database does not already exist with the
- * same name or different case.
- */
- Status _checkDbDoesNotExist(const std::string& dbName) const;
+ /**
+ * Generates a new shard name "shard<xxxx>"
+ * where <xxxx> is an autoincrementing value and <xxxx> < 10000
+ */
+ StatusWith<std::string> _getNewShardName() const;
- /**
- * Generates a new shard name "shard<xxxx>"
- * where <xxxx> is an autoincrementing value and <xxxx> < 10000
- */
- StatusWith<std::string> _getNewShardName() const;
+ /**
+ * Returns the number of shards recognized by the config servers
+ * in this sharded cluster.
+ * Optional: use query parameter to filter shard count.
+ */
+ size_t _getShardCount(const BSONObj& query = {}) const;
- /**
- * Returns the number of shards recognized by the config servers
- * in this sharded cluster.
- * Optional: use query parameter to filter shard count.
- */
- size_t _getShardCount(const BSONObj& query = {}) const;
+ /**
+ * Returns true if all config servers have the same state.
+ * If inconsistency detected on first attempt, checks at most 3 more times.
+ */
+ bool _checkConfigServersConsistent(const unsigned tries = 4) const;
- /**
- * Returns true if all config servers have the same state.
- * If inconsistency detected on first attempt, checks at most 3 more times.
- */
- bool _checkConfigServersConsistent(const unsigned tries = 4) const;
+ /**
+ * Checks data consistency amongst config servers every 60 seconds.
+ */
+ void _consistencyChecker();
- /**
- * Checks data consistency amongst config servers every 60 seconds.
- */
- void _consistencyChecker();
+ /**
+ * Returns true if the config servers have the same contents since the last
+ * check was performed.
+ */
+ bool _isConsistentFromLastCheck();
- /**
- * Returns true if the config servers have the same contents since the last
- * check was performed.
- */
- bool _isConsistentFromLastCheck();
+ // Parsed config server hosts, as specified on the command line.
+ ConnectionString _configServerConnectionString;
+ std::vector<ConnectionString> _configServers;
- // Parsed config server hosts, as specified on the command line.
- ConnectionString _configServerConnectionString;
- std::vector<ConnectionString> _configServers;
+ // Distribted lock manager singleton.
+    // Distributed lock manager singleton.
- // Distribted lock manager singleton.
- std::unique_ptr<DistLockManager> _distLockManager;
+ // protects _inShutdown, _consistentFromLastCheck; used by _consistencyCheckerCV
+ stdx::mutex _mutex;
- // protects _inShutdown, _consistentFromLastCheck; used by _consistencyCheckerCV
- stdx::mutex _mutex;
+ // True if CatalogManagerLegacy::shutDown has been called. False, otherwise.
+ bool _inShutdown = false;
- // True if CatalogManagerLegacy::shutDown has been called. False, otherwise.
- bool _inShutdown = false;
+ // used by consistency checker thread to check if config
+ // servers are consistent
+ bool _consistentFromLastCheck = false;
- // used by consistency checker thread to check if config
- // servers are consistent
- bool _consistentFromLastCheck = false;
+ // Thread that runs dbHash on config servers for checking data consistency.
+ stdx::thread _consistencyCheckerThread;
- // Thread that runs dbHash on config servers for checking data consistency.
- stdx::thread _consistencyCheckerThread;
+ // condition variable used by the consistency checker thread to wait
+ // for <= 60s, on every iteration, until shutDown is called
+ stdx::condition_variable _consistencyCheckerCV;
+};
- // condition variable used by the consistency checker thread to wait
- // for <= 60s, on every iteration, until shutDown is called
- stdx::condition_variable _consistencyCheckerCV;
- };
-
-} // namespace mongo
+} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp
index 5af263b5e56..c8b60c9c9dc 100644
--- a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp
+++ b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp
@@ -44,188 +44,175 @@
namespace mongo {
- using std::endl;
- using std::string;
- using std::unique_ptr;
- using std::vector;
- using mongoutils::str::stream;
-
- Status checkClusterMongoVersions(CatalogManager* catalogManager,
- const string& minMongoVersion) {
-
- unique_ptr<ScopedDbConnection> connPtr;
-
- //
- // Find mongos pings in config server
- //
-
- try {
- connPtr.reset(new ScopedDbConnection(catalogManager->connectionString(), 30));
- ScopedDbConnection& conn = *connPtr;
- unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(MongosType::ConfigNS,
- Query())));
-
- while (cursor->more()) {
-
- BSONObj pingDoc = cursor->next();
-
- MongosType ping;
- string errMsg;
- // NOTE: We don't care if the ping is invalid, legacy stuff will be
- if (!ping.parseBSON(pingDoc, &errMsg)) {
- warning() << "could not parse ping document: " << pingDoc << causedBy(errMsg)
- << endl;
- continue;
- }
+using std::endl;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using mongoutils::str::stream;
+
+Status checkClusterMongoVersions(CatalogManager* catalogManager, const string& minMongoVersion) {
+ unique_ptr<ScopedDbConnection> connPtr;
+
+ //
+ // Find mongos pings in config server
+ //
+
+ try {
+ connPtr.reset(new ScopedDbConnection(catalogManager->connectionString(), 30));
+ ScopedDbConnection& conn = *connPtr;
+ unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(MongosType::ConfigNS, Query())));
+
+ while (cursor->more()) {
+ BSONObj pingDoc = cursor->next();
+
+ MongosType ping;
+ string errMsg;
+ // NOTE: We don't care if the ping is invalid, legacy stuff will be
+ if (!ping.parseBSON(pingDoc, &errMsg)) {
+ warning() << "could not parse ping document: " << pingDoc << causedBy(errMsg)
+ << endl;
+ continue;
+ }
- string mongoVersion = "2.0";
- // Hack to determine older mongos versions from ping format
- if (ping.isWaitingSet()) mongoVersion = "2.2";
- if (ping.isMongoVersionSet() && ping.getMongoVersion() != "") {
- mongoVersion = ping.getMongoVersion();
- }
+ string mongoVersion = "2.0";
+ // Hack to determine older mongos versions from ping format
+ if (ping.isWaitingSet())
+ mongoVersion = "2.2";
+ if (ping.isMongoVersionSet() && ping.getMongoVersion() != "") {
+ mongoVersion = ping.getMongoVersion();
+ }
- Date_t lastPing = ping.getPing();
+ Date_t lastPing = ping.getPing();
- Minutes quietIntervalMins{0};
- Date_t currentJsTime = jsTime();
- if (currentJsTime >= lastPing) {
- quietIntervalMins = duration_cast<Minutes>(currentJsTime - lastPing);
- }
+ Minutes quietIntervalMins{0};
+ Date_t currentJsTime = jsTime();
+ if (currentJsTime >= lastPing) {
+ quietIntervalMins = duration_cast<Minutes>(currentJsTime - lastPing);
+ }
- // We assume that anything that hasn't pinged in 5 minutes is probably down
- if (quietIntervalMins >= Minutes{5}) {
- log() << "stale mongos detected " << quietIntervalMins.count()
- << " minutes ago, network location is " << pingDoc["_id"].String()
- << ", not checking version";
- }
- else {
- if (versionCmp(mongoVersion, minMongoVersion) < 0) {
- return Status(ErrorCodes::RemoteValidationError,
- stream() << "version " << mongoVersion
- << " detected on mongos at "
- << ping.getName()
- << ", but version >= " << minMongoVersion
- << " required; you must wait 5 minutes "
- << "after shutting down a pre-" << minMongoVersion
- << " mongos");
- }
+ // We assume that anything that hasn't pinged in 5 minutes is probably down
+ if (quietIntervalMins >= Minutes{5}) {
+ log() << "stale mongos detected " << quietIntervalMins.count()
+ << " minutes ago, network location is " << pingDoc["_id"].String()
+ << ", not checking version";
+ } else {
+ if (versionCmp(mongoVersion, minMongoVersion) < 0) {
+ return Status(
+ ErrorCodes::RemoteValidationError,
+ stream() << "version " << mongoVersion << " detected on mongos at "
+ << ping.getName() << ", but version >= " << minMongoVersion
+ << " required; you must wait 5 minutes "
+ << "after shutting down a pre-" << minMongoVersion << " mongos");
}
}
}
- catch (const DBException& e) {
- return e.toStatus("could not read mongos pings collection");
- }
+ } catch (const DBException& e) {
+ return e.toStatus("could not read mongos pings collection");
+ }
- connPtr->done();
+ connPtr->done();
- //
- // Load shards from config server
- //
+ //
+ // Load shards from config server
+ //
- vector<HostAndPort> servers;
+ vector<HostAndPort> servers;
- try {
- vector<ShardType> shards;
- Status status = catalogManager->getAllShards(&shards);
+ try {
+ vector<ShardType> shards;
+ Status status = catalogManager->getAllShards(&shards);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ for (const ShardType& shard : shards) {
+ Status status = shard.validate();
if (!status.isOK()) {
- return status;
+ return Status(ErrorCodes::UnsupportedFormat,
+ stream() << "shard " << shard.toBSON()
+ << " failed validation: " << causedBy(status));
}
- for (const ShardType& shard : shards) {
- Status status = shard.validate();
- if (!status.isOK()) {
- return Status(ErrorCodes::UnsupportedFormat,
- stream() << "shard " << shard.toBSON()
- << " failed validation: " << causedBy(status));
- }
-
- const auto shardConnStatus = ConnectionString::parse(shard.getHost());
- if (!shardConnStatus.isOK()) {
- return Status(ErrorCodes::UnsupportedFormat,
- stream() << "invalid shard host " << shard.getHost()
- << " read from the config server"
- << shardConnStatus.getStatus().toString());
- }
-
- vector<HostAndPort> shardServers = shardConnStatus.getValue().getServers();
- servers.insert(servers.end(), shardServers.begin(), shardServers.end());
+ const auto shardConnStatus = ConnectionString::parse(shard.getHost());
+ if (!shardConnStatus.isOK()) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ stream() << "invalid shard host " << shard.getHost()
+ << " read from the config server"
+ << shardConnStatus.getStatus().toString());
}
- }
- catch (const DBException& e) {
- return e.toStatus("could not read shards collection");
- }
-
- // Add config servers to list of servers to check version against
- vector<HostAndPort> configServers = catalogManager->connectionString().getServers();
- servers.insert(servers.end(), configServers.begin(), configServers.end());
- //
- // We've now got all the shard info from the config server, start contacting the shards
- // and config servers and verifying their versions.
- //
+ vector<HostAndPort> shardServers = shardConnStatus.getValue().getServers();
+ servers.insert(servers.end(), shardServers.begin(), shardServers.end());
+ }
+ } catch (const DBException& e) {
+ return e.toStatus("could not read shards collection");
+ }
- for (vector<HostAndPort>::iterator serverIt = servers.begin();
- serverIt != servers.end(); ++serverIt) {
+ // Add config servers to list of servers to check version against
+ vector<HostAndPort> configServers = catalogManager->connectionString().getServers();
+ servers.insert(servers.end(), configServers.begin(), configServers.end());
- // Note: This will *always* be a single-host connection
- ConnectionString serverLoc(*serverIt);
- dassert(serverLoc.type() == ConnectionString::MASTER ||
- serverLoc.type() == ConnectionString::CUSTOM); // for dbtests
+ //
+ // We've now got all the shard info from the config server, start contacting the shards
+ // and config servers and verifying their versions.
+ //
- log() << "checking that version of host " << serverLoc << " is compatible with "
- << minMongoVersion << endl;
+ for (vector<HostAndPort>::iterator serverIt = servers.begin(); serverIt != servers.end();
+ ++serverIt) {
+ // Note: This will *always* be a single-host connection
+ ConnectionString serverLoc(*serverIt);
+ dassert(serverLoc.type() == ConnectionString::MASTER ||
+ serverLoc.type() == ConnectionString::CUSTOM); // for dbtests
- unique_ptr<ScopedDbConnection> serverConnPtr;
+ log() << "checking that version of host " << serverLoc << " is compatible with "
+ << minMongoVersion << endl;
- bool resultOk;
- BSONObj buildInfo;
+ unique_ptr<ScopedDbConnection> serverConnPtr;
- try {
- serverConnPtr.reset(new ScopedDbConnection(serverLoc, 30));
- ScopedDbConnection& serverConn = *serverConnPtr;
+ bool resultOk;
+ BSONObj buildInfo;
- resultOk = serverConn->runCommand("admin",
- BSON("buildInfo" << 1),
- buildInfo);
- }
- catch (const DBException& e) {
- warning() << "could not run buildInfo command on " << serverLoc.toString() << " "
- << causedBy(e) << ". Please ensure that this server is up and at a "
- "version >= "
- << minMongoVersion;
- continue;
- }
+ try {
+ serverConnPtr.reset(new ScopedDbConnection(serverLoc, 30));
+ ScopedDbConnection& serverConn = *serverConnPtr;
+
+ resultOk = serverConn->runCommand("admin", BSON("buildInfo" << 1), buildInfo);
+ } catch (const DBException& e) {
+ warning() << "could not run buildInfo command on " << serverLoc.toString() << " "
+ << causedBy(e) << ". Please ensure that this server is up and at a "
+ "version >= " << minMongoVersion;
+ continue;
+ }
- // TODO: Make running commands saner such that we can consolidate error handling
- if (!resultOk) {
- return Status(ErrorCodes::UnknownError,
- stream() << DBClientConnection::getLastErrorString(buildInfo)
- << causedBy(buildInfo.toString()));
- }
+ // TODO: Make running commands saner such that we can consolidate error handling
+ if (!resultOk) {
+ return Status(ErrorCodes::UnknownError,
+ stream() << DBClientConnection::getLastErrorString(buildInfo)
+ << causedBy(buildInfo.toString()));
+ }
- serverConnPtr->done();
+ serverConnPtr->done();
- verify(buildInfo["version"].type() == String);
- string mongoVersion = buildInfo["version"].String();
+ verify(buildInfo["version"].type() == String);
+ string mongoVersion = buildInfo["version"].String();
- if (versionCmp(mongoVersion, minMongoVersion) < 0) {
- return Status(ErrorCodes::RemoteValidationError,
- stream() << "version " << mongoVersion << " detected on mongo "
- "server at " << serverLoc.toString() <<
- ", but version >= " << minMongoVersion << " required");
- }
+ if (versionCmp(mongoVersion, minMongoVersion) < 0) {
+ return Status(ErrorCodes::RemoteValidationError,
+ stream() << "version " << mongoVersion << " detected on mongo "
+ "server at "
+ << serverLoc.toString() << ", but version >= " << minMongoVersion
+ << " required");
}
-
- return Status::OK();
}
- // Helper function for safe cursors
- DBClientCursor* _safeCursor(unique_ptr<DBClientCursor> cursor) {
- // TODO: Make error handling more consistent, it's annoying that cursors error out by
- // throwing exceptions *and* being empty
- uassert(16625, str::stream() << "cursor not found, transport error", cursor.get());
- return cursor.release();
- }
+ return Status::OK();
+}
+// Helper function for safe cursors
+DBClientCursor* _safeCursor(unique_ptr<DBClientCursor> cursor) {
+ // TODO: Make error handling more consistent, it's annoying that cursors error out by
+ // throwing exceptions *and* being empty
+ uassert(16625, str::stream() << "cursor not found, transport error", cursor.get());
+ return cursor.release();
+}
}
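Editor's note: the loop reformatted above implements checkClusterMongoVersions(): contact every shard and config server, warn and skip hosts that cannot be reached, and fail only when a reachable host reports a version below the minimum. The standalone C++ sketch below illustrates that warn-skip-fail policy under simplified assumptions: fetchVersion() is a hypothetical stand-in for the buildInfo round trip, plain string comparison stands in for versionCmp(), and the host names are made up.

    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for the buildInfo round trip; a real implementation
    // would contact the host and read the "version" field of the reply.
    std::optional<std::string> fetchVersion(const std::string& host) {
        static const std::map<std::string, std::string> canned{{"shard1:27018", "3.0.4"},
                                                               {"cfg1:27019", "2.8.0"}};
        auto it = canned.find(host);
        if (it == canned.end()) {
            return std::nullopt;  // simulate an unreachable host
        }
        return it->second;
    }

    // Warn and skip unreachable hosts; fail the whole check only when a host
    // reports a version older than the minimum.
    bool checkMinimumVersions(const std::vector<std::string>& hosts, const std::string& minVersion) {
        for (const auto& host : hosts) {
            std::optional<std::string> version = fetchVersion(host);
            if (!version) {
                std::cerr << "could not contact " << host << ", skipping version check\n";
                continue;
            }
            if (*version < minVersion) {  // lexicographic compare is enough for this sketch
                std::cerr << "host " << host << " runs " << *version << ", but >= " << minVersion
                          << " is required\n";
                return false;
            }
        }
        return true;
    }

    int main() {
        std::cout << checkMinimumVersions({"shard1:27018", "down:27017"}, "3.0.0") << "\n";  // 1
        std::cout << checkMinimumVersions({"cfg1:27019"}, "3.0.0") << "\n";                  // 0
        return 0;
    }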
diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.h b/src/mongo/s/catalog/legacy/cluster_client_internal.h
index 8a140e171cd..ae60e519fbd 100644
--- a/src/mongo/s/catalog/legacy/cluster_client_internal.h
+++ b/src/mongo/s/catalog/legacy/cluster_client_internal.h
@@ -34,27 +34,26 @@
namespace mongo {
- class CatalogManager;
- class DBClientCursor;
- class Status;
-
- /**
- * Tries to check the versions of all active hosts in a cluster. Not 100% accurate, but pretty
- * effective if hosts are reachable.
- *
- * Returns OK if hosts are compatible as far as we know, RemoteValidationError if hosts are not
- * compatible, and an error Status if anything else goes wrong.
- */
- Status checkClusterMongoVersions(CatalogManager* catalogManager,
- const std::string& minMongoVersion);
-
- //
- // Needed to normalize exception behavior of connections and cursors
- // TODO: Remove when we refactor the client connection interface to something more consistent.
- //
-
- // Helper function which throws for invalid cursor initialization.
- // Note: cursor ownership will be passed to this function.
- DBClientCursor* _safeCursor(std::unique_ptr<DBClientCursor> cursor);
+class CatalogManager;
+class DBClientCursor;
+class Status;
+/**
+ * Tries to check the versions of all active hosts in a cluster. Not 100% accurate, but pretty
+ * effective if hosts are reachable.
+ *
+ * Returns OK if hosts are compatible as far as we know, RemoteValidationError if hosts are not
+ * compatible, and an error Status if anything else goes wrong.
+ */
+Status checkClusterMongoVersions(CatalogManager* catalogManager,
+ const std::string& minMongoVersion);
+
+//
+// Needed to normalize exception behavior of connections and cursors
+// TODO: Remove when we refactor the client connection interface to something more consistent.
+//
+
+// Helper function which throws for invalid cursor initialization.
+// Note: cursor ownership will be passed to this function.
+DBClientCursor* _safeCursor(std::unique_ptr<DBClientCursor> cursor);
}
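Editor's note: the _safeCursor() helper declared above exists to normalize two failure modes of the legacy client, cursors that throw and cursors that come back null after a transport error. A minimal standalone sketch of that contract, with a hypothetical Cursor type in place of DBClientCursor, might look like this:

    #include <memory>
    #include <stdexcept>

    struct Cursor {};  // hypothetical stand-in for DBClientCursor

    // A null cursor (transport error) becomes an exception instead of an empty
    // result; ownership passes back to the caller as a raw pointer, matching
    // the legacy interface declared in the header above.
    Cursor* safeCursor(std::unique_ptr<Cursor> cursor) {
        if (!cursor) {
            throw std::runtime_error("cursor not found, transport error");
        }
        return cursor.release();
    }

    int main() {
        std::unique_ptr<Cursor> ok(new Cursor());
        Cursor* raw = safeCursor(std::move(ok));  // success path: caller now owns raw
        delete raw;

        try {
            safeCursor(nullptr);  // simulated transport failure
        } catch (const std::runtime_error&) {
            // callers handle one error path instead of also checking for null
        }
        return 0;
    }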
diff --git a/src/mongo/s/catalog/legacy/config_coordinator.cpp b/src/mongo/s/catalog/legacy/config_coordinator.cpp
index e7f33095be9..f4d02c5433a 100644
--- a/src/mongo/s/catalog/legacy/config_coordinator.cpp
+++ b/src/mongo/s/catalog/legacy/config_coordinator.cpp
@@ -43,431 +43,424 @@
namespace mongo {
- using std::string;
- using std::vector;
+using std::string;
+using std::vector;
namespace {
- /**
- * A BSON serializable object representing a setShardVersion command request.
- */
- class SSVRequest : public BSONSerializable {
- MONGO_DISALLOW_COPYING(SSVRequest);
- public:
- SSVRequest(const std::string& configDBString) : _configDBString(configDBString) {
-
- }
-
- bool isValid(std::string* errMsg) const {
- return true;
- }
+/**
+ * A BSON serializable object representing a setShardVersion command request.
+ */
+class SSVRequest : public BSONSerializable {
+ MONGO_DISALLOW_COPYING(SSVRequest);
- /** Returns the BSON representation of the entry. */
- BSONObj toBSON() const {
- BSONObjBuilder builder;
- builder.append("setShardVersion", ""); // empty ns for init
- builder.append("configdb", _configDBString);
- builder.append("init", true);
- builder.append("authoritative", true);
- return builder.obj();
- }
+public:
+ SSVRequest(const std::string& configDBString) : _configDBString(configDBString) {}
- bool parseBSON(const BSONObj& source, std::string* errMsg) {
- // Not implemented
- invariant(false);
- return false;
- }
+ bool isValid(std::string* errMsg) const {
+ return true;
+ }
- void clear() {
- // Not implemented
- invariant(false);
- }
+ /** Returns the BSON representation of the entry. */
+ BSONObj toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("setShardVersion", ""); // empty ns for init
+ builder.append("configdb", _configDBString);
+ builder.append("init", true);
+ builder.append("authoritative", true);
+ return builder.obj();
+ }
- string toString() const {
- return toBSON().toString();
- }
+ bool parseBSON(const BSONObj& source, std::string* errMsg) {
+ // Not implemented
+ invariant(false);
+ return false;
+ }
- private:
- const std::string _configDBString;
- };
+ void clear() {
+ // Not implemented
+ invariant(false);
+ }
- /**
- * A BSON serializable object representing a setShardVersion command response.
- */
- class SSVResponse : public BSONSerializable {
- MONGO_DISALLOW_COPYING(SSVResponse);
- public:
- static const BSONField<int> ok;
- static const BSONField<int> errCode;
- static const BSONField<string> errMessage;
+ string toString() const {
+ return toBSON().toString();
+ }
+private:
+ const std::string _configDBString;
+};
- SSVResponse() {
- clear();
- }
+/**
+ * A BSON serializable object representing a setShardVersion command response.
+ */
+class SSVResponse : public BSONSerializable {
+ MONGO_DISALLOW_COPYING(SSVResponse);
- bool isValid(std::string* errMsg) const {
- return _isOkSet;
- }
+public:
+ static const BSONField<int> ok;
+ static const BSONField<int> errCode;
+ static const BSONField<string> errMessage;
- BSONObj toBSON() const {
- BSONObjBuilder builder;
- if (_isOkSet) builder << ok(_ok);
- if (_isErrCodeSet) builder << errCode(_errCode);
- if (_isErrMessageSet) builder << errMessage(_errMessage);
+ SSVResponse() {
+ clear();
+ }
- return builder.obj();
- }
+ bool isValid(std::string* errMsg) const {
+ return _isOkSet;
+ }
- bool parseBSON(const BSONObj& source, std::string* errMsg) {
- FieldParser::FieldState result;
+ BSONObj toBSON() const {
+ BSONObjBuilder builder;
- result = FieldParser::extractNumber(source, ok, &_ok, errMsg);
- if (result == FieldParser::FIELD_INVALID) {
- return false;
- }
- _isOkSet = result != FieldParser::FIELD_NONE;
+ if (_isOkSet)
+ builder << ok(_ok);
+ if (_isErrCodeSet)
+ builder << errCode(_errCode);
+ if (_isErrMessageSet)
+ builder << errMessage(_errMessage);
- result = FieldParser::extract(source, errCode, &_errCode, errMsg);
- if (result == FieldParser::FIELD_INVALID) {
- return false;
- }
- _isErrCodeSet = result != FieldParser::FIELD_NONE;
+ return builder.obj();
+ }
- result = FieldParser::extract(source, errMessage, &_errMessage, errMsg);
- if (result == FieldParser::FIELD_INVALID) {
- return false;
- }
- _isErrMessageSet = result != FieldParser::FIELD_NONE;
+ bool parseBSON(const BSONObj& source, std::string* errMsg) {
+ FieldParser::FieldState result;
- return true;
+ result = FieldParser::extractNumber(source, ok, &_ok, errMsg);
+ if (result == FieldParser::FIELD_INVALID) {
+ return false;
}
+ _isOkSet = result != FieldParser::FIELD_NONE;
- void clear() {
- _ok = false;
- _isOkSet = false;
-
- _errCode = 0;
- _isErrCodeSet = false;
-
- _errMessage = "";
- _isErrMessageSet = false;
+ result = FieldParser::extract(source, errCode, &_errCode, errMsg);
+ if (result == FieldParser::FIELD_INVALID) {
+ return false;
}
+ _isErrCodeSet = result != FieldParser::FIELD_NONE;
- string toString() const {
- return toBSON().toString();
+ result = FieldParser::extract(source, errMessage, &_errMessage, errMsg);
+ if (result == FieldParser::FIELD_INVALID) {
+ return false;
}
+ _isErrMessageSet = result != FieldParser::FIELD_NONE;
- int getOk() {
- dassert(_isOkSet);
- return _ok;
- }
+ return true;
+ }
- void setOk(int ok) {
- _ok = ok;
- _isOkSet = true;
- }
+ void clear() {
+ _ok = false;
+ _isOkSet = false;
- int getErrCode() {
- if (_isErrCodeSet) {
- return _errCode;
- }
- else {
- return errCode.getDefault();
- }
- }
+ _errCode = 0;
+ _isErrCodeSet = false;
- void setErrCode(int errCode) {
- _errCode = errCode;
- _isErrCodeSet = true;
- }
+ _errMessage = "";
+ _isErrMessageSet = false;
+ }
- bool isErrCodeSet() const {
- return _isErrCodeSet;
- }
+ string toString() const {
+ return toBSON().toString();
+ }
- const string& getErrMessage() {
- dassert(_isErrMessageSet);
- return _errMessage;
- }
+ int getOk() {
+ dassert(_isOkSet);
+ return _ok;
+ }
- void setErrMessage(StringData errMsg) {
- _errMessage = errMsg.toString();
- _isErrMessageSet = true;
+ void setOk(int ok) {
+ _ok = ok;
+ _isOkSet = true;
+ }
+
+ int getErrCode() {
+ if (_isErrCodeSet) {
+ return _errCode;
+ } else {
+ return errCode.getDefault();
}
+ }
- private:
- int _ok;
- bool _isOkSet;
+ void setErrCode(int errCode) {
+ _errCode = errCode;
+ _isErrCodeSet = true;
+ }
- int _errCode;
- bool _isErrCodeSet;
+ bool isErrCodeSet() const {
+ return _isErrCodeSet;
+ }
- string _errMessage;
- bool _isErrMessageSet;
- };
+ const string& getErrMessage() {
+ dassert(_isErrMessageSet);
+ return _errMessage;
+ }
- const BSONField<int> SSVResponse::ok("ok");
- const BSONField<int> SSVResponse::errCode("code");
- const BSONField<string> SSVResponse::errMessage("errmsg");
+ void setErrMessage(StringData errMsg) {
+ _errMessage = errMsg.toString();
+ _isErrMessageSet = true;
+ }
+private:
+ int _ok;
+ bool _isOkSet;
- struct ConfigResponse {
- ConnectionString configHost;
- BatchedCommandResponse response;
- };
+ int _errCode;
+ bool _isErrCodeSet;
- void buildErrorFrom(const Status& status, BatchedCommandResponse* response) {
- response->setOk(false);
- response->setErrCode(static_cast<int>(status.code()));
- response->setErrMessage(status.reason());
+ string _errMessage;
+ bool _isErrMessageSet;
+};
- dassert(response->isValid(NULL));
- }
+const BSONField<int> SSVResponse::ok("ok");
+const BSONField<int> SSVResponse::errCode("code");
+const BSONField<string> SSVResponse::errMessage("errmsg");
- bool areResponsesEqual(const BatchedCommandResponse& responseA,
- const BatchedCommandResponse& responseB) {
- // Note: This needs to also take into account comparing responses from legacy writes
- // and write commands.
+struct ConfigResponse {
+ ConnectionString configHost;
+ BatchedCommandResponse response;
+};
- // TODO: Better reporting of why not equal
- if (responseA.getOk() != responseB.getOk()) {
- return false;
- }
+void buildErrorFrom(const Status& status, BatchedCommandResponse* response) {
+ response->setOk(false);
+ response->setErrCode(static_cast<int>(status.code()));
+ response->setErrMessage(status.reason());
- if (responseA.getN() != responseB.getN()) {
- return false;
- }
+ dassert(response->isValid(NULL));
+}
- if (responseA.isUpsertDetailsSet()) {
- // TODO:
- }
+bool areResponsesEqual(const BatchedCommandResponse& responseA,
+ const BatchedCommandResponse& responseB) {
+ // Note: This needs to also take into account comparing responses from legacy writes
+ // and write commands.
- if (responseA.getOk()) {
- return true;
- }
+ // TODO: Better reporting of why not equal
+ if (responseA.getOk() != responseB.getOk()) {
+ return false;
+ }
+
+ if (responseA.getN() != responseB.getN()) {
+ return false;
+ }
- // TODO: Compare errors here
+ if (responseA.isUpsertDetailsSet()) {
+ // TODO:
+ }
+ if (responseA.getOk()) {
return true;
}
- bool areAllResponsesEqual(const vector<ConfigResponse*>& responses) {
- BatchedCommandResponse* lastResponse = NULL;
+ // TODO: Compare errors here
- for (vector<ConfigResponse*>::const_iterator it = responses.begin();
- it != responses.end();
- ++it) {
+ return true;
+}
- BatchedCommandResponse* response = &(*it)->response;
+bool areAllResponsesEqual(const vector<ConfigResponse*>& responses) {
+ BatchedCommandResponse* lastResponse = NULL;
- if (lastResponse != NULL) {
- if (!areResponsesEqual(*lastResponse, *response)) {
- return false;
- }
- }
+ for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end();
+ ++it) {
+ BatchedCommandResponse* response = &(*it)->response;
- lastResponse = response;
+ if (lastResponse != NULL) {
+ if (!areResponsesEqual(*lastResponse, *response)) {
+ return false;
+ }
}
- return true;
+ lastResponse = response;
}
- void combineResponses(const vector<ConfigResponse*>& responses,
- BatchedCommandResponse* clientResponse) {
-
- if (areAllResponsesEqual(responses)) {
- responses.front()->response.cloneTo(clientResponse);
- return;
- }
-
- BSONObjBuilder builder;
- for (vector<ConfigResponse*>::const_iterator it = responses.begin();
- it != responses.end();
- ++it) {
+ return true;
+}
- builder.append((*it)->configHost.toString(), (*it)->response.toBSON());
- }
+void combineResponses(const vector<ConfigResponse*>& responses,
+ BatchedCommandResponse* clientResponse) {
+ if (areAllResponsesEqual(responses)) {
+ responses.front()->response.cloneTo(clientResponse);
+ return;
+ }
- clientResponse->setOk(false);
- clientResponse->setErrCode(ErrorCodes::ManualInterventionRequired);
- clientResponse->setErrMessage("config write was not consistent, "
- "manual intervention may be required. "
- "config responses: " + builder.obj().toString());
+ BSONObjBuilder builder;
+ for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end();
+ ++it) {
+ builder.append((*it)->configHost.toString(), (*it)->response.toBSON());
}
-} // namespace
+ clientResponse->setOk(false);
+ clientResponse->setErrCode(ErrorCodes::ManualInterventionRequired);
+ clientResponse->setErrMessage(
+ "config write was not consistent, "
+ "manual intervention may be required. "
+ "config responses: " +
+ builder.obj().toString());
+}
+} // namespace
- ConfigCoordinator::ConfigCoordinator(MultiCommandDispatch* dispatcher,
- const ConnectionString& configServerConnectionString)
- : _dispatcher(dispatcher),
- _configServerConnectionString(configServerConnectionString) {
- }
+ConfigCoordinator::ConfigCoordinator(MultiCommandDispatch* dispatcher,
+ const ConnectionString& configServerConnectionString)
+ : _dispatcher(dispatcher), _configServerConnectionString(configServerConnectionString) {}
- bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) {
- //
- // Send side
- //
+bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) {
+ //
+ // Send side
+ //
- for (const HostAndPort& server : _configServerConnectionString.getServers()) {
- SSVRequest ssvRequest(_configServerConnectionString.toString());
- _dispatcher->addCommand(ConnectionString(server), "admin", ssvRequest);
- }
+ for (const HostAndPort& server : _configServerConnectionString.getServers()) {
+ SSVRequest ssvRequest(_configServerConnectionString.toString());
+ _dispatcher->addCommand(ConnectionString(server), "admin", ssvRequest);
+ }
- _dispatcher->sendAll();
+ _dispatcher->sendAll();
- //
- // Recv side
- //
+ //
+ // Recv side
+ //
- bool ssvError = false;
- while (_dispatcher->numPending() > 0) {
- ConnectionString configHost;
- SSVResponse response;
+ bool ssvError = false;
+ while (_dispatcher->numPending() > 0) {
+ ConnectionString configHost;
+ SSVResponse response;
- // We've got to recv everything, no matter what - even if some failed.
- Status dispatchStatus = _dispatcher->recvAny(&configHost, &response);
+ // We've got to recv everything, no matter what - even if some failed.
+ Status dispatchStatus = _dispatcher->recvAny(&configHost, &response);
- if (ssvError) {
- // record only the first failure.
- continue;
- }
+ if (ssvError) {
+ // record only the first failure.
+ continue;
+ }
- if (!dispatchStatus.isOK()) {
- ssvError = true;
- clientResponse->setOk(false);
- clientResponse->setErrCode(static_cast<int>(dispatchStatus.code()));
- clientResponse->setErrMessage(dispatchStatus.reason());
- }
- else if (!response.getOk()) {
- ssvError = true;
- clientResponse->setOk(false);
- clientResponse->setErrMessage(response.getErrMessage());
+ if (!dispatchStatus.isOK()) {
+ ssvError = true;
+ clientResponse->setOk(false);
+ clientResponse->setErrCode(static_cast<int>(dispatchStatus.code()));
+ clientResponse->setErrMessage(dispatchStatus.reason());
+ } else if (!response.getOk()) {
+ ssvError = true;
+ clientResponse->setOk(false);
+ clientResponse->setErrMessage(response.getErrMessage());
- if (response.isErrCodeSet()) {
- clientResponse->setErrCode(response.getErrCode());
- }
+ if (response.isErrCodeSet()) {
+ clientResponse->setErrCode(response.getErrCode());
}
}
-
- return !ssvError;
}
- /**
- * The core config write functionality.
- *
- * Config writes run in two passes - the first is a quick check to ensure the config servers
- * are all reachable, the second runs the actual write.
- *
- * TODO: Upgrade and move this logic to the config servers, a state machine implementation
- * is probably the next step.
- */
- void ConfigCoordinator::executeBatch(const BatchedCommandRequest& clientRequest,
- BatchedCommandResponse* clientResponse) {
-
- const NamespaceString nss(clientRequest.getNS());
-
- // Should never use it for anything other than DBs residing on the config server
- dassert(nss.db() == "config" || nss.db() == "admin");
- dassert(clientRequest.sizeWriteOps() == 1u);
-
- // This is an opportunistic check that all config servers look healthy by calling
- // getLastError on each one of them. If there was some form of write/journaling error, get
- // last error would fail.
- {
- for (const HostAndPort& server : _configServerConnectionString.getServers()) {
- _dispatcher->addCommand(ConnectionString(server),
- "admin",
- RawBSONSerializable(BSON("getLastError" << true <<
- "fsync" << true)));
- }
+ return !ssvError;
+}
+
+/**
+ * The core config write functionality.
+ *
+ * Config writes run in two passes - the first is a quick check to ensure the config servers
+ * are all reachable, the second runs the actual write.
+ *
+ * TODO: Upgrade and move this logic to the config servers, a state machine implementation
+ * is probably the next step.
+ */
+void ConfigCoordinator::executeBatch(const BatchedCommandRequest& clientRequest,
+ BatchedCommandResponse* clientResponse) {
+ const NamespaceString nss(clientRequest.getNS());
+
+ // Should never use it for anything other than DBs residing on the config server
+ dassert(nss.db() == "config" || nss.db() == "admin");
+ dassert(clientRequest.sizeWriteOps() == 1u);
+
+ // This is an opportunistic check that all config servers look healthy by calling
+ // getLastError on each one of them. If there was some form of write/journaling error, get
+ // last error would fail.
+ {
+ for (const HostAndPort& server : _configServerConnectionString.getServers()) {
+ _dispatcher->addCommand(
+ ConnectionString(server),
+ "admin",
+ RawBSONSerializable(BSON("getLastError" << true << "fsync" << true)));
+ }
- _dispatcher->sendAll();
+ _dispatcher->sendAll();
- bool error = false;
- while (_dispatcher->numPending()) {
- ConnectionString host;
- RawBSONSerializable response;
+ bool error = false;
+ while (_dispatcher->numPending()) {
+ ConnectionString host;
+ RawBSONSerializable response;
- Status status = _dispatcher->recvAny(&host, &response);
- if (status.isOK()) {
- BSONObj obj = response.toBSON();
+ Status status = _dispatcher->recvAny(&host, &response);
+ if (status.isOK()) {
+ BSONObj obj = response.toBSON();
- LOG(3) << "Response " << obj.toString();
+ LOG(3) << "Response " << obj.toString();
- // If the ok field is anything other than 1, count it as error
- if (!obj["ok"].trueValue()) {
- error = true;
- log() << "Config server check for host " << host
- << " returned error: " << response;
- }
- }
- else {
+ // If the ok field is anything other than 1, count it as error
+ if (!obj["ok"].trueValue()) {
error = true;
log() << "Config server check for host " << host
- << " failed with status: " << status;
+ << " returned error: " << response;
}
- }
-
- // All responses should have been gathered by this point
- if (error) {
- clientResponse->setOk(false);
- clientResponse->setErrCode(ErrorCodes::RemoteValidationError);
- clientResponse->setErrMessage("Could not verify that config servers were active"
- " and reachable before write");
- return;
+ } else {
+ error = true;
+ log() << "Config server check for host " << host
+ << " failed with status: " << status;
}
}
- if (!_checkConfigString(clientResponse)) {
+ // All responses should have been gathered by this point
+ if (error) {
+ clientResponse->setOk(false);
+ clientResponse->setErrCode(ErrorCodes::RemoteValidationError);
+ clientResponse->setErrMessage(
+ "Could not verify that config servers were active"
+ " and reachable before write");
return;
}
+ }
- //
- // Do the actual writes
- //
+ if (!_checkConfigString(clientResponse)) {
+ return;
+ }
- BatchedCommandRequest configRequest( clientRequest.getBatchType() );
- clientRequest.cloneTo( &configRequest );
- configRequest.setNS( nss.coll() );
+ //
+ // Do the actual writes
+ //
- OwnedPointerVector<ConfigResponse> responsesOwned;
- vector<ConfigResponse*>& responses = responsesOwned.mutableVector();
+ BatchedCommandRequest configRequest(clientRequest.getBatchType());
+ clientRequest.cloneTo(&configRequest);
+ configRequest.setNS(nss.coll());
- //
- // Send the actual config writes
- //
+ OwnedPointerVector<ConfigResponse> responsesOwned;
+ vector<ConfigResponse*>& responses = responsesOwned.mutableVector();
- // Get as many batches as we can at once
- for (const HostAndPort& server : _configServerConnectionString.getServers()) {
- _dispatcher->addCommand(ConnectionString(server), nss.db(), configRequest);
- }
+ //
+ // Send the actual config writes
+ //
- // Send them all out
- _dispatcher->sendAll();
+ // Get as many batches as we can at once
+ for (const HostAndPort& server : _configServerConnectionString.getServers()) {
+ _dispatcher->addCommand(ConnectionString(server), nss.db(), configRequest);
+ }
- //
- // Recv side
- //
+ // Send them all out
+ _dispatcher->sendAll();
- while (_dispatcher->numPending() > 0) {
- // Get the response
- responses.push_back(new ConfigResponse());
+ //
+ // Recv side
+ //
- ConfigResponse& configResponse = *responses.back();
- Status dispatchStatus = _dispatcher->recvAny(&configResponse.configHost,
- &configResponse.response);
+ while (_dispatcher->numPending() > 0) {
+ // Get the response
+ responses.push_back(new ConfigResponse());
- if (!dispatchStatus.isOK()) {
- buildErrorFrom(dispatchStatus, &configResponse.response);
- }
- }
+ ConfigResponse& configResponse = *responses.back();
+ Status dispatchStatus =
+ _dispatcher->recvAny(&configResponse.configHost, &configResponse.response);
- combineResponses(responses, clientResponse);
+ if (!dispatchStatus.isOK()) {
+ buildErrorFrom(dispatchStatus, &configResponse.response);
+ }
}
-} // namespace mongo
+ combineResponses(responses, clientResponse);
+}
+
+} // namespace mongo
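Editor's note: the interesting invariant in the file above is the one enforced by areAllResponsesEqual()/combineResponses(): a config write only counts as successful when every config server returned an equivalent response, otherwise the coordinator reports that manual intervention may be required. The self-contained sketch below illustrates that rule with a hypothetical WriteResult type standing in for BatchedCommandResponse; as the TODOs in the diff note, the real comparison of error details is incomplete, so only ok and n are compared here.

    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical response type standing in for BatchedCommandResponse.
    struct WriteResult {
        bool ok;
        int n;
        bool operator==(const WriteResult& other) const {
            return ok == other.ok && n == other.n;
        }
    };

    // The batch succeeds only when every config server returned an equivalent
    // response; any divergence is surfaced as an error for manual follow-up.
    bool combine(const std::vector<WriteResult>& responses, std::string* errMsg) {
        if (responses.empty()) {
            *errMsg = "no responses received";
            return false;
        }
        for (const auto& r : responses) {
            if (!(r == responses.front())) {
                *errMsg = "config write was not consistent, manual intervention may be required";
                return false;
            }
        }
        return responses.front().ok;
    }

    int main() {
        std::string err;
        std::cout << combine({{true, 1}, {true, 1}, {true, 1}}, &err) << "\n";   // 1: all agree
        std::cout << combine({{true, 1}, {false, 0}, {true, 1}}, &err) << "\n";  // 0: split write
        std::cout << err << "\n";
        return 0;
    }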
diff --git a/src/mongo/s/catalog/legacy/config_coordinator.h b/src/mongo/s/catalog/legacy/config_coordinator.h
index 3cd0301f23f..b1a3e18ca88 100644
--- a/src/mongo/s/catalog/legacy/config_coordinator.h
+++ b/src/mongo/s/catalog/legacy/config_coordinator.h
@@ -36,27 +36,26 @@
namespace mongo {
- class MultiCommandDispatch;
+class MultiCommandDispatch;
- class ConfigCoordinator {
- public:
- ConfigCoordinator(MultiCommandDispatch* dispatcher,
- const ConnectionString& configServerConnectionString);
+class ConfigCoordinator {
+public:
+ ConfigCoordinator(MultiCommandDispatch* dispatcher,
+ const ConnectionString& configServerConnectionString);
- void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response);
+ void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response);
- private:
- /**
- * Initialize configDB string in config server or if already initialized,
- * check that it matches. Returns false if an error occured.
- */
- bool _checkConfigString(BatchedCommandResponse* clientResponse);
+private:
+ /**
+ * Initialize configDB string in config server or if already initialized,
+     * check that it matches. Returns false if an error occurred.
+ */
+ bool _checkConfigString(BatchedCommandResponse* clientResponse);
- // Not owned here
- MultiCommandDispatch* const _dispatcher;
-
- const ConnectionString _configServerConnectionString;
- };
+ // Not owned here
+ MultiCommandDispatch* const _dispatcher;
+ const ConnectionString _configServerConnectionString;
+};
}
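Editor's note: the header above documents _checkConfigString() as init-or-verify behavior: the first initialization installs the configdb string, and any later check must match it exactly. The real check is performed remotely through the setShardVersion request with init=true and authoritative=true shown in the .cpp; the local sketch below, with a hypothetical ConfigStringGuard class, only illustrates the idempotent shape of that contract.

    #include <iostream>
    #include <optional>
    #include <string>

    // Hypothetical local model of the init-or-verify contract: install the
    // value on first use, then accept only an exact match afterwards.
    class ConfigStringGuard {
    public:
        bool initOrVerify(const std::string& ours) {
            if (!_stored) {
                _stored = ours;  // first initialization wins
                return true;
            }
            return *_stored == ours;  // later callers must match exactly
        }

    private:
        std::optional<std::string> _stored;
    };

    int main() {
        ConfigStringGuard guard;
        std::cout << guard.initOrVerify("cfg1,cfg2,cfg3") << "\n";  // 1: installed
        std::cout << guard.initOrVerify("cfg1,cfg2,cfg3") << "\n";  // 1: matches
        std::cout << guard.initOrVerify("cfgA,cfgB,cfgC") << "\n";  // 0: mismatch is an error
        return 0;
    }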
diff --git a/src/mongo/s/catalog/legacy/config_upgrade.cpp b/src/mongo/s/catalog/legacy/config_upgrade.cpp
index 2ed119d914f..3592d109e2b 100644
--- a/src/mongo/s/catalog/legacy/config_upgrade.cpp
+++ b/src/mongo/s/catalog/legacy/config_upgrade.cpp
@@ -52,519 +52,492 @@
namespace mongo {
- using std::unique_ptr;
- using std::make_pair;
- using std::map;
- using std::string;
- using std::vector;
- using str::stream;
-
- // Implemented in the respective steps' .cpp file
- bool doUpgradeV0ToV7(CatalogManager* catalogManager,
- const VersionType& lastVersionInfo,
- std::string* errMsg);
-
- bool doUpgradeV6ToV7(CatalogManager* catalogManager,
- const VersionType& lastVersionInfo,
- std::string* errMsg);
+using std::unique_ptr;
+using std::make_pair;
+using std::map;
+using std::string;
+using std::vector;
+using str::stream;
+
+// Implemented in the respective steps' .cpp file
+bool doUpgradeV0ToV7(CatalogManager* catalogManager,
+ const VersionType& lastVersionInfo,
+ std::string* errMsg);
+
+bool doUpgradeV6ToV7(CatalogManager* catalogManager,
+ const VersionType& lastVersionInfo,
+ std::string* errMsg);
namespace {
- struct VersionRange {
- VersionRange(int _minCompatibleVersion, int _currentVersion)
- : minCompatibleVersion(_minCompatibleVersion),
- currentVersion(_currentVersion) {
+struct VersionRange {
+ VersionRange(int _minCompatibleVersion, int _currentVersion)
+ : minCompatibleVersion(_minCompatibleVersion), currentVersion(_currentVersion) {}
- }
-
- bool operator==(const VersionRange& other) const {
- return (other.minCompatibleVersion == minCompatibleVersion)
- && (other.currentVersion == currentVersion);
- }
-
- bool operator!=(const VersionRange& other) const {
- return !(*this == other);
- }
-
- int minCompatibleVersion;
- int currentVersion;
- };
-
- enum VersionStatus {
- // No way to upgrade the test version to be compatible with current version
- VersionStatus_Incompatible,
+ bool operator==(const VersionRange& other) const {
+ return (other.minCompatibleVersion == minCompatibleVersion) &&
+ (other.currentVersion == currentVersion);
+ }
- // Current version is compatible with test version
- VersionStatus_Compatible,
+ bool operator!=(const VersionRange& other) const {
+ return !(*this == other);
+ }
- // Test version must be upgraded to be compatible with current version
- VersionStatus_NeedUpgrade
- };
+ int minCompatibleVersion;
+ int currentVersion;
+};
- /**
- * Encapsulates the information needed to register a config upgrade.
- */
- struct UpgradeStep {
- typedef stdx::function<bool(CatalogManager*, const VersionType&, string*)> UpgradeCallback;
+enum VersionStatus {
+ // No way to upgrade the test version to be compatible with current version
+ VersionStatus_Incompatible,
- UpgradeStep(int _fromVersion,
- const VersionRange& _toVersionRange,
- UpgradeCallback _upgradeCallback)
- : fromVersion(_fromVersion),
- toVersionRange(_toVersionRange),
- upgradeCallback(_upgradeCallback) {
+ // Current version is compatible with test version
+ VersionStatus_Compatible,
- }
+ // Test version must be upgraded to be compatible with current version
+ VersionStatus_NeedUpgrade
+};
- // The config version we're upgrading from
- int fromVersion;
+/**
+ * Encapsulates the information needed to register a config upgrade.
+ */
+struct UpgradeStep {
+ typedef stdx::function<bool(CatalogManager*, const VersionType&, string*)> UpgradeCallback;
- // The config version we're upgrading to and the min compatible config version (min, to)
- VersionRange toVersionRange;
+ UpgradeStep(int _fromVersion,
+ const VersionRange& _toVersionRange,
+ UpgradeCallback _upgradeCallback)
+ : fromVersion(_fromVersion),
+ toVersionRange(_toVersionRange),
+ upgradeCallback(_upgradeCallback) {}
- // The upgrade callback which performs the actual upgrade
- UpgradeCallback upgradeCallback;
- };
+ // The config version we're upgrading from
+ int fromVersion;
- typedef map<int, UpgradeStep> ConfigUpgradeRegistry;
+ // The config version we're upgrading to and the min compatible config version (min, to)
+ VersionRange toVersionRange;
- /**
- * Does a sanity-check validation of the registry ensuring three things:
- * 1. All upgrade paths lead to the same minCompatible/currentVersion
- * 2. Our constants match this final version pair
- * 3. There is a zero-version upgrade path
- */
- void validateRegistry(const ConfigUpgradeRegistry& registry) {
- VersionRange maxCompatibleConfigVersionRange(-1, -1);
- bool hasZeroVersionUpgrade = false;
+ // The upgrade callback which performs the actual upgrade
+ UpgradeCallback upgradeCallback;
+};
- for (const auto& upgradeStep : registry) {
- const UpgradeStep& upgrade = upgradeStep.second;
+typedef map<int, UpgradeStep> ConfigUpgradeRegistry;
- if (upgrade.fromVersion == 0) {
- hasZeroVersionUpgrade = true;
- }
+/**
+ * Does a sanity-check validation of the registry ensuring three things:
+ * 1. All upgrade paths lead to the same minCompatible/currentVersion
+ * 2. Our constants match this final version pair
+ * 3. There is a zero-version upgrade path
+ */
+void validateRegistry(const ConfigUpgradeRegistry& registry) {
+ VersionRange maxCompatibleConfigVersionRange(-1, -1);
+ bool hasZeroVersionUpgrade = false;
- if (maxCompatibleConfigVersionRange.currentVersion
- < upgrade.toVersionRange.currentVersion) {
+ for (const auto& upgradeStep : registry) {
+ const UpgradeStep& upgrade = upgradeStep.second;
- maxCompatibleConfigVersionRange = upgrade.toVersionRange;
- }
- else if (maxCompatibleConfigVersionRange.currentVersion
- == upgrade.toVersionRange.currentVersion) {
+ if (upgrade.fromVersion == 0) {
+ hasZeroVersionUpgrade = true;
+ }
- // Make sure all max upgrade paths end up with same version and compatibility
- fassert(16621, maxCompatibleConfigVersionRange == upgrade.toVersionRange);
- }
+ if (maxCompatibleConfigVersionRange.currentVersion <
+ upgrade.toVersionRange.currentVersion) {
+ maxCompatibleConfigVersionRange = upgrade.toVersionRange;
+ } else if (maxCompatibleConfigVersionRange.currentVersion ==
+ upgrade.toVersionRange.currentVersion) {
+ // Make sure all max upgrade paths end up with same version and compatibility
+ fassert(16621, maxCompatibleConfigVersionRange == upgrade.toVersionRange);
}
+ }
- // Make sure we have a zero-version upgrade
- fassert(16622, hasZeroVersionUpgrade);
+ // Make sure we have a zero-version upgrade
+ fassert(16622, hasZeroVersionUpgrade);
- // Make sure our max registered range is the same as our constants
- fassert(16623,
- maxCompatibleConfigVersionRange
- == VersionRange(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION));
- }
+ // Make sure our max registered range is the same as our constants
+ fassert(16623,
+ maxCompatibleConfigVersionRange ==
+ VersionRange(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION));
+}
- /**
- * Creates a registry of config upgrades used by the code below.
- *
- * MODIFY THIS CODE HERE TO CREATE A NEW UPGRADE PATH FROM X to Y
- * YOU MUST ALSO MODIFY THE VERSION DECLARATIONS IN config_upgrade.h
- *
- * Caveats:
- * - All upgrade paths must eventually lead to the exact same version range of
- * min and max compatible versions.
- * - This resulting version range must be equal to:
- * make_pair(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION)
- * - There must always be an upgrade path from the empty version (0) to the latest
- * config version.
- *
- * If any of the above is false, we fassert and fail to start.
- */
- ConfigUpgradeRegistry createRegistry() {
- ConfigUpgradeRegistry registry;
-
- // v0 to v7
- UpgradeStep v0ToV7(0, VersionRange(6, 7), doUpgradeV0ToV7);
- registry.insert(make_pair(v0ToV7.fromVersion, v0ToV7));
-
- // v6 to v7
- UpgradeStep v6ToV7(6, VersionRange(6, 7), doUpgradeV6ToV7);
- registry.insert(make_pair(v6ToV7.fromVersion, v6ToV7));
-
- validateRegistry(registry);
-
- return registry;
- }
+/**
+ * Creates a registry of config upgrades used by the code below.
+ *
+ * MODIFY THIS CODE HERE TO CREATE A NEW UPGRADE PATH FROM X to Y
+ * YOU MUST ALSO MODIFY THE VERSION DECLARATIONS IN config_upgrade.h
+ *
+ * Caveats:
+ * - All upgrade paths must eventually lead to the exact same version range of
+ * min and max compatible versions.
+ * - This resulting version range must be equal to:
+ * make_pair(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION)
+ * - There must always be an upgrade path from the empty version (0) to the latest
+ * config version.
+ *
+ * If any of the above is false, we fassert and fail to start.
+ */
+ConfigUpgradeRegistry createRegistry() {
+ ConfigUpgradeRegistry registry;
- /**
- * Checks whether or not a particular cluster version is compatible with our current
- * version and mongodb version. The version is compatible if it falls between the
- * MIN_COMPATIBLE_CONFIG_VERSION and CURRENT_CONFIG_VERSION and is not explicitly excluded.
- *
- * @return a VersionStatus enum indicating compatibility
- */
- VersionStatus isConfigVersionCompatible(const VersionType& versionInfo, string* whyNot) {
- string dummy;
- if (!whyNot) {
- whyNot = &dummy;
- }
+ // v0 to v7
+ UpgradeStep v0ToV7(0, VersionRange(6, 7), doUpgradeV0ToV7);
+ registry.insert(make_pair(v0ToV7.fromVersion, v0ToV7));
- // Check if we're empty
- if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) {
- return VersionStatus_NeedUpgrade;
- }
+ // v6 to v7
+ UpgradeStep v6ToV7(6, VersionRange(6, 7), doUpgradeV6ToV7);
+ registry.insert(make_pair(v6ToV7.fromVersion, v6ToV7));
- // Check that we aren't too old
- if (CURRENT_CONFIG_VERSION < versionInfo.getMinCompatibleVersion()) {
+ validateRegistry(registry);
- *whyNot = stream() << "the config version " << CURRENT_CONFIG_VERSION
- << " of our process is too old "
- << "for the detected config version "
- << versionInfo.getMinCompatibleVersion();
+ return registry;
+}
- return VersionStatus_Incompatible;
- }
+/**
+ * Checks whether or not a particular cluster version is compatible with our current
+ * version and mongodb version. The version is compatible if it falls between the
+ * MIN_COMPATIBLE_CONFIG_VERSION and CURRENT_CONFIG_VERSION and is not explicitly excluded.
+ *
+ * @return a VersionStatus enum indicating compatibility
+ */
+VersionStatus isConfigVersionCompatible(const VersionType& versionInfo, string* whyNot) {
+ string dummy;
+ if (!whyNot) {
+ whyNot = &dummy;
+ }
- // Check that the mongo version of this process hasn't been excluded from the cluster
- vector<MongoVersionRange> excludedRanges;
- if (versionInfo.isExcludingMongoVersionsSet() &&
- !MongoVersionRange::parseBSONArray(versionInfo.getExcludingMongoVersions(),
- &excludedRanges,
- whyNot))
- {
+ // Check if we're empty
+ if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) {
+ return VersionStatus_NeedUpgrade;
+ }
- *whyNot = stream() << "could not understand excluded version ranges"
- << causedBy(whyNot);
+ // Check that we aren't too old
+ if (CURRENT_CONFIG_VERSION < versionInfo.getMinCompatibleVersion()) {
+ *whyNot = stream() << "the config version " << CURRENT_CONFIG_VERSION
+ << " of our process is too old "
+ << "for the detected config version "
+ << versionInfo.getMinCompatibleVersion();
- return VersionStatus_Incompatible;
- }
+ return VersionStatus_Incompatible;
+ }
- // versionString is the global version of this process
- if (isInMongoVersionRanges(versionString, excludedRanges)) {
+ // Check that the mongo version of this process hasn't been excluded from the cluster
+ vector<MongoVersionRange> excludedRanges;
+ if (versionInfo.isExcludingMongoVersionsSet() &&
+ !MongoVersionRange::parseBSONArray(
+ versionInfo.getExcludingMongoVersions(), &excludedRanges, whyNot)) {
+ *whyNot = stream() << "could not understand excluded version ranges" << causedBy(whyNot);
- // Cast needed here for MSVC compiler issue
- *whyNot = stream() << "not compatible with current config version, version "
- << reinterpret_cast<const char*>(versionString)
- << "has been excluded.";
+ return VersionStatus_Incompatible;
+ }
- return VersionStatus_Incompatible;
- }
+ // versionString is the global version of this process
+ if (isInMongoVersionRanges(versionString, excludedRanges)) {
+ // Cast needed here for MSVC compiler issue
+ *whyNot = stream() << "not compatible with current config version, version "
+ << reinterpret_cast<const char*>(versionString) << "has been excluded.";
- // Check if we need to upgrade
- if (versionInfo.getCurrentVersion() >= CURRENT_CONFIG_VERSION) {
- return VersionStatus_Compatible;
- }
+ return VersionStatus_Incompatible;
+ }
- return VersionStatus_NeedUpgrade;
+ // Check if we need to upgrade
+ if (versionInfo.getCurrentVersion() >= CURRENT_CONFIG_VERSION) {
+ return VersionStatus_Compatible;
}
- // Checks that all config servers are online
- bool _checkConfigServersAlive(const ConnectionString& configLoc, string* errMsg) {
- bool resultOk;
- BSONObj result;
- try {
- ScopedDbConnection conn(configLoc, 30);
- if (conn->type() == ConnectionString::SYNC) {
- // TODO: Dynamic cast is bad, we need a better way of managing this op
- // via the heirarchy (or not)
- SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn.get());
- fassert(16729, scc != NULL);
- return scc->prepare(*errMsg);
- }
- else {
- resultOk = conn->runCommand("admin", BSON( "fsync" << 1 ), result);
- }
- conn.done();
- }
- catch (const DBException& e) {
- *errMsg = e.toString();
- return false;
+ return VersionStatus_NeedUpgrade;
+}
+
+// Checks that all config servers are online
+bool _checkConfigServersAlive(const ConnectionString& configLoc, string* errMsg) {
+ bool resultOk;
+ BSONObj result;
+ try {
+ ScopedDbConnection conn(configLoc, 30);
+ if (conn->type() == ConnectionString::SYNC) {
+ // TODO: Dynamic cast is bad, we need a better way of managing this op
+            // via the hierarchy (or not)
+ SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn.get());
+ fassert(16729, scc != NULL);
+ return scc->prepare(*errMsg);
+ } else {
+ resultOk = conn->runCommand("admin", BSON("fsync" << 1), result);
}
-
- if (!resultOk) {
- *errMsg = DBClientWithCommands::getLastErrorString(result);
- return false;
- }
-
- return true;
+ conn.done();
+ } catch (const DBException& e) {
+ *errMsg = e.toString();
+ return false;
}
- // Dispatches upgrades based on version to the upgrades registered in the upgrade registry
- bool _nextUpgrade(CatalogManager* catalogManager,
- const ConfigUpgradeRegistry& registry,
- const VersionType& lastVersionInfo,
- VersionType* upgradedVersionInfo,
- string* errMsg) {
+ if (!resultOk) {
+ *errMsg = DBClientWithCommands::getLastErrorString(result);
+ return false;
+ }
- int fromVersion = lastVersionInfo.getCurrentVersion();
+ return true;
+}
- ConfigUpgradeRegistry::const_iterator foundIt = registry.find(fromVersion);
+// Dispatches upgrades based on version to the upgrades registered in the upgrade registry
+bool _nextUpgrade(CatalogManager* catalogManager,
+ const ConfigUpgradeRegistry& registry,
+ const VersionType& lastVersionInfo,
+ VersionType* upgradedVersionInfo,
+ string* errMsg) {
+ int fromVersion = lastVersionInfo.getCurrentVersion();
- if (foundIt == registry.end()) {
+ ConfigUpgradeRegistry::const_iterator foundIt = registry.find(fromVersion);
- *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION
- << " of mongo config metadata is required, " << "current version is "
- << fromVersion << ", "
- << "don't know how to upgrade from this version";
+ if (foundIt == registry.end()) {
+ *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION
+ << " of mongo config metadata is required, "
+ << "current version is " << fromVersion << ", "
+ << "don't know how to upgrade from this version";
- return false;
- }
+ return false;
+ }
- const UpgradeStep& upgrade = foundIt->second;
- int toVersion = upgrade.toVersionRange.currentVersion;
+ const UpgradeStep& upgrade = foundIt->second;
+ int toVersion = upgrade.toVersionRange.currentVersion;
- log() << "starting next upgrade step from v" << fromVersion << " to v" << toVersion;
+ log() << "starting next upgrade step from v" << fromVersion << " to v" << toVersion;
- // Log begin to config.changelog
- catalogManager->logChange(NULL,
- "starting upgrade of config database",
- VersionType::ConfigNS,
- BSON("from" << fromVersion << "to" << toVersion));
+ // Log begin to config.changelog
+ catalogManager->logChange(NULL,
+ "starting upgrade of config database",
+ VersionType::ConfigNS,
+ BSON("from" << fromVersion << "to" << toVersion));
- if (!upgrade.upgradeCallback(catalogManager, lastVersionInfo, errMsg)) {
- *errMsg = stream() << "error upgrading config database from v"
- << fromVersion << " to v" << toVersion << causedBy(errMsg);
- return false;
- }
+ if (!upgrade.upgradeCallback(catalogManager, lastVersionInfo, errMsg)) {
+ *errMsg = stream() << "error upgrading config database from v" << fromVersion << " to v"
+ << toVersion << causedBy(errMsg);
+ return false;
+ }
- // Get the config version we've upgraded to and make sure it's sane
- Status verifyConfigStatus = getConfigVersion(catalogManager, upgradedVersionInfo);
+ // Get the config version we've upgraded to and make sure it's sane
+ Status verifyConfigStatus = getConfigVersion(catalogManager, upgradedVersionInfo);
- if (!verifyConfigStatus.isOK()) {
- *errMsg = stream() << "failed to validate v" << fromVersion << " config version upgrade"
- << causedBy(verifyConfigStatus);
+ if (!verifyConfigStatus.isOK()) {
+ *errMsg = stream() << "failed to validate v" << fromVersion << " config version upgrade"
+ << causedBy(verifyConfigStatus);
- return false;
- }
-
- catalogManager->logChange(NULL,
- "finished upgrade of config database",
- VersionType::ConfigNS,
- BSON("from" << fromVersion << "to" << toVersion));
- return true;
+ return false;
}
-} // namespace
+ catalogManager->logChange(NULL,
+ "finished upgrade of config database",
+ VersionType::ConfigNS,
+ BSON("from" << fromVersion << "to" << toVersion));
+ return true;
+}
+} // namespace
- /**
- * Returns the config version of the cluster pointed at by the connection string.
- *
- * @return OK if version found successfully, error status if something bad happened.
- */
- Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo) {
- try {
- versionInfo->clear();
- ScopedDbConnection conn(catalogManager->connectionString(), 30);
-
- unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query("config.version",
- BSONObj())));
+/**
+ * Returns the config version of the cluster pointed at by the connection string.
+ *
+ * @return OK if version found successfully, error status if something bad happened.
+ */
+Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo) {
+ try {
+ versionInfo->clear();
- bool hasConfigData = conn->count(ShardType::ConfigNS)
- || conn->count(DatabaseType::ConfigNS)
- || conn->count(CollectionType::ConfigNS);
+ ScopedDbConnection conn(catalogManager->connectionString(), 30);
- if (!cursor->more()) {
+ unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query("config.version", BSONObj())));
- // Version is 1 if we have data, 0 if we're completely empty
- if (hasConfigData) {
- versionInfo->setMinCompatibleVersion(UpgradeHistory_UnreportedVersion);
- versionInfo->setCurrentVersion(UpgradeHistory_UnreportedVersion);
- }
- else {
- versionInfo->setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
- versionInfo->setCurrentVersion(UpgradeHistory_EmptyVersion);
- }
+ bool hasConfigData = conn->count(ShardType::ConfigNS) ||
+ conn->count(DatabaseType::ConfigNS) || conn->count(CollectionType::ConfigNS);
- conn.done();
- return Status::OK();
+ if (!cursor->more()) {
+ // Version is 1 if we have data, 0 if we're completely empty
+ if (hasConfigData) {
+ versionInfo->setMinCompatibleVersion(UpgradeHistory_UnreportedVersion);
+ versionInfo->setCurrentVersion(UpgradeHistory_UnreportedVersion);
+ } else {
+ versionInfo->setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
+ versionInfo->setCurrentVersion(UpgradeHistory_EmptyVersion);
}
- BSONObj versionDoc = cursor->next();
- string errMsg;
+ conn.done();
+ return Status::OK();
+ }
- if (!versionInfo->parseBSON(versionDoc, &errMsg) || !versionInfo->isValid(&errMsg)) {
- conn.done();
+ BSONObj versionDoc = cursor->next();
+ string errMsg;
- return Status(ErrorCodes::UnsupportedFormat,
- stream() << "invalid config version document " << versionDoc
- << causedBy(errMsg));
- }
+ if (!versionInfo->parseBSON(versionDoc, &errMsg) || !versionInfo->isValid(&errMsg)) {
+ conn.done();
- if (cursor->more()) {
- conn.done();
+ return Status(ErrorCodes::UnsupportedFormat,
+ stream() << "invalid config version document " << versionDoc
+ << causedBy(errMsg));
+ }
- return Status(ErrorCodes::RemoteValidationError,
- stream() << "should only have 1 document "
- << "in config.version collection");
- }
+ if (cursor->more()) {
conn.done();
- }
- catch (const DBException& e) {
- return e.toStatus();
- }
- return Status::OK();
+ return Status(ErrorCodes::RemoteValidationError,
+ stream() << "should only have 1 document "
+ << "in config.version collection");
+ }
+ conn.done();
+ } catch (const DBException& e) {
+ return e.toStatus();
}
- bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager,
- bool upgrade,
- VersionType* initialVersionInfo,
- VersionType* versionInfo,
- string* errMsg) {
+ return Status::OK();
+}
+
+bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager,
+ bool upgrade,
+ VersionType* initialVersionInfo,
+ VersionType* versionInfo,
+ string* errMsg) {
+ string dummy;
+ if (!errMsg) {
+ errMsg = &dummy;
+ }
- string dummy;
- if (!errMsg) {
- errMsg = &dummy;
- }
+ Status getConfigStatus = getConfigVersion(catalogManager, versionInfo);
+ if (!getConfigStatus.isOK()) {
+ *errMsg = stream() << "could not load config version for upgrade"
+ << causedBy(getConfigStatus);
+ return false;
+ }
- Status getConfigStatus = getConfigVersion(catalogManager, versionInfo);
- if (!getConfigStatus.isOK()) {
- *errMsg = stream() << "could not load config version for upgrade"
- << causedBy(getConfigStatus);
- return false;
- }
+ versionInfo->cloneTo(initialVersionInfo);
- versionInfo->cloneTo(initialVersionInfo);
+ VersionStatus comp = isConfigVersionCompatible(*versionInfo, errMsg);
- VersionStatus comp = isConfigVersionCompatible(*versionInfo, errMsg);
+ if (comp == VersionStatus_Incompatible)
+ return false;
+ if (comp == VersionStatus_Compatible)
+ return true;
- if (comp == VersionStatus_Incompatible) return false;
- if (comp == VersionStatus_Compatible) return true;
+ invariant(comp == VersionStatus_NeedUpgrade);
- invariant(comp == VersionStatus_NeedUpgrade);
+ //
+ // Our current config version is now greater than the current version, so we should upgrade
+ // if possible.
+ //
- //
- // Our current config version is now greater than the current version, so we should upgrade
- // if possible.
- //
+ // The first empty version is technically an upgrade, but has special semantics
+ bool isEmptyVersion = versionInfo->getCurrentVersion() == UpgradeHistory_EmptyVersion;
- // The first empty version is technically an upgrade, but has special semantics
- bool isEmptyVersion = versionInfo->getCurrentVersion() == UpgradeHistory_EmptyVersion;
+ // First check for the upgrade flag (but no flag is needed if we're upgrading from empty)
+ if (!isEmptyVersion && !upgrade) {
+ *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION
+ << " of mongo config metadata is required, "
+ << "current version is " << versionInfo->getCurrentVersion() << ", "
+ << "need to run mongos with --upgrade";
- // First check for the upgrade flag (but no flag is needed if we're upgrading from empty)
- if (!isEmptyVersion && !upgrade) {
- *errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION
- << " of mongo config metadata is required, " << "current version is "
- << versionInfo->getCurrentVersion() << ", "
- << "need to run mongos with --upgrade";
+ return false;
+ }
- return false;
+ // Contact the config servers to make sure all are online - otherwise we wait a long time
+ // for locks.
+ if (!_checkConfigServersAlive(catalogManager->connectionString(), errMsg)) {
+ if (isEmptyVersion) {
+ *errMsg = stream() << "all config servers must be reachable for initial"
+ << " config database creation" << causedBy(errMsg);
+ } else {
+ *errMsg = stream() << "all config servers must be reachable for config upgrade"
+ << causedBy(errMsg);
}
- // Contact the config servers to make sure all are online - otherwise we wait a long time
- // for locks.
- if (!_checkConfigServersAlive(catalogManager->connectionString(), errMsg)) {
+ return false;
+ }
- if (isEmptyVersion) {
- *errMsg = stream() << "all config servers must be reachable for initial"
- << " config database creation" << causedBy(errMsg);
- }
- else {
- *errMsg = stream() << "all config servers must be reachable for config upgrade"
+ // Check whether or not the balancer is online, if it is online we will not upgrade
+ // (but we will initialize the config server)
+ if (!isEmptyVersion) {
+ auto balSettingsResult = catalogManager->getGlobalSettings(SettingsType::BalancerDocKey);
+ if (balSettingsResult.isOK()) {
+ SettingsType balSettings = balSettingsResult.getValue();
+ if (!balSettings.getBalancerStopped()) {
+ *errMsg = stream() << "balancer must be stopped for config upgrade"
<< causedBy(errMsg);
}
-
- return false;
}
+ }
- // Check whether or not the balancer is online, if it is online we will not upgrade
- // (but we will initialize the config server)
- if (!isEmptyVersion) {
- auto balSettingsResult =
- catalogManager->getGlobalSettings(SettingsType::BalancerDocKey);
- if (balSettingsResult.isOK()) {
- SettingsType balSettings = balSettingsResult.getValue();
- if (!balSettings.getBalancerStopped()) {
- *errMsg = stream() << "balancer must be stopped for config upgrade"
- << causedBy(errMsg);
- }
- }
- }
+ //
+ // Acquire a lock for the upgrade process.
+ //
+ // We want to ensure that only a single mongo process is upgrading the config server at a
+ // time.
+ //
+
+ string whyMessage(stream() << "upgrading config database to new format v"
+ << CURRENT_CONFIG_VERSION);
+ auto lockTimeout = stdx::chrono::milliseconds(20 * 60 * 1000);
+ auto scopedDistLock =
+ catalogManager->getDistLockManager()->lock("configUpgrade", whyMessage, lockTimeout);
+ if (!scopedDistLock.isOK()) {
+ *errMsg = scopedDistLock.getStatus().toString();
+ return false;
+ }
- //
- // Acquire a lock for the upgrade process.
- //
- // We want to ensure that only a single mongo process is upgrading the config server at a
- // time.
- //
+ //
+ // Double-check compatibility inside the upgrade lock
+ // Another process may have won the lock earlier and done the upgrade for us, check
+ // if this is the case.
+ //
+
+ getConfigStatus = getConfigVersion(catalogManager, versionInfo);
+ if (!getConfigStatus.isOK()) {
+ *errMsg = stream() << "could not reload config version for upgrade"
+ << causedBy(getConfigStatus);
+ return false;
+ }
- string whyMessage(stream() << "upgrading config database to new format v"
- << CURRENT_CONFIG_VERSION);
- auto lockTimeout = stdx::chrono::milliseconds(20 * 60 * 1000);
- auto scopedDistLock = catalogManager->getDistLockManager()->lock("configUpgrade",
- whyMessage,
- lockTimeout);
- if (!scopedDistLock.isOK()) {
- *errMsg = scopedDistLock.getStatus().toString();
- return false;
- }
+ versionInfo->cloneTo(initialVersionInfo);
- //
- // Double-check compatibility inside the upgrade lock
- // Another process may have won the lock earlier and done the upgrade for us, check
- // if this is the case.
- //
+ comp = isConfigVersionCompatible(*versionInfo, errMsg);
- getConfigStatus = getConfigVersion(catalogManager, versionInfo);
- if (!getConfigStatus.isOK()) {
- *errMsg = stream() << "could not reload config version for upgrade"
- << causedBy(getConfigStatus);
- return false;
- }
+ if (comp == VersionStatus_Incompatible)
+ return false;
+ if (comp == VersionStatus_Compatible)
+ return true;
- versionInfo->cloneTo(initialVersionInfo);
+ invariant(comp == VersionStatus_NeedUpgrade);
- comp = isConfigVersionCompatible(*versionInfo, errMsg);
+ //
+ // Run through the upgrade steps necessary to bring our config version to the current
+ // version
+ //
- if (comp == VersionStatus_Incompatible) return false;
- if (comp == VersionStatus_Compatible) return true;
+ log() << "starting upgrade of config server from v" << versionInfo->getCurrentVersion()
+ << " to v" << CURRENT_CONFIG_VERSION;
- invariant(comp == VersionStatus_NeedUpgrade);
+ ConfigUpgradeRegistry registry(createRegistry());
+
+ while (versionInfo->getCurrentVersion() < CURRENT_CONFIG_VERSION) {
+ int fromVersion = versionInfo->getCurrentVersion();
//
- // Run through the upgrade steps necessary to bring our config version to the current
- // version
+ // Run the next upgrade process and replace versionInfo with the result of the
+ // upgrade.
//
- log() << "starting upgrade of config server from v" << versionInfo->getCurrentVersion()
- << " to v" << CURRENT_CONFIG_VERSION;
-
- ConfigUpgradeRegistry registry(createRegistry());
-
- while (versionInfo->getCurrentVersion() < CURRENT_CONFIG_VERSION) {
- int fromVersion = versionInfo->getCurrentVersion();
-
- //
- // Run the next upgrade process and replace versionInfo with the result of the
- // upgrade.
- //
-
- if (!_nextUpgrade(catalogManager, registry, *versionInfo, versionInfo, errMsg)) {
- return false;
- }
-
- // Ensure we're making progress here
- if (versionInfo->getCurrentVersion() <= fromVersion) {
+ if (!_nextUpgrade(catalogManager, registry, *versionInfo, versionInfo, errMsg)) {
+ return false;
+ }
- *errMsg = stream() << "bad v" << fromVersion << " config version upgrade, "
- << "version did not increment and is now "
- << versionInfo->getCurrentVersion();
+ // Ensure we're making progress here
+ if (versionInfo->getCurrentVersion() <= fromVersion) {
+ *errMsg = stream() << "bad v" << fromVersion << " config version upgrade, "
+ << "version did not increment and is now "
+ << versionInfo->getCurrentVersion();
- return false;
- }
+ return false;
}
+ }
- invariant(versionInfo->getCurrentVersion() == CURRENT_CONFIG_VERSION);
+ invariant(versionInfo->getCurrentVersion() == CURRENT_CONFIG_VERSION);
- log() << "upgrade of config server to v" << versionInfo->getCurrentVersion()
- << " successful";
+ log() << "upgrade of config server to v" << versionInfo->getCurrentVersion() << " successful";
- return true;
- }
+ return true;
+}
-} // namespace mongo
+} // namespace mongo
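
The upgrade loop above (checkAndUpgradeConfigVersion, declared in config_upgrade.h below) re-reads the config version after taking the distributed lock, applies one upgrade step at a time, and bails out if a step fails to advance the version. A minimal standalone sketch of that driver loop, using plain ints and a hypothetical step registry in place of VersionType/ConfigUpgradeRegistry (not the MongoDB API):

#include <functional>
#include <iostream>
#include <map>
#include <string>

// Hypothetical step registry: maps a source version to a step that returns the new
// version, or -1 on failure (the step fills in errMsg itself). This only mirrors the
// role of ConfigUpgradeRegistry/_nextUpgrade in the diff above.
using UpgradeStep = std::function<int(int /*fromVersion*/, std::string* /*errMsg*/)>;

bool runUpgradeLoop(int currentVersion,
                    int targetVersion,
                    const std::map<int, UpgradeStep>& registry,
                    std::string* errMsg) {
    while (currentVersion < targetVersion) {
        const int fromVersion = currentVersion;

        auto it = registry.find(fromVersion);
        if (it == registry.end()) {
            *errMsg = "no upgrade step registered for v" + std::to_string(fromVersion);
            return false;
        }

        currentVersion = it->second(fromVersion, errMsg);
        if (currentVersion < 0) {
            return false;  // the step reported its own error
        }

        // Same progress guard as the loop above: a step that does not increment the
        // version would otherwise spin forever.
        if (currentVersion <= fromVersion) {
            *errMsg = "bad v" + std::to_string(fromVersion) +
                " config version upgrade, version did not increment";
            return false;
        }
    }
    return true;
}

int main() {
    std::map<int, UpgradeStep> registry{
        {6, [](int, std::string*) { return 7; }},  // e.g. the v6 -> v7 bump
    };
    std::string errMsg;
    const bool ok = runUpgradeLoop(6, 7, registry, &errMsg);
    std::cout << (ok ? std::string("upgraded to v7") : errMsg) << "\n";
    return ok ? 0 : 1;
}
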
diff --git a/src/mongo/s/catalog/legacy/config_upgrade.h b/src/mongo/s/catalog/legacy/config_upgrade.h
index cd37a9cb634..7caf9d5a177 100644
--- a/src/mongo/s/catalog/legacy/config_upgrade.h
+++ b/src/mongo/s/catalog/legacy/config_upgrade.h
@@ -32,118 +32,118 @@
namespace mongo {
- class CatalogManager;
- class Status;
- class VersionType;
+class CatalogManager;
+class Status;
+class VersionType;
+
+/**
+ * UPGRADE HISTORY
+ *
+ * The enum below documents the version changes to *both* the config server data layout
+ * and the versioning protocol between clients (i.e. the set of calls between mongos and
+ * mongod).
+ *
+ * Friendly notice:
+ *
+ * EVERY CHANGE EITHER IN CONFIG LAYOUT AND IN S/D PROTOCOL MUST BE RECORDED HERE BY AN INCREASE
+ * IN THE VERSION AND BY TAKING THE FOLLOWING STEPS. (IF YOU DON'T UNDERSTAND THESE STEPS, YOU
+ * SHOULD PROBABLY NOT BE UPGRADING THE VERSIONS BY YOURSELF.)
+ *
+ * + A new entry in the UpgradeHistory enum is created
+ * + The CURRENT_CONFIG_VERSION below is incremented to that version
+ * + There should be a determination if the MIN_COMPATIBLE_CONFIG_VERSION should be increased or
+ * not. This means determining if, by introducing the changes to layout and/or protocol, the
+ * new mongos/d can co-exist in a cluster with the old ones.
+ * + If layout changes are involved, there should be a corresponding layout upgrade routine. See
+ * for instance config_upgrade_vX_to_vY.cpp.
+ * + Again, if a layout change occurs, the base upgrade method, config_upgrade_v0_to_vX.cpp must
+ * be upgraded. This means that all new clusters will start at the newest versions.
+ *
+ */
+enum UpgradeHistory {
/**
- * UPGRADE HISTORY
- *
- * The enum below documents the version changes to *both* the config server data layout
- * and the versioning protocol between clients (i.e. the set of calls between mongos and
- * mongod).
- *
- * Friendly notice:
+ * The empty version, reported when there is no config server data
+ */
+ UpgradeHistory_EmptyVersion = 0,
+
+ /**
+ * The unreported version older mongoses used before config.version collection existed
*
- * EVERY CHANGE EITHER IN CONFIG LAYOUT AND IN S/D PROTOCOL MUST BE RECORDED HERE BY AN INCREASE
- * IN THE VERSION AND BY TAKING THE FOLLOWING STEPS. (IF YOU DON'T UNDERSTAND THESE STEPS, YOU
- * SHOULD PROBABLY NOT BE UPGRADING THE VERSIONS BY YOURSELF.)
+ * If there is a config.shards/databases/collections collection but no config.version
+ * collection, version 1 is assumed
+ */
+ UpgradeHistory_UnreportedVersion = 1,
+
+ /**
+ * NOTE: We skip version 2 here since it is very old and we shouldn't see it in the wild.
*
- * + A new entry in the UpgradeHistory enum is created
- * + The CURRENT_CONFIG_VERSION below is incremented to that version
- * + There should be a determination if the MIN_COMPATIBLE_CONFIG_VERSION should be increased or
- * not. This means determining if, by introducing the changes to layout and/or protocol, the
- * new mongos/d can co-exist in a cluster with the old ones.
- * + If layout changes are involved, there should be a corresponding layout upgrade routine. See
- * for instance config_upgrade_vX_to_vY.cpp.
- * + Again, if a layout change occurs, the base upgrade method, config_upgrade_v0_to_vX.cpp must
- * be upgraded. This means that all new clusters will start at the newest versions.
+ * Do not skip upgrade versions in the future.
+ */
+
+ /**
+ * Base version used by pre-2.4 mongoses with no collection epochs.
+ */
+ UpgradeHistory_NoEpochVersion = 3,
+
+ /**
+ * Version upgrade which added collection epochs to all sharded collections and
+ * chunks.
*
+ * Also:
+ * + Version document in config.version now of the form:
+ * { minVersion : X, currentVersion : Y, clusterId : OID(...) }
+ * + Mongos pings include a "mongoVersion" field indicating the mongos version
+ * + Mongos pings include a "configVersion" field indicating the current config version
+ * + Mongos explicitly ignores any collection with a "primary" field
*/
- enum UpgradeHistory {
-
- /**
- * The empty version, reported when there is no config server data
- */
- UpgradeHistory_EmptyVersion = 0,
-
- /**
- * The unreported version older mongoses used before config.version collection existed
- *
- * If there is a config.shards/databases/collections collection but no config.version
- * collection, version 1 is assumed
- */
- UpgradeHistory_UnreportedVersion = 1,
-
- /**
- * NOTE: We skip version 2 here since it is very old and we shouldn't see it in the wild.
- *
- * Do not skip upgrade versions in the future.
- */
-
- /**
- * Base version used by pre-2.4 mongoses with no collection epochs.
- */
- UpgradeHistory_NoEpochVersion = 3,
-
- /**
- * Version upgrade which added collection epochs to all sharded collections and
- * chunks.
- *
- * Also:
- * + Version document in config.version now of the form:
- * { minVersion : X, currentVersion : Y, clusterId : OID(...) }
- * + Mongos pings include a "mongoVersion" field indicating the mongos version
- * + Mongos pings include a "configVersion" field indicating the current config version
- * + Mongos explicitly ignores any collection with a "primary" field
- */
- UpgradeHistory_MandatoryEpochVersion = 4,
-
- /**
- * Version upgrade with the following changes:
- *
- * + Dropping a collection from mongos now waits for the chunks to be removed from the
- * config server before contacting each shard. Because of this, mongos should be
- * upgraded first before mongod or never drop collections during upgrade.
- */
- UpgradeHistory_DummyBumpPre2_6 = 5,
-
- /**
- * Version upgrade with the following changes:
- *
- * + "_secondaryThrottle" field for config.settings now accepts write concern
- * specifications.
- * + config.locks { ts: 1 } index is no longer unique.
- */
- UpgradeHistory_DummyBumpPre2_8 = 6, // Note: 2.8 is also known as 3.0.
-
- UpgradeHistory_DummyBumpPre3_0 = 7,
- };
-
- // Earliest version we're compatible with
- const int MIN_COMPATIBLE_CONFIG_VERSION = UpgradeHistory_DummyBumpPre2_8;
-
- // Latest version we know how to communicate with
- const int CURRENT_CONFIG_VERSION = UpgradeHistory_DummyBumpPre3_0;
+ UpgradeHistory_MandatoryEpochVersion = 4,
/**
- * Returns the config version of the cluster pointed at by the connection string.
+ * Version upgrade with the following changes:
*
- * @return OK if version found successfully, error status if something bad happened.
+ * + Dropping a collection from mongos now waits for the chunks to be removed from the
+ * config server before contacting each shard. Because of this, mongos should be
+ * upgraded first before mongod or never drop collections during upgrade.
*/
- Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo);
+ UpgradeHistory_DummyBumpPre2_6 = 5,
/**
- * Checks the config version and ensures it's the latest version, otherwise tries to update.
+ * Version upgrade with the following changes:
*
- * @return true if the config version is now compatible.
- * @return initial and finalVersionInfo indicating the start and end versions of the upgrade.
- * These are the same if no upgrade occurred.
+ * + "_secondaryThrottle" field for config.settings now accepts write concern
+ * specifications.
+ * + config.locks { ts: 1 } index is no longer unique.
*/
- bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager,
- bool upgrade,
- VersionType* initialVersionInfo,
- VersionType* finalVersionInfo,
- std::string* errMsg);
+ UpgradeHistory_DummyBumpPre2_8 = 6, // Note: 2.8 is also known as 3.0.
+
+ UpgradeHistory_DummyBumpPre3_0 = 7,
+};
+
+// Earliest version we're compatible with
+const int MIN_COMPATIBLE_CONFIG_VERSION = UpgradeHistory_DummyBumpPre2_8;
+
+// Latest version we know how to communicate with
+const int CURRENT_CONFIG_VERSION = UpgradeHistory_DummyBumpPre3_0;
+
+/**
+ * Returns the config version of the cluster pointed at by the connection string.
+ *
+ * @return OK if version found successfully, error status if something bad happened.
+ */
+Status getConfigVersion(CatalogManager* catalogManager, VersionType* versionInfo);
+
+/**
+ * Checks the config version and ensures it's the latest version, otherwise tries to update.
+ *
+ * @return true if the config version is now compatible.
+ * @return initial and finalVersionInfo indicating the start and end versions of the upgrade.
+ * These are the same if no upgrade occurred.
+ */
+bool checkAndUpgradeConfigVersion(CatalogManager* catalogManager,
+ bool upgrade,
+ VersionType* initialVersionInfo,
+ VersionType* finalVersionInfo,
+ std::string* errMsg);
-} // namespace mongo
+} // namespace mongo
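
With MIN_COMPATIBLE_CONFIG_VERSION = 6 and CURRENT_CONFIG_VERSION = 7 as declared above, the three outcomes the upgrade code distinguishes (compatible, needs upgrade, incompatible) reduce to a range check against that window. A rough standalone illustration; the enum and classify helper below are illustrative stand-ins, not the real isConfigVersionCompatible:

#include <iostream>

namespace {

// Mirrors the constants in config_upgrade.h above.
const int kMinCompatibleConfigVersion = 6;  // UpgradeHistory_DummyBumpPre2_8
const int kCurrentConfigVersion = 7;        // UpgradeHistory_DummyBumpPre3_0

enum class VersionStatus { Compatible, NeedUpgrade, Incompatible };

const char* name(VersionStatus s) {
    switch (s) {
        case VersionStatus::Compatible:
            return "compatible";
        case VersionStatus::NeedUpgrade:
            return "needs upgrade";
        case VersionStatus::Incompatible:
            return "incompatible";
    }
    return "unknown";
}

// Rough classification of a cluster's reported config version against the
// [min compatible, current] window; an empty (0) cluster just needs initialization.
VersionStatus classify(int clusterVersion) {
    if (clusterVersion == 0) {
        return VersionStatus::NeedUpgrade;  // empty config server: run the v0 -> v7 step
    }
    if (clusterVersion < kMinCompatibleConfigVersion) {
        return VersionStatus::Incompatible;  // older than we know how to talk to
    }
    if (clusterVersion < kCurrentConfigVersion) {
        return VersionStatus::NeedUpgrade;  // known layout, but behind
    }
    return VersionStatus::Compatible;
}

}  // namespace

int main() {
    for (int v : {0, 3, 6, 7}) {
        std::cout << "config v" << v << ": " << name(classify(v)) << "\n";
    }
    return 0;
}
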
diff --git a/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp b/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp
index dfde57d6774..41bc88eeb41 100644
--- a/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp
+++ b/src/mongo/s/catalog/legacy/config_upgrade_helpers.cpp
@@ -45,81 +45,76 @@
namespace mongo {
- using std::endl;
- using std::string;
- using std::unique_ptr;
+using std::endl;
+using std::string;
+using std::unique_ptr;
- using mongoutils::str::stream;
+using mongoutils::str::stream;
- // Custom field used in upgrade state to determine if/where we failed on last upgrade
- const BSONField<bool> inCriticalSectionField("inCriticalSection", false);
+// Custom field used in upgrade state to determine if/where we failed on last upgrade
+const BSONField<bool> inCriticalSectionField("inCriticalSection", false);
- Status preUpgradeCheck(CatalogManager* catalogManager,
- const VersionType& lastVersionInfo,
- string minMongosVersion) {
-
- if (lastVersionInfo.isUpgradeIdSet() && lastVersionInfo.getUpgradeId().isSet()) {
- //
- // Another upgrade failed, so cleanup may be necessary
- //
-
- BSONObj lastUpgradeState = lastVersionInfo.getUpgradeState();
+Status preUpgradeCheck(CatalogManager* catalogManager,
+ const VersionType& lastVersionInfo,
+ string minMongosVersion) {
+ if (lastVersionInfo.isUpgradeIdSet() && lastVersionInfo.getUpgradeId().isSet()) {
+ //
+ // Another upgrade failed, so cleanup may be necessary
+ //
- bool inCriticalSection;
- string errMsg;
- if (!FieldParser::extract(lastUpgradeState,
- inCriticalSectionField,
- &inCriticalSection,
- &errMsg)) {
- return Status(ErrorCodes::FailedToParse, causedBy(errMsg));
- }
+ BSONObj lastUpgradeState = lastVersionInfo.getUpgradeState();
- if (inCriticalSection) {
- // Note: custom message must be supplied by caller
- return Status(ErrorCodes::ManualInterventionRequired, "");
- }
+ bool inCriticalSection;
+ string errMsg;
+ if (!FieldParser::extract(
+ lastUpgradeState, inCriticalSectionField, &inCriticalSection, &errMsg)) {
+ return Status(ErrorCodes::FailedToParse, causedBy(errMsg));
}
- //
- // Check the versions of other mongo processes in the cluster before upgrade.
- // We can't upgrade if there are active pre-v2.4 processes in the cluster
- //
- return checkClusterMongoVersions(catalogManager, string(minMongosVersion));
- }
-
- Status commitConfigUpgrade(CatalogManager* catalogManager,
- int currentVersion,
- int minCompatibleVersion,
- int newVersion) {
-
- // Note: DO NOT CLEAR the config version unless bumping the minCompatibleVersion,
- // we want to save the excludes that were set.
-
- BSONObjBuilder setObj;
- setObj << VersionType::minCompatibleVersion(minCompatibleVersion);
- setObj << VersionType::currentVersion(newVersion);
-
- BSONObjBuilder unsetObj;
- unsetObj.append(VersionType::upgradeId(), 1);
- unsetObj.append(VersionType::upgradeState(), 1);
- unsetObj.append("version", 1); // remove deprecated field, no longer supported >= v3.0.
-
- Status result = catalogManager->update(
- VersionType::ConfigNS,
- BSON("_id" << 1 << VersionType::currentVersion(currentVersion)),
- BSON("$set" << setObj.done() << "$unset" << unsetObj.done()),
- false,
- false,
- NULL);
- if (!result.isOK()) {
- return Status(result.code(),
- str::stream() << "could not write new version info "
- << " and exit critical upgrade section: "
- << result.reason());
+ if (inCriticalSection) {
+ // Note: custom message must be supplied by caller
+ return Status(ErrorCodes::ManualInterventionRequired, "");
}
+ }
- return result;
+ //
+ // Check the versions of other mongo processes in the cluster before upgrade.
+ // We can't upgrade if there are active pre-v2.4 processes in the cluster
+ //
+ return checkClusterMongoVersions(catalogManager, string(minMongosVersion));
+}
+
+Status commitConfigUpgrade(CatalogManager* catalogManager,
+ int currentVersion,
+ int minCompatibleVersion,
+ int newVersion) {
+ // Note: DO NOT CLEAR the config version unless bumping the minCompatibleVersion,
+ // we want to save the excludes that were set.
+
+ BSONObjBuilder setObj;
+ setObj << VersionType::minCompatibleVersion(minCompatibleVersion);
+ setObj << VersionType::currentVersion(newVersion);
+
+ BSONObjBuilder unsetObj;
+ unsetObj.append(VersionType::upgradeId(), 1);
+ unsetObj.append(VersionType::upgradeState(), 1);
+ unsetObj.append("version", 1); // remove deprecated field, no longer supported >= v3.0.
+
+ Status result =
+ catalogManager->update(VersionType::ConfigNS,
+ BSON("_id" << 1 << VersionType::currentVersion(currentVersion)),
+ BSON("$set" << setObj.done() << "$unset" << unsetObj.done()),
+ false,
+ false,
+ NULL);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "could not write new version info "
+ << " and exit critical upgrade section: " << result.reason());
}
-} // namespace mongo
+ return result;
+}
+
+} // namespace mongo
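
commitConfigUpgrade above filters the version document on both _id and the currentVersion the upgrade started from, so the $set/$unset only lands if no other process has already moved the version. A small standalone sketch of that compare-and-set commit against an in-memory stand-in for config.version (illustrative only, not the catalog manager API):

#include <iostream>
#include <mutex>

// In-memory stand-in for the single config.version document.
struct VersionDoc {
    int minCompatibleVersion = 5;
    int currentVersion = 6;
    bool upgradeStateSet = true;  // stands in for the upgradeId/upgradeState fields
};

std::mutex docMutex;
VersionDoc versionDoc;

// Commits the version bump only if the stored currentVersion still equals
// expectedCurrent, mirroring the { _id: 1, currentVersion: <old> } filter used by the
// update above; $set becomes plain assignments, $unset clears the upgrade state.
bool commitUpgrade(int expectedCurrent, int newMinCompatible, int newVersion) {
    std::lock_guard<std::mutex> lk(docMutex);
    if (versionDoc.currentVersion != expectedCurrent) {
        return false;  // someone else already moved the version
    }
    versionDoc.minCompatibleVersion = newMinCompatible;
    versionDoc.currentVersion = newVersion;
    versionDoc.upgradeStateSet = false;
    return true;
}

int main() {
    std::cout << std::boolalpha;
    std::cout << "first commit: " << commitUpgrade(6, 6, 7) << "\n";
    std::cout << "stale commit: " << commitUpgrade(6, 6, 7) << "\n";  // filter no longer matches
    return 0;
}
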
diff --git a/src/mongo/s/catalog/legacy/config_upgrade_helpers.h b/src/mongo/s/catalog/legacy/config_upgrade_helpers.h
index 10d03676ea8..7411737e262 100644
--- a/src/mongo/s/catalog/legacy/config_upgrade_helpers.h
+++ b/src/mongo/s/catalog/legacy/config_upgrade_helpers.h
@@ -39,32 +39,32 @@
namespace mongo {
- class CatalogManager;
- class ConnectionString;
- class OID;
- class Status;
- class VersionType;
+class CatalogManager;
+class ConnectionString;
+class OID;
+class Status;
+class VersionType;
- /**
- * Checks whether an unsuccessful upgrade was performed last time and also checks whether
-     * the mongos in the current cluster have the minimum version required. Returns not ok if
- * the check failed and the upgrade should not proceed.
- *
- * Note: There is also a special case for ManualInterventionRequired error where the
- * message will be empty.
- */
- Status preUpgradeCheck(CatalogManager* catalogManager,
- const VersionType& lastVersionInfo,
- std::string minMongosVersion);
+/**
+ * Checks whether an unsuccessful upgrade was performed last time and also checks whether
+ * the mongos in the current cluster have the minimum version required. Returns not ok if
+ * the check failed and the upgrade should not proceed.
+ *
+ * Note: There is also a special case for ManualInterventionRequired error where the
+ * message will be empty.
+ */
+Status preUpgradeCheck(CatalogManager* catalogManager,
+ const VersionType& lastVersionInfo,
+ std::string minMongosVersion);
- /**
- * Informs the config server that the upgrade task was completed by bumping the version.
- * This also clears all upgrade state effectively leaving the critical section if the
- * upgrade process did enter it.
- */
- Status commitConfigUpgrade(CatalogManager* catalogManager,
- int currentVersion,
- int minCompatibleVersion,
- int newVersion);
+/**
+ * Informs the config server that the upgrade task was completed by bumping the version.
+ * This also clears all upgrade state effectively leaving the critical section if the
+ * upgrade process did enter it.
+ */
+Status commitConfigUpgrade(CatalogManager* catalogManager,
+ int currentVersion,
+ int minCompatibleVersion,
+ int newVersion);
-} // namespace mongo
+} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp b/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp
index 787d0114e73..f086fabc521 100644
--- a/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp
+++ b/src/mongo/s/catalog/legacy/config_upgrade_v0_to_v7.cpp
@@ -39,59 +39,57 @@
namespace mongo {
- using std::string;
- using mongo::str::stream;
-
-
- /**
- * Upgrade v0 to v7 described here
- *
- * This upgrade takes the config server from empty to an initial version.
- */
- bool doUpgradeV0ToV7(CatalogManager* catalogManager,
- const VersionType& lastVersionInfo,
- string* errMsg) {
-
- string dummy;
- if (!errMsg) errMsg = &dummy;
-
- verify(lastVersionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion);
-
- //
- // Even though the initial config write is a single-document update, that single document
- // is on multiple config servers and requests can interleave. The upgrade lock prevents
- // this.
- //
-
- log() << "writing initial config version at v" << CURRENT_CONFIG_VERSION;
-
- OID newClusterId = OID::gen();
-
- VersionType versionInfo;
-
- // Upgrade to new version
- versionInfo.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
- versionInfo.setCurrentVersion(CURRENT_CONFIG_VERSION);
- versionInfo.setClusterId(newClusterId);
-
- verify(versionInfo.isValid(NULL));
-
- // If the cluster has not previously been initialized, we need to set the version before
- // using so subsequent mongoses use the config data the same way. This requires all three
- // config servers online initially.
- Status result = catalogManager->update(VersionType::ConfigNS,
- BSON("_id" << 1),
- versionInfo.toBSON(),
- true, // upsert
- false, // multi
- NULL);
- if (!result.isOK()) {
- *errMsg = stream() << "error writing initial config version: "
- << result.reason();
- return false;
- }
-
- return true;
+using std::string;
+using mongo::str::stream;
+
+
+/**
+ * Upgrade v0 to v7 described here
+ *
+ * This upgrade takes the config server from empty to an initial version.
+ */
+bool doUpgradeV0ToV7(CatalogManager* catalogManager,
+ const VersionType& lastVersionInfo,
+ string* errMsg) {
+ string dummy;
+ if (!errMsg)
+ errMsg = &dummy;
+
+ verify(lastVersionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion);
+
+ //
+ // Even though the initial config write is a single-document update, that single document
+ // is on multiple config servers and requests can interleave. The upgrade lock prevents
+ // this.
+ //
+
+ log() << "writing initial config version at v" << CURRENT_CONFIG_VERSION;
+
+ OID newClusterId = OID::gen();
+
+ VersionType versionInfo;
+
+ // Upgrade to new version
+ versionInfo.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
+ versionInfo.setCurrentVersion(CURRENT_CONFIG_VERSION);
+ versionInfo.setClusterId(newClusterId);
+
+ verify(versionInfo.isValid(NULL));
+
+ // If the cluster has not previously been initialized, we need to set the version before
+ // using so subsequent mongoses use the config data the same way. This requires all three
+ // config servers online initially.
+ Status result = catalogManager->update(VersionType::ConfigNS,
+ BSON("_id" << 1),
+ versionInfo.toBSON(),
+ true, // upsert
+ false, // multi
+ NULL);
+ if (!result.isOK()) {
+ *errMsg = stream() << "error writing initial config version: " << result.reason();
+ return false;
}
+ return true;
+}
}
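
The v0 -> v7 step above takes an empty config server to the current layout with a single upsert of the version document keyed on _id: 1, carrying the minimum/current versions and a freshly generated clusterId. A tiny standalone sketch of that upsert shape, with an in-memory map standing in for the config.version collection:

#include <iostream>
#include <map>
#include <string>

// Stand-in for the initial version document written by the v0 -> v7 step.
struct VersionDoc {
    int minCompatibleVersion;
    int currentVersion;
    std::string clusterId;
};

// Upsert by _id: insert the document if absent, otherwise replace it, like the
// upsert=true update against config.version above.
void upsertVersionDoc(std::map<int, VersionDoc>& collection, int id, const VersionDoc& doc) {
    collection[id] = doc;
}

int main() {
    std::map<int, VersionDoc> configVersion;  // stand-in for the config.version collection
    upsertVersionDoc(configVersion, 1, {6, 7, "<generated cluster OID>"});

    const VersionDoc& v = configVersion.at(1);
    std::cout << "minCompatible=" << v.minCompatibleVersion << " current=" << v.currentVersion
              << " clusterId=" << v.clusterId << "\n";
    return 0;
}
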
diff --git a/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp b/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp
index 6f841b0dc87..430d2eedb6e 100644
--- a/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp
+++ b/src/mongo/s/catalog/legacy/config_upgrade_v6_to_v7.cpp
@@ -40,74 +40,73 @@
namespace mongo {
- using std::list;
- using std::string;
- using std::vector;
-
- static const char* minMongoProcessVersion = "3.0";
-
- static const char* cannotCleanupMessage =
- "\n\n"
- "******\n"
- "Cannot upgrade config database from v6 to v7 because a previous upgrade\n"
- "failed in the critical section. Manual intervention is required to re-sync\n"
- "the config servers.\n"
- "******\n";
-
- /**
- * Upgrades v6 to v7.
- */
- bool doUpgradeV6ToV7(CatalogManager* catalogManager,
- const VersionType& lastVersionInfo,
- string* errMsg) {
-
- string dummy;
- if (!errMsg) errMsg = &dummy;
-
- invariant(lastVersionInfo.getCurrentVersion() == UpgradeHistory_DummyBumpPre2_8);
-
- Status result = preUpgradeCheck(catalogManager, lastVersionInfo, minMongoProcessVersion);
- if (!result.isOK()) {
- if (result.code() == ErrorCodes::ManualInterventionRequired) {
- *errMsg = cannotCleanupMessage;
- }
- else {
- *errMsg = result.toString();
- }
-
- return false;
- }
+using std::list;
+using std::string;
+using std::vector;
+
+static const char* minMongoProcessVersion = "3.0";
+
+static const char* cannotCleanupMessage =
+ "\n\n"
+ "******\n"
+ "Cannot upgrade config database from v6 to v7 because a previous upgrade\n"
+ "failed in the critical section. Manual intervention is required to re-sync\n"
+ "the config servers.\n"
+ "******\n";
- // This is not needed because we are not actually going to make any modifications
- // on the other collections in the config server for this particular upgrade.
- // startConfigUpgrade(configLoc.toString(),
- // lastVersionInfo.getCurrentVersion(),
- // OID::gen());
-
- // If we actually need to modify something in the config servers these need to follow
- // after calling startConfigUpgrade(...):
- //
- // 1. Acquire necessary locks.
- // 2. Make a backup of the collections we are about to modify.
- // 3. Perform the upgrade process on the backup collection.
- // 4. Verify that no changes were made to the collections since the backup was performed.
- // 5. Call enterConfigUpgradeCriticalSection(configLoc.toString(),
- // lastVersionInfo.getCurrentVersion()).
- // 6. Rename the backup collection to the name of the original collection with
- // dropTarget set to true.
-
- // We're only after the version bump in commitConfigUpgrade here since we never
- // get into the critical section.
- Status commitStatus = commitConfigUpgrade(catalogManager,
- lastVersionInfo.getCurrentVersion(),
- MIN_COMPATIBLE_CONFIG_VERSION,
- CURRENT_CONFIG_VERSION);
-
- if (!commitStatus.isOK()) {
- *errMsg = commitStatus.toString();
- return false;
+/**
+ * Upgrades v6 to v7.
+ */
+bool doUpgradeV6ToV7(CatalogManager* catalogManager,
+ const VersionType& lastVersionInfo,
+ string* errMsg) {
+ string dummy;
+ if (!errMsg)
+ errMsg = &dummy;
+
+ invariant(lastVersionInfo.getCurrentVersion() == UpgradeHistory_DummyBumpPre2_8);
+
+ Status result = preUpgradeCheck(catalogManager, lastVersionInfo, minMongoProcessVersion);
+ if (!result.isOK()) {
+ if (result.code() == ErrorCodes::ManualInterventionRequired) {
+ *errMsg = cannotCleanupMessage;
+ } else {
+ *errMsg = result.toString();
}
- return true;
+ return false;
}
+
+ // This is not needed because we are not actually going to make any modifications
+ // on the other collections in the config server for this particular upgrade.
+ // startConfigUpgrade(configLoc.toString(),
+ // lastVersionInfo.getCurrentVersion(),
+ // OID::gen());
+
+ // If we actually need to modify something in the config servers these need to follow
+ // after calling startConfigUpgrade(...):
+ //
+ // 1. Acquire necessary locks.
+ // 2. Make a backup of the collections we are about to modify.
+ // 3. Perform the upgrade process on the backup collection.
+ // 4. Verify that no changes were made to the collections since the backup was performed.
+ // 5. Call enterConfigUpgradeCriticalSection(configLoc.toString(),
+ // lastVersionInfo.getCurrentVersion()).
+ // 6. Rename the backup collection to the name of the original collection with
+ // dropTarget set to true.
+
+ // We're only after the version bump in commitConfigUpgrade here since we never
+ // get into the critical section.
+ Status commitStatus = commitConfigUpgrade(catalogManager,
+ lastVersionInfo.getCurrentVersion(),
+ MIN_COMPATIBLE_CONFIG_VERSION,
+ CURRENT_CONFIG_VERSION);
+
+ if (!commitStatus.isOK()) {
+ *errMsg = commitStatus.toString();
+ return false;
+ }
+
+ return true;
+}
}
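
DistributedLock::remoteTime, reformatted in the distlock.cpp diff below, estimates the config server's clock by timing a round trip: a sample whose delay exceeds twice the allowed network skew is rejected, and otherwise half the delay is subtracted from the reported remote time, assuming the server read its clock in the middle of the exchange. A standalone sketch of that adjustment (RemoteSample and adjustedRemoteTime are illustrative names, not the real API):

#include <chrono>
#include <iostream>
#include <optional>

using Clock = std::chrono::system_clock;
using Millis = std::chrono::milliseconds;

struct RemoteSample {
    Clock::time_point reportedRemoteTime;  // what the server said its clock was
    Millis roundTripDelay;                 // local time spent waiting for the reply
};

// Returns the adjusted remote time, or nothing if the round trip was too slow to
// bound the error: the reply could have been produced anywhere inside the delay
// window, so the delay must stay within 2 * maxNetSkew for the estimate to be useful.
std::optional<Clock::time_point> adjustedRemoteTime(const RemoteSample& s, Millis maxNetSkew) {
    if (s.roundTripDelay > 2 * maxNetSkew) {
        return std::nullopt;
    }
    // Assume the server read its clock in the middle of the round trip.
    return s.reportedRemoteTime - s.roundTripDelay / 2;
}

int main() {
    std::cout << std::boolalpha;
    const Millis maxNetSkew(500);

    RemoteSample fast{Clock::now(), Millis(40)};
    RemoteSample slow{Clock::now(), Millis(5000)};

    std::cout << "fast sample usable: " << adjustedRemoteTime(fast, maxNetSkew).has_value() << "\n";
    std::cout << "slow sample usable: " << adjustedRemoteTime(slow, maxNetSkew).has_value() << "\n";
    return 0;
}
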
diff --git a/src/mongo/s/catalog/legacy/distlock.cpp b/src/mongo/s/catalog/legacy/distlock.cpp
index 22c0a4e6128..b0d30b28967 100644
--- a/src/mongo/s/catalog/legacy/distlock.cpp
+++ b/src/mongo/s/catalog/legacy/distlock.cpp
@@ -43,796 +43,760 @@
namespace mongo {
- using std::endl;
- using std::list;
- using std::set;
- using std::string;
- using std::stringstream;
- using std::unique_ptr;
- using std::vector;
-
- LabeledLevel DistributedLock::logLvl( 1 );
- DistributedLock::LastPings DistributedLock::lastPings;
-
- ThreadLocalValue<string> distLockIds("");
-
- /* ==================
- * Module initialization
- */
-
- static SimpleMutex _cachedProcessMutex;
- static string* _cachedProcessString = NULL;
-
- static void initModule() {
- stdx::lock_guard<SimpleMutex> lk(_cachedProcessMutex);
- if (_cachedProcessString) {
- // someone got the lock before us
- return;
- }
-
- // cache process string
- stringstream ss;
- ss << getHostName() << ":" << serverGlobalParams.port << ":" << time(0) << ":" << rand();
- _cachedProcessString = new string( ss.str() );
- }
-
- /* =================== */
+using std::endl;
+using std::list;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
- string getDistLockProcess() {
- if (!_cachedProcessString)
- initModule();
- verify( _cachedProcessString );
- return *_cachedProcessString;
- }
+LabeledLevel DistributedLock::logLvl(1);
+DistributedLock::LastPings DistributedLock::lastPings;
- string getDistLockId() {
- string s = distLockIds.get();
- if ( s.empty() ) {
- stringstream ss;
- ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand();
- s = ss.str();
- distLockIds.set( s );
- }
- return s;
- }
+ThreadLocalValue<string> distLockIds("");
- LockException::LockException(StringData msg, int code): LockException(msg, code, OID()) {
- }
+/* ==================
+ * Module initialization
+ */
- LockException::LockException(StringData msg, int code, DistLockHandle lockID):
- DBException(msg.toString(), code),
- _mustUnlockID(lockID) {
- }
+static SimpleMutex _cachedProcessMutex;
+static string* _cachedProcessString = NULL;
- DistLockHandle LockException::getMustUnlockID() const {
- return _mustUnlockID;
+static void initModule() {
+ stdx::lock_guard<SimpleMutex> lk(_cachedProcessMutex);
+ if (_cachedProcessString) {
+ // someone got the lock before us
+ return;
}
- /**
- * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is
- * specified (time between pings)
- */
- DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess )
- : _conn(conn), _name(name),
- _processId( asProcess ? getDistLockId() : getDistLockProcess() ),
- _lockTimeout( lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout ),
- _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ),
- _lockPing( _maxClockSkew )
- {
- LOG( logLvl ) << "created new distributed lock for " << name << " on " << conn
- << " ( lock timeout : " << _lockTimeout
- << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl;
-
-
- }
+ // cache process string
+ stringstream ss;
+ ss << getHostName() << ":" << serverGlobalParams.port << ":" << time(0) << ":" << rand();
+ _cachedProcessString = new string(ss.str());
+}
- DistLockPingInfo DistributedLock::LastPings::getLastPing(const ConnectionString& conn,
- const string& lockName) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _lastPings[std::make_pair(conn.toString(), lockName)];
- }
+/* =================== */
- void DistributedLock::LastPings::setLastPing(const ConnectionString& conn,
- const string& lockName,
- const DistLockPingInfo& pd) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _lastPings[std::make_pair(conn.toString(), lockName)] = pd;
- }
+string getDistLockProcess() {
+ if (!_cachedProcessString)
+ initModule();
+ verify(_cachedProcessString);
+ return *_cachedProcessString;
+}
- Date_t DistributedLock::getRemoteTime() const {
- return DistributedLock::remoteTime(_conn, _maxNetSkew);
+string getDistLockId() {
+ string s = distLockIds.get();
+ if (s.empty()) {
+ stringstream ss;
+ ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand();
+ s = ss.str();
+ distLockIds.set(s);
}
+ return s;
+}
- bool DistributedLock::isRemoteTimeSkewed() const {
- return !DistributedLock::checkSkew(_conn,
- NUM_LOCK_SKEW_CHECKS,
- _maxClockSkew,
- _maxNetSkew);
- }
+LockException::LockException(StringData msg, int code) : LockException(msg, code, OID()) {}
- const ConnectionString& DistributedLock::getRemoteConnection() const {
- return _conn;
- }
+LockException::LockException(StringData msg, int code, DistLockHandle lockID)
+ : DBException(msg.toString(), code), _mustUnlockID(lockID) {}
- const string& DistributedLock::getProcessId() const {
- return _processId;
- }
-
- /**
- * Returns the remote time as reported by the cluster or server. The maximum difference between the reported time
- * and the actual time on the remote server (at the completion of the function) is the maxNetSkew
- */
- Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) {
+DistLockHandle LockException::getMustUnlockID() const {
+ return _mustUnlockID;
+}
- ConnectionString server( *cluster.getServers().begin() );
+/**
+ * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is
+ * specified (time between pings)
+ */
+DistributedLock::DistributedLock(const ConnectionString& conn,
+ const string& name,
+ unsigned long long lockTimeout,
+ bool asProcess)
+ : _conn(conn),
+ _name(name),
+ _processId(asProcess ? getDistLockId() : getDistLockProcess()),
+ _lockTimeout(lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout),
+ _maxClockSkew(_lockTimeout / LOCK_SKEW_FACTOR),
+ _maxNetSkew(_maxClockSkew),
+ _lockPing(_maxClockSkew) {
+ LOG(logLvl) << "created new distributed lock for " << name << " on " << conn
+ << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing
+ << ", process : " << asProcess << " )" << endl;
+}
- // Get result and delay if successful, errMsg if not
- bool success = false;
- BSONObj result;
- string errMsg;
- Milliseconds delay{0};
+DistLockPingInfo DistributedLock::LastPings::getLastPing(const ConnectionString& conn,
+ const string& lockName) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _lastPings[std::make_pair(conn.toString(), lockName)];
+}
- unique_ptr<ScopedDbConnection> connPtr;
- try {
- connPtr.reset( new ScopedDbConnection( server.toString() ) );
- ScopedDbConnection& conn = *connPtr;
+void DistributedLock::LastPings::setLastPing(const ConnectionString& conn,
+ const string& lockName,
+ const DistLockPingInfo& pd) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _lastPings[std::make_pair(conn.toString(), lockName)] = pd;
+}
- Date_t then = jsTime();
- success = conn->runCommand( string( "admin" ), BSON( "serverStatus" << 1 ), result );
- delay = jsTime() - then;
+Date_t DistributedLock::getRemoteTime() const {
+ return DistributedLock::remoteTime(_conn, _maxNetSkew);
+}
- if ( !success ) errMsg = result.toString();
- conn.done();
- }
- catch ( const DBException& ex ) {
+bool DistributedLock::isRemoteTimeSkewed() const {
+ return !DistributedLock::checkSkew(_conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew);
+}
- if ( connPtr && connPtr->get()->isFailed() ) {
- // Return to the pool so the pool knows about the failure
- connPtr->done();
- }
+const ConnectionString& DistributedLock::getRemoteConnection() const {
+ return _conn;
+}
- success = false;
- errMsg = ex.toString();
- }
-
- if( !success ) {
- throw TimeNotFoundException( str::stream() << "could not get status from server "
- << server.toString() << " in cluster "
- << cluster.toString() << " to check time"
- << causedBy( errMsg ),
- 13647 );
- }
+const string& DistributedLock::getProcessId() const {
+ return _processId;
+}
- // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote
- // time value can be off by if we assume a response in the middle of the delay.
- if (delay > Milliseconds(maxNetSkew * 2)) {
- throw TimeNotFoundException( str::stream()
- << "server " << server.toString() << " in cluster "
- << cluster.toString()
- << " did not respond within max network delay of "
- << maxNetSkew << "ms",
- 13648 );
+/**
+ * Returns the remote time as reported by the cluster or server. The maximum difference between the reported time
+ * and the actual time on the remote server (at the completion of the function) is the maxNetSkew
+ */
+Date_t DistributedLock::remoteTime(const ConnectionString& cluster, unsigned long long maxNetSkew) {
+ ConnectionString server(*cluster.getServers().begin());
+
+ // Get result and delay if successful, errMsg if not
+ bool success = false;
+ BSONObj result;
+ string errMsg;
+ Milliseconds delay{0};
+
+ unique_ptr<ScopedDbConnection> connPtr;
+ try {
+ connPtr.reset(new ScopedDbConnection(server.toString()));
+ ScopedDbConnection& conn = *connPtr;
+
+ Date_t then = jsTime();
+ success = conn->runCommand(string("admin"), BSON("serverStatus" << 1), result);
+ delay = jsTime() - then;
+
+ if (!success)
+ errMsg = result.toString();
+ conn.done();
+ } catch (const DBException& ex) {
+ if (connPtr && connPtr->get()->isFailed()) {
+ // Return to the pool so the pool knows about the failure
+ connPtr->done();
}
- return result["localTime"].Date() - (delay / 2);
+ success = false;
+ errMsg = ex.toString();
}
- bool DistributedLock::checkSkew( const ConnectionString& cluster, unsigned skewChecks, unsigned long long maxClockSkew, unsigned long long maxNetSkew ) {
-
- vector<HostAndPort> servers = cluster.getServers();
+ if (!success) {
+ throw TimeNotFoundException(str::stream() << "could not get status from server "
+ << server.toString() << " in cluster "
+ << cluster.toString() << " to check time"
+ << causedBy(errMsg),
+ 13647);
+ }
- if(servers.size() < 1) return true;
+ // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote
+ // time value can be off by if we assume a response in the middle of the delay.
+ if (delay > Milliseconds(maxNetSkew * 2)) {
+ throw TimeNotFoundException(
+ str::stream() << "server " << server.toString() << " in cluster " << cluster.toString()
+ << " did not respond within max network delay of " << maxNetSkew << "ms",
+ 13648);
+ }
- vector<long long> avgSkews;
+ return result["localTime"].Date() - (delay / 2);
+}
- for(unsigned i = 0; i < skewChecks; i++) {
+bool DistributedLock::checkSkew(const ConnectionString& cluster,
+ unsigned skewChecks,
+ unsigned long long maxClockSkew,
+ unsigned long long maxNetSkew) {
+ vector<HostAndPort> servers = cluster.getServers();
- // Find the average skew for each server
- unsigned s = 0;
- for(vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si,s++) {
+ if (servers.size() < 1)
+ return true;
- if(i == 0) avgSkews.push_back(0);
+ vector<long long> avgSkews;
- // Could check if this is self, but shouldn't matter since local network connection should be fast.
- ConnectionString server( *si );
+ for (unsigned i = 0; i < skewChecks; i++) {
+ // Find the average skew for each server
+ unsigned s = 0;
+ for (vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si, s++) {
+ if (i == 0)
+ avgSkews.push_back(0);
- vector<long long> skew;
+ // Could check if this is self, but shouldn't matter since local network connection should be fast.
+ ConnectionString server(*si);
- BSONObj result;
+ vector<long long> skew;
- Date_t remote = remoteTime( server, maxNetSkew );
- Date_t local = jsTime();
+ BSONObj result;
- // Remote time can be delayed by at most MAX_NET_SKEW
+ Date_t remote = remoteTime(server, maxNetSkew);
+ Date_t local = jsTime();
- // Skew is how much time we'd have to add to local to get to remote
- avgSkews[s] += (remote - local).count();
+ // Remote time can be delayed by at most MAX_NET_SKEW
- LOG( logLvl + 1 ) << "skew from remote server " << server << " found: "
- << (remote - local).count();
+ // Skew is how much time we'd have to add to local to get to remote
+ avgSkews[s] += (remote - local).count();
- }
+ LOG(logLvl + 1) << "skew from remote server " << server
+ << " found: " << (remote - local).count();
}
+ }
- // Analyze skews
-
- long long serverMaxSkew = 0;
- long long serverMinSkew = 0;
+ // Analyze skews
- for(unsigned s = 0; s < avgSkews.size(); s++) {
+ long long serverMaxSkew = 0;
+ long long serverMinSkew = 0;
- long long avgSkew = (avgSkews[s] /= skewChecks);
+ for (unsigned s = 0; s < avgSkews.size(); s++) {
+ long long avgSkew = (avgSkews[s] /= skewChecks);
- // Keep track of max and min skews
- if(s == 0) {
+ // Keep track of max and min skews
+ if (s == 0) {
+ serverMaxSkew = avgSkew;
+ serverMinSkew = avgSkew;
+ } else {
+ if (avgSkew > serverMaxSkew)
serverMaxSkew = avgSkew;
+ if (avgSkew < serverMinSkew)
serverMinSkew = avgSkew;
- }
- else {
- if(avgSkew > serverMaxSkew)
- serverMaxSkew = avgSkew;
- if(avgSkew < serverMinSkew)
- serverMinSkew = avgSkew;
- }
-
}
+ }
- long long totalSkew = serverMaxSkew - serverMinSkew;
-
- // Make sure our max skew is not more than our pre-set limit
- if(totalSkew > (long long) maxClockSkew) {
- LOG( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is out of " << maxClockSkew << "ms bounds." << endl;
- return false;
- }
+ long long totalSkew = serverMaxSkew - serverMinSkew;
- LOG( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is in " << maxClockSkew << "ms bounds." << endl;
- return true;
+ // Make sure our max skew is not more than our pre-set limit
+ if (totalSkew > (long long)maxClockSkew) {
+ LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster
+ << " is out of " << maxClockSkew << "ms bounds." << endl;
+ return false;
}
- Status DistributedLock::checkStatus(double timeout) {
-
- BSONObj lockObj;
- try {
- ScopedDbConnection conn(_conn.toString(), timeout );
- lockObj = conn->findOne( LocksType::ConfigNS,
- BSON( LocksType::name(_name) ) ).getOwned();
- conn.done();
- }
- catch ( DBException& e ) {
- return e.toStatus();
- }
+ LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster
+ << " is in " << maxClockSkew << "ms bounds." << endl;
+ return true;
+}
- if ( lockObj.isEmpty() ) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "no lock for " << _name << " exists in the locks collection");
- }
+Status DistributedLock::checkStatus(double timeout) {
+ BSONObj lockObj;
+ try {
+ ScopedDbConnection conn(_conn.toString(), timeout);
+ lockObj = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned();
+ conn.done();
+ } catch (DBException& e) {
+ return e.toStatus();
+ }
- if ( lockObj[LocksType::state()].numberInt() < 2 ) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "lock " << _name << " current state is not held ("
- << lockObj[LocksType::state()].numberInt() << ")");
- }
+ if (lockObj.isEmpty()) {
+ return Status(ErrorCodes::LockFailed,
+ str::stream() << "no lock for " << _name
+ << " exists in the locks collection");
+ }
- if ( lockObj[LocksType::process()].String() != _processId ) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "lock " << _name << " is currently being held by "
- << "another process ("
- << lockObj[LocksType::process()].String() << ")");
- }
+ if (lockObj[LocksType::state()].numberInt() < 2) {
+ return Status(ErrorCodes::LockFailed,
+ str::stream() << "lock " << _name << " current state is not held ("
+ << lockObj[LocksType::state()].numberInt() << ")");
+ }
- return Status::OK();
+ if (lockObj[LocksType::process()].String() != _processId) {
+ return Status(ErrorCodes::LockFailed,
+ str::stream() << "lock " << _name << " is currently being held by "
+ << "another process (" << lockObj[LocksType::process()].String()
+ << ")");
}
- static void logErrMsgOrWarn(StringData messagePrefix,
- StringData lockName,
- StringData errMsg,
- StringData altErrMsg) {
+ return Status::OK();
+}
- if (errMsg.empty()) {
- LOG(DistributedLock::logLvl - 1) << messagePrefix << " '" << lockName << "' " <<
- altErrMsg << std::endl;
- }
- else {
- warning() << messagePrefix << " '" << lockName << "' " << causedBy(errMsg.toString());
- }
+static void logErrMsgOrWarn(StringData messagePrefix,
+ StringData lockName,
+ StringData errMsg,
+ StringData altErrMsg) {
+ if (errMsg.empty()) {
+ LOG(DistributedLock::logLvl - 1) << messagePrefix << " '" << lockName << "' " << altErrMsg
+ << std::endl;
+ } else {
+ warning() << messagePrefix << " '" << lockName << "' " << causedBy(errMsg.toString());
}
+}
- // Semantics of this method are basically that if the lock cannot be acquired, returns false,
- // can be retried. If the lock should not be tried again (some unexpected error),
- // a LockException is thrown.
- bool DistributedLock::lock_try(const string& why, BSONObj* other, double timeout) {
- // This should always be true, if not, we are using the lock incorrectly.
- verify( _name != "" );
+// Semantics of this method are basically that if the lock cannot be acquired, returns false,
+// can be retried. If the lock should not be tried again (some unexpected error),
+// a LockException is thrown.
+bool DistributedLock::lock_try(const string& why, BSONObj* other, double timeout) {
+ // This should always be true, if not, we are using the lock incorrectly.
+ verify(_name != "");
- LOG( logLvl ) << "trying to acquire new distributed lock for " << _name << " on " << _conn
- << " ( lock timeout : " << _lockTimeout
- << ", ping interval : " << _lockPing << ", process : " << _processId << " )"
- << endl;
+ LOG(logLvl) << "trying to acquire new distributed lock for " << _name << " on " << _conn
+ << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing
+ << ", process : " << _processId << " )" << endl;
- // write to dummy if 'other' is null
- BSONObj dummyOther;
- if ( other == NULL )
- other = &dummyOther;
+ // write to dummy if 'other' is null
+ BSONObj dummyOther;
+ if (other == NULL)
+ other = &dummyOther;
- ScopedDbConnection conn(_conn.toString(), timeout );
+ ScopedDbConnection conn(_conn.toString(), timeout);
- BSONObjBuilder queryBuilder;
- queryBuilder.append( LocksType::name() , _name );
- queryBuilder.append( LocksType::state() , 0 );
+ BSONObjBuilder queryBuilder;
+ queryBuilder.append(LocksType::name(), _name);
+ queryBuilder.append(LocksType::state(), 0);
- {
- // make sure its there so we can use simple update logic below
- BSONObj o = conn->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) ).getOwned();
+ {
+ // make sure its there so we can use simple update logic below
+ BSONObj o = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned();
- // Case 1: No locks
- if ( o.isEmpty() ) {
- try {
- LOG( logLvl ) << "inserting initial doc in " << LocksType::ConfigNS << " for lock " << _name << endl;
- conn->insert( LocksType::ConfigNS,
- BSON( LocksType::name(_name)
- << LocksType::state(0)
- << LocksType::who("")
- << LocksType::lockID(OID()) ));
- }
- catch ( UserException& e ) {
- warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl;
- }
+ // Case 1: No locks
+ if (o.isEmpty()) {
+ try {
+ LOG(logLvl) << "inserting initial doc in " << LocksType::ConfigNS << " for lock "
+ << _name << endl;
+ conn->insert(LocksType::ConfigNS,
+ BSON(LocksType::name(_name) << LocksType::state(0)
+ << LocksType::who("")
+ << LocksType::lockID(OID())));
+ } catch (UserException& e) {
+ warning() << "could not insert initial doc for distributed lock " << _name
+ << causedBy(e) << endl;
}
+ }
- // Case 2: A set lock that we might be able to force
- else if ( o[LocksType::state()].numberInt() > 0 ) {
-
- string lockName = o[LocksType::name()].String() + string("/") + o[LocksType::process()].String();
-
- BSONObj lastPing = conn->findOne( LockpingsType::ConfigNS, o[LocksType::process()].wrap( LockpingsType::process() ) );
- if ( lastPing.isEmpty() ) {
- LOG( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl;
- // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot.
- lastPing = BSON( LockpingsType::process(o[LocksType::process()].String()) <<
- LockpingsType::ping(Date_t()) );
- }
-
- unsigned long long elapsed = 0;
- unsigned long long takeover = _lockTimeout;
- DistLockPingInfo lastPingEntry = getLastPing();
+ // Case 2: A set lock that we might be able to force
+ else if (o[LocksType::state()].numberInt() > 0) {
+ string lockName =
+ o[LocksType::name()].String() + string("/") + o[LocksType::process()].String();
+
+ BSONObj lastPing = conn->findOne(
+ LockpingsType::ConfigNS, o[LocksType::process()].wrap(LockpingsType::process()));
+ if (lastPing.isEmpty()) {
+ LOG(logLvl) << "empty ping found for process in lock '" << lockName << "'" << endl;
+ // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot.
+ lastPing = BSON(LockpingsType::process(o[LocksType::process()].String())
+ << LockpingsType::ping(Date_t()));
+ }
- LOG(logLvl) << "checking last ping for lock '" << lockName
- << "' against process " << lastPingEntry.processId
- << " and ping " << lastPingEntry.lastPing;
+ unsigned long long elapsed = 0;
+ unsigned long long takeover = _lockTimeout;
+ DistLockPingInfo lastPingEntry = getLastPing();
- try {
+ LOG(logLvl) << "checking last ping for lock '" << lockName << "' against process "
+ << lastPingEntry.processId << " and ping " << lastPingEntry.lastPing;
- Date_t remote = remoteTime( _conn );
-
- auto pingDocProcessId = lastPing[LockpingsType::process()].String();
- auto pingDocPingValue = lastPing[LockpingsType::ping()].Date();
-
- // Timeout the elapsed time using comparisons of remote clock
- // For non-finalized locks, timeout 15 minutes since last seen (ts)
- // For finalized locks, timeout 15 minutes since last ping
- bool recPingChange =
- o[LocksType::state()].numberInt() == 2 &&
- (lastPingEntry.processId != pingDocProcessId ||
- lastPingEntry.lastPing != pingDocPingValue);
- bool recTSChange = lastPingEntry.lockSessionId != o[LocksType::lockID()].OID();
-
- if (recPingChange || recTSChange) {
- // If the ping has changed since we last checked, mark the current date and time
- setLastPing(DistLockPingInfo(pingDocProcessId,
- pingDocPingValue,
- remote,
- o[LocksType::lockID()].OID(),
- OID()));
- }
- else {
-
- // GOTCHA! Due to network issues, it is possible that the current time
- // is less than the remote time. We *have* to check this here, otherwise
- // we overflow and our lock breaks.
- if (lastPingEntry.configLocalTime >= remote)
- elapsed = 0;
- else
- elapsed = (remote - lastPingEntry.configLocalTime).count();
- }
+ try {
+ Date_t remote = remoteTime(_conn);
+
+ auto pingDocProcessId = lastPing[LockpingsType::process()].String();
+ auto pingDocPingValue = lastPing[LockpingsType::ping()].Date();
+
+ // Timeout the elapsed time using comparisons of remote clock
+ // For non-finalized locks, timeout 15 minutes since last seen (ts)
+ // For finalized locks, timeout 15 minutes since last ping
+ bool recPingChange = o[LocksType::state()].numberInt() == 2 &&
+ (lastPingEntry.processId != pingDocProcessId ||
+ lastPingEntry.lastPing != pingDocPingValue);
+ bool recTSChange = lastPingEntry.lockSessionId != o[LocksType::lockID()].OID();
+
+ if (recPingChange || recTSChange) {
+ // If the ping has changed since we last checked, mark the current date and time
+ setLastPing(DistLockPingInfo(pingDocProcessId,
+ pingDocPingValue,
+ remote,
+ o[LocksType::lockID()].OID(),
+ OID()));
+ } else {
+ // GOTCHA! Due to network issues, it is possible that the current time
+ // is less than the remote time. We *have* to check this here, otherwise
+ // we overflow and our lock breaks.
+ if (lastPingEntry.configLocalTime >= remote)
+ elapsed = 0;
+ else
+ elapsed = (remote - lastPingEntry.configLocalTime).count();
}
- catch( LockException& e ) {
-
- // Remote server cannot be found / is not responsive
- warning() << "Could not get remote time from " << _conn << causedBy( e );
- // If our config server is having issues, forget all the pings until we can see it again
- resetLastPing();
+ } catch (LockException& e) {
+ // Remote server cannot be found / is not responsive
+ warning() << "Could not get remote time from " << _conn << causedBy(e);
+ // If our config server is having issues, forget all the pings until we can see it again
+ resetLastPing();
+ }
- }
+ if (elapsed <= takeover) {
+ LOG(1) << "could not force lock '" << lockName << "' because elapsed time "
+ << elapsed << " <= takeover time " << takeover;
+ *other = o;
+ other->getOwned();
+ conn.done();
+ return false;
+ }
- if (elapsed <= takeover) {
- LOG(1) << "could not force lock '" << lockName
- << "' because elapsed time " << elapsed
- << " <= takeover time " << takeover;
- *other = o; other->getOwned(); conn.done();
- return false;
- }
+ LOG(0) << "forcing lock '" << lockName << "' because elapsed time " << elapsed
+ << " > takeover time " << takeover;
- LOG(0) << "forcing lock '" << lockName
- << "' because elapsed time " << elapsed
- << " > takeover time " << takeover;
-
- if( elapsed > takeover ) {
-
- // Lock may forced, reset our timer if succeeds or fails
- // Ensures that another timeout must happen if something borks up here, and resets our pristine
- // ping state if acquired.
- resetLastPing();
-
- try {
-
- // Check the clock skew again. If we check this before we get a lock
- // and after the lock times out, we can be pretty sure the time is
- // increasing at the same rate on all servers and therefore our
- // timeout is accurate
- if (isRemoteTimeSkewed()) {
- string msg(str::stream() << "remote time in cluster "
- << _conn.toString()
- << " is now skewed, cannot force lock.");
- throw LockException(msg, ErrorCodes::DistributedClockSkewed);
- }
-
- // Make sure we break the lock with the correct "ts" (OID) value, otherwise
- // we can overwrite a new lock inserted in the meantime.
- conn->update( LocksType::ConfigNS,
- BSON( LocksType::name(_name) <<
- LocksType::state(o[LocksType::state()].numberInt()) <<
- LocksType::lockID(o[LocksType::lockID()].OID()) ),
- BSON( "$set" << BSON( LocksType::state(0) ) ) );
-
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- // TODO: Clean up all the extra code to exit this method, probably with a refactor
- if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
- logErrMsgOrWarn("Could not force lock", lockName, errMsg, "(another force won");
- *other = o; other->getOwned(); conn.done();
- return false;
- }
+ if (elapsed > takeover) {
+ // Lock may forced, reset our timer if succeeds or fails
+ // Ensures that another timeout must happen if something borks up here, and resets our pristine
+ // ping state if acquired.
+ resetLastPing();
+ try {
+ // Check the clock skew again. If we check this before we get a lock
+ // and after the lock times out, we can be pretty sure the time is
+ // increasing at the same rate on all servers and therefore our
+ // timeout is accurate
+ if (isRemoteTimeSkewed()) {
+ string msg(str::stream() << "remote time in cluster " << _conn.toString()
+ << " is now skewed, cannot force lock.");
+ throw LockException(msg, ErrorCodes::DistributedClockSkewed);
}
- catch( UpdateNotTheSame& ) {
- // Ok to continue since we know we forced at least one lock document, and all lock docs
- // are required for a lock to be held.
- warning() << "lock forcing " << lockName << " inconsistent" << endl;
- }
- catch (const LockException& ) {
- // Let the exception go up and don't repackage the exception.
- throw;
- }
- catch( std::exception& e ) {
+
+ // Make sure we break the lock with the correct "ts" (OID) value, otherwise
+ // we can overwrite a new lock inserted in the meantime.
+ conn->update(LocksType::ConfigNS,
+ BSON(LocksType::name(_name)
+ << LocksType::state(o[LocksType::state()].numberInt())
+ << LocksType::lockID(o[LocksType::lockID()].OID())),
+ BSON("$set" << BSON(LocksType::state(0))));
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ // TODO: Clean up all the extra code to exit this method, probably with a refactor
+ if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
+ logErrMsgOrWarn(
+ "Could not force lock", lockName, errMsg, "(another force won");
+ *other = o;
+ other->getOwned();
conn.done();
- string msg(str::stream() << "exception forcing distributed lock "
- << lockName << causedBy(e));
- throw LockException(msg, 13660);
+ return false;
}
+ } catch (UpdateNotTheSame&) {
+ // Ok to continue since we know we forced at least one lock document, and all lock docs
+ // are required for a lock to be held.
+ warning() << "lock forcing " << lockName << " inconsistent" << endl;
+ } catch (const LockException&) {
+ // Let the exception go up and don't repackage the exception.
+ throw;
+ } catch (std::exception& e) {
+ conn.done();
+ string msg(str::stream() << "exception forcing distributed lock " << lockName
+ << causedBy(e));
+ throw LockException(msg, 13660);
}
- else {
- // Not strictly necessary, but helpful for small timeouts where thread
- // scheduling is significant. This ensures that two attempts are still
- // required for a force if not acquired, and resets our state if we
- // are acquired.
- resetLastPing();
-
- // Test that the lock is held by trying to update the finalized state of the lock to the same state
- // if it does not update or does not update on all servers, we can't re-enter.
- try {
-
- // Test the lock with the correct "ts" (OID) value
- conn->update( LocksType::ConfigNS,
- BSON( LocksType::name(_name) <<
- LocksType::state(2) <<
- LocksType::lockID(o[LocksType::lockID()].OID()) ),
- BSON( "$set" << BSON( LocksType::state(2) ) ) );
-
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- // TODO: Clean up all the extra code to exit this method, probably with a refactor
- if ( ! errMsg.empty() || ! err["n"].type() || err["n"].numberInt() < 1 ) {
- logErrMsgOrWarn("Could not re-enter lock", lockName, errMsg, "(not sure lock is held");
- *other = o; other->getOwned(); conn.done();
- return false;
- }
- }
- catch( UpdateNotTheSame& ) {
- // NOT ok to continue since our lock isn't held by all servers, so isn't valid.
- warning() << "inconsistent state re-entering lock, lock " << lockName << " not held" << endl;
- *other = o; other->getOwned(); conn.done();
- return false;
- }
- catch( std::exception& e ) {
+ } else {
+ // Not strictly necessary, but helpful for small timeouts where thread
+ // scheduling is significant. This ensures that two attempts are still
+ // required for a force if not acquired, and resets our state if we
+ // are acquired.
+ resetLastPing();
+
+ // Test that the lock is held by trying to update the finalized state of the lock to the same state
+ // if it does not update or does not update on all servers, we can't re-enter.
+ try {
+ // Test the lock with the correct "ts" (OID) value
+ conn->update(LocksType::ConfigNS,
+ BSON(LocksType::name(_name)
+ << LocksType::state(2)
+ << LocksType::lockID(o[LocksType::lockID()].OID())),
+ BSON("$set" << BSON(LocksType::state(2))));
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ // TODO: Clean up all the extra code to exit this method, probably with a refactor
+ if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
+ logErrMsgOrWarn(
+                    "Could not re-enter lock", lockName, errMsg, "(not sure lock is held)");
+ *other = o;
+ other->getOwned();
conn.done();
- string msg(str::stream() << "exception re-entering distributed lock "
- << lockName << causedBy(e));
- throw LockException(msg, 13660);
+ return false;
}
- LOG( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl;
- *other = o.getOwned();
+ } catch (UpdateNotTheSame&) {
+ // NOT ok to continue since our lock isn't held by all servers, so isn't valid.
+ warning() << "inconsistent state re-entering lock, lock " << lockName
+ << " not held" << endl;
+ *other = o;
+ other->getOwned();
conn.done();
- return true;
-
+ return false;
+ } catch (std::exception& e) {
+ conn.done();
+ string msg(str::stream() << "exception re-entering distributed lock "
+ << lockName << causedBy(e));
+ throw LockException(msg, 13660);
}
- LOG( logLvl - 1 ) << "lock '" << lockName << "' successfully forced" << endl;
-
- // We don't need the ts value in the query, since we will only ever replace locks with state=0.
- }
- // Case 3: We have an expired lock
- else if ( o[LocksType::lockID()].type() ) {
- queryBuilder.append( o[LocksType::lockID()] );
+ LOG(logLvl - 1) << "re-entered distributed lock '" << lockName << "'" << endl;
+ *other = o.getOwned();
+ conn.done();
+ return true;
}
- }
- // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open
- // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock.
- resetLastPing();
+ LOG(logLvl - 1) << "lock '" << lockName << "' successfully forced" << endl;
- bool gotLock = false;
- BSONObj currLock;
+ // We don't need the ts value in the query, since we will only ever replace locks with state=0.
+ }
+ // Case 3: We have an expired lock
+ else if (o[LocksType::lockID()].type()) {
+ queryBuilder.append(o[LocksType::lockID()]);
+ }
+ }
- BSONObj lockDetails = BSON( LocksType::state(1)
- << LocksType::who(getDistLockId())
- << LocksType::process(_processId)
- << LocksType::when(jsTime())
- << LocksType::why(why)
- << LocksType::lockID(OID::gen()) );
- BSONObj whatIWant = BSON( "$set" << lockDetails );
+ // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open
+ // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock.
+ resetLastPing();
- BSONObj query = queryBuilder.obj();
+ bool gotLock = false;
+ BSONObj currLock;
- string lockName = _name + string("/") + _processId;
+ BSONObj lockDetails =
+ BSON(LocksType::state(1) << LocksType::who(getDistLockId())
+ << LocksType::process(_processId) << LocksType::when(jsTime())
+ << LocksType::why(why) << LocksType::lockID(OID::gen()));
+ BSONObj whatIWant = BSON("$set" << lockDetails);
- try {
+ BSONObj query = queryBuilder.obj();
- // Main codepath to acquire lock
+ string lockName = _name + string("/") + _processId;
- LOG( logLvl ) << "about to acquire distributed lock '" << lockName << "'";
+ try {
+ // Main codepath to acquire lock
- LOG( logLvl + 1 ) << "trying to acquire lock " << query.toString( false, true )
- << " with details " << lockDetails.toString( false, true ) << endl;
+ LOG(logLvl) << "about to acquire distributed lock '" << lockName << "'";
- conn->update( LocksType::ConfigNS , query , whatIWant );
+ LOG(logLvl + 1) << "trying to acquire lock " << query.toString(false, true)
+ << " with details " << lockDetails.toString(false, true) << endl;
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
+ conn->update(LocksType::ConfigNS, query, whatIWant);
- currLock = conn->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) );
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
- if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
- logErrMsgOrWarn("could not acquire lock", lockName, errMsg, "(another update won)");
- *other = currLock;
- other->getOwned();
- gotLock = false;
- }
- else {
- gotLock = true;
- }
+ currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
+ if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
+ logErrMsgOrWarn("could not acquire lock", lockName, errMsg, "(another update won)");
+ *other = currLock;
+ other->getOwned();
+ gotLock = false;
+ } else {
+ gotLock = true;
}
- catch ( UpdateNotTheSame& up ) {
-
- // this means our update got through on some, but not others
- warning() << "distributed lock '" << lockName << " did not propagate properly." << causedBy( up ) << endl;
-
- // Overall protection derives from:
- // All unlocking updates use the ts value when setting state to 0
- // This ensures that during locking, we can override all smaller ts locks with
- // our own safe ts value and not be unlocked afterward.
- for ( unsigned i = 0; i < up.size(); i++ ) {
- ScopedDbConnection indDB(up[i].first);
- BSONObj indUpdate;
+ } catch (UpdateNotTheSame& up) {
+ // this means our update got through on some, but not others
+        warning() << "distributed lock '" << lockName << "' did not propagate properly."
+ << causedBy(up) << endl;
- try {
+ // Overall protection derives from:
+ // All unlocking updates use the ts value when setting state to 0
+ // This ensures that during locking, we can override all smaller ts locks with
+ // our own safe ts value and not be unlocked afterward.
+ for (unsigned i = 0; i < up.size(); i++) {
+ ScopedDbConnection indDB(up[i].first);
+ BSONObj indUpdate;
- indUpdate = indDB->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) );
-
- // If we override this lock in any way, grab and protect it.
- // We assume/ensure that if a process does not have all lock documents, it is no longer
- // holding the lock.
- // Note - finalized locks may compete too, but we know they've won already if competing
- // in this round. Cleanup of crashes during finalizing may take a few tries.
- if( indUpdate[LocksType::lockID()] < lockDetails[LocksType::lockID()] || indUpdate[LocksType::state()].numberInt() == 0 ) {
-
- BSONObj grabQuery = BSON( LocksType::name(_name)
- << LocksType::lockID(indUpdate[LocksType::lockID()].OID()) );
-
- // Change ts so we won't be forced, state so we won't be relocked
- BSONObj grabChanges = BSON( LocksType::lockID(lockDetails[LocksType::lockID()].OID())
- << LocksType::state(1) );
-
- // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
- // process grabbed the lock (which will change the ts), but the lock will be set until forcing
- indDB->update( LocksType::ConfigNS, grabQuery, BSON( "$set" << grabChanges ) );
-
- indUpdate = indDB->findOne( LocksType::ConfigNS, BSON( LocksType::name(_name) ) );
-
- // The tournament was interfered and it is not safe to proceed further.
- // One case this could happen is when the LockPinger processes old
- // entries from addUnlockOID. See SERVER-10688 for more detailed
- // description of race.
- if ( indUpdate[LocksType::state()].numberInt() <= 0 ) {
- LOG( logLvl - 1 ) << "lock tournament interrupted, "
- << "so no lock was taken; "
- << "new state of lock: " << indUpdate << endl;
-
- // We now break and set our currLock lockID value to zero, so that
- // we know that we did not acquire the lock below. Later code will
- // cleanup failed entries.
- currLock = BSON(LocksType::lockID(OID()));
- indDB.done();
- break;
- }
+ try {
+ indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
+
+ // If we override this lock in any way, grab and protect it.
+ // We assume/ensure that if a process does not have all lock documents, it is no longer
+ // holding the lock.
+ // Note - finalized locks may compete too, but we know they've won already if competing
+ // in this round. Cleanup of crashes during finalizing may take a few tries.
+ if (indUpdate[LocksType::lockID()] < lockDetails[LocksType::lockID()] ||
+ indUpdate[LocksType::state()].numberInt() == 0) {
+ BSONObj grabQuery =
+ BSON(LocksType::name(_name)
+ << LocksType::lockID(indUpdate[LocksType::lockID()].OID()));
+
+ // Change ts so we won't be forced, state so we won't be relocked
+ BSONObj grabChanges =
+ BSON(LocksType::lockID(lockDetails[LocksType::lockID()].OID())
+ << LocksType::state(1));
+
+ // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
+ // process grabbed the lock (which will change the ts), but the lock will be set until forcing
+ indDB->update(LocksType::ConfigNS, grabQuery, BSON("$set" << grabChanges));
+
+ indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
+
+                    // The tournament was interfered with, so it is not safe to proceed further.
+                    // One case where this can happen is when the LockPinger processes old
+                    // entries from addUnlockOID. See SERVER-10688 for a more detailed
+                    // description of the race.
+ if (indUpdate[LocksType::state()].numberInt() <= 0) {
+ LOG(logLvl - 1) << "lock tournament interrupted, "
+ << "so no lock was taken; "
+ << "new state of lock: " << indUpdate << endl;
+
+ // We now break and set our currLock lockID value to zero, so that
+ // we know that we did not acquire the lock below. Later code will
+ // cleanup failed entries.
+ currLock = BSON(LocksType::lockID(OID()));
+ indDB.done();
+ break;
}
- // else our lock is the same, in which case we're safe, or it's a bigger lock,
- // in which case we won't need to protect anything since we won't have the lock.
-
- }
- catch( std::exception& e ) {
- conn.done();
- string msg(str::stream() << "distributed lock " << lockName
- << " had errors communicating with individual server "
- << up[1].first << causedBy(e));
- throw LockException(msg, 13661);
- }
-
- verify( !indUpdate.isEmpty() );
-
- // Find max TS value
- if ( currLock.isEmpty() || currLock[LocksType::lockID()] < indUpdate[LocksType::lockID()] ) {
- currLock = indUpdate.getOwned();
}
+ // else our lock is the same, in which case we're safe, or it's a bigger lock,
+ // in which case we won't need to protect anything since we won't have the lock.
- indDB.done();
-
+ } catch (std::exception& e) {
+ conn.done();
+ string msg(str::stream() << "distributed lock " << lockName
+ << " had errors communicating with individual server "
+                                         << up[i].first << causedBy(e));
+ throw LockException(msg, 13661);
}
- // Locks on all servers are now set and safe until forcing
+ verify(!indUpdate.isEmpty());
- if ( currLock[LocksType::lockID()] == lockDetails[LocksType::lockID()] ) {
- LOG( logLvl - 1 ) << "lock update won, completing lock propagation for '" << lockName << "'" << endl;
- gotLock = true;
+ // Find max TS value
+ if (currLock.isEmpty() ||
+ currLock[LocksType::lockID()] < indUpdate[LocksType::lockID()]) {
+ currLock = indUpdate.getOwned();
}
- else {
- LOG( logLvl - 1 ) << "lock update lost, lock '" << lockName << "' not propagated." << endl;
- gotLock = false;
- }
- }
- catch( std::exception& e ) {
- conn.done();
- string msg(str::stream() << "exception creating distributed lock "
- << lockName << causedBy(e));
- throw LockException(msg, 13663 );
- }
- // Complete lock propagation
- if( gotLock ) {
+ indDB.done();
+ }
- // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
- // least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set.
- // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that
- // indicates the lock is active, but this means the process creating/destroying them must explicitly poll
- // when something goes wrong.
- try {
+ // Locks on all servers are now set and safe until forcing
- BSONObjBuilder finalLockDetails;
- BSONObjIterator bi( lockDetails );
- while( bi.more() ) {
- BSONElement el = bi.next();
- if( (string) ( el.fieldName() ) == LocksType::state() )
- finalLockDetails.append( LocksType::state(), 2 );
- else finalLockDetails.append( el );
- }
+ if (currLock[LocksType::lockID()] == lockDetails[LocksType::lockID()]) {
+ LOG(logLvl - 1) << "lock update won, completing lock propagation for '" << lockName
+ << "'" << endl;
+ gotLock = true;
+ } else {
+ LOG(logLvl - 1) << "lock update lost, lock '" << lockName << "' not propagated."
+ << endl;
+ gotLock = false;
+ }
+ } catch (std::exception& e) {
+ conn.done();
+ string msg(str::stream() << "exception creating distributed lock " << lockName
+ << causedBy(e));
+ throw LockException(msg, 13663);
+ }
- conn->update( LocksType::ConfigNS , BSON( LocksType::name(_name) ) , BSON( "$set" << finalLockDetails.obj() ) );
+ // Complete lock propagation
+ if (gotLock) {
+ // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
+ // least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set.
+ // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that
+ // indicates the lock is active, but this means the process creating/destroying them must explicitly poll
+ // when something goes wrong.
+ try {
+ BSONObjBuilder finalLockDetails;
+ BSONObjIterator bi(lockDetails);
+ while (bi.more()) {
+ BSONElement el = bi.next();
+ if ((string)(el.fieldName()) == LocksType::state())
+ finalLockDetails.append(LocksType::state(), 2);
+ else
+ finalLockDetails.append(el);
+ }
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
+ conn->update(LocksType::ConfigNS,
+ BSON(LocksType::name(_name)),
+ BSON("$set" << finalLockDetails.obj()));
- currLock = conn->findOne( LocksType::ConfigNS , BSON( LocksType::name(_name) ) );
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
- if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
- warning() << "could not finalize winning lock " << lockName
- << ( !errMsg.empty() ? causedBy( errMsg ) : " (did not update lock) " ) << endl;
- gotLock = false;
- }
- else {
- // SUCCESS!
- gotLock = true;
- }
+ currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
- }
- catch( std::exception& e ) {
- conn.done();
- string msg(str::stream() << "exception finalizing winning lock" << causedBy(e));
- // Inform caller about the potential orphan lock.
- throw LockException(msg,
- 13662,
- lockDetails[LocksType::lockID()].OID());
+ if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
+ warning() << "could not finalize winning lock " << lockName
+ << (!errMsg.empty() ? causedBy(errMsg) : " (did not update lock) ")
+ << endl;
+ gotLock = false;
+ } else {
+ // SUCCESS!
+ gotLock = true;
}
+ } catch (std::exception& e) {
+ conn.done();
+ string msg(str::stream() << "exception finalizing winning lock" << causedBy(e));
+ // Inform caller about the potential orphan lock.
+ throw LockException(msg, 13662, lockDetails[LocksType::lockID()].OID());
}
-
- *other = currLock;
- other->getOwned();
-
- // Log our lock results
- if(gotLock)
- LOG( logLvl - 1 ) << "distributed lock '" << lockName <<
- "' acquired, ts : " << currLock[LocksType::lockID()].OID() << endl;
- else
- LOG( logLvl - 1 ) << "distributed lock '" << lockName << "' was not acquired." << endl;
-
- conn.done();
-
- return gotLock;
}
- // This function *must not* throw exceptions, since it can be used in destructors - failure
- // results in queuing and trying again later.
- bool DistributedLock::unlock(const DistLockHandle& lockID) {
+ *other = currLock;
+ other->getOwned();
- verify(_name != "");
- string lockName = _name + string("/") + _processId;
+ // Log our lock results
+ if (gotLock)
+ LOG(logLvl - 1) << "distributed lock '" << lockName
+ << "' acquired, ts : " << currLock[LocksType::lockID()].OID() << endl;
+ else
+ LOG(logLvl - 1) << "distributed lock '" << lockName << "' was not acquired." << endl;
- const int maxAttempts = 3;
- int attempted = 0;
+ conn.done();
- while ( ++attempted <= maxAttempts ) {
+ return gotLock;
+}
- // Awkward, but necessary since the constructor itself throws exceptions
- unique_ptr<ScopedDbConnection> connPtr;
+// This function *must not* throw exceptions, since it can be used in destructors - failure
+// results in queuing and trying again later.
+bool DistributedLock::unlock(const DistLockHandle& lockID) {
+ verify(_name != "");
+ string lockName = _name + string("/") + _processId;
- try {
+ const int maxAttempts = 3;
+ int attempted = 0;
- connPtr.reset( new ScopedDbConnection( _conn.toString() ) );
- ScopedDbConnection& conn = *connPtr;
+ while (++attempted <= maxAttempts) {
+ // Awkward, but necessary since the constructor itself throws exceptions
+ unique_ptr<ScopedDbConnection> connPtr;
- // Use ts when updating lock, so that new locks can be sure they won't get trampled.
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::name(_name)
- << LocksType::lockID(lockID)),
- BSON("$set" << BSON(LocksType::state(0))));
+ try {
+ connPtr.reset(new ScopedDbConnection(_conn.toString()));
+ ScopedDbConnection& conn = *connPtr;
- // Check that the lock was actually unlocked... if not, try again
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
+ // Use ts when updating lock, so that new locks can be sure they won't get trampled.
+ conn->update(LocksType::ConfigNS,
+ BSON(LocksType::name(_name) << LocksType::lockID(lockID)),
+ BSON("$set" << BSON(LocksType::state(0))));
- if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ){
- warning() << "distributed lock unlock update failed, retrying "
- << ( errMsg.empty() ? causedBy( "( update not registered )" ) : causedBy( errMsg ) ) << endl;
- conn.done();
- continue;
- }
+ // Check that the lock was actually unlocked... if not, try again
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
- LOG(0) << "distributed lock '" << lockName << "' unlocked. ";
+ if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
+ warning() << "distributed lock unlock update failed, retrying "
+ << (errMsg.empty() ? causedBy("( update not registered )")
+ : causedBy(errMsg)) << endl;
conn.done();
- return true;
- }
- catch (const UpdateNotTheSame&) {
- LOG(0) << "distributed lock '" << lockName << "' unlocked (messily). ";
- // This isn't a connection problem, so don't throw away the conn
- connPtr->done();
- return true;
+ continue;
}
- catch ( std::exception& e) {
- warning() << "distributed lock '" << lockName << "' failed unlock attempt."
- << causedBy( e ) << endl;
- // TODO: If our lock timeout is small, sleeping this long may be unsafe.
- if( attempted != maxAttempts) sleepsecs(1 << attempted);
- }
+ LOG(0) << "distributed lock '" << lockName << "' unlocked. ";
+ conn.done();
+ return true;
+ } catch (const UpdateNotTheSame&) {
+ LOG(0) << "distributed lock '" << lockName << "' unlocked (messily). ";
+ // This isn't a connection problem, so don't throw away the conn
+ connPtr->done();
+ return true;
+ } catch (std::exception& e) {
+ warning() << "distributed lock '" << lockName << "' failed unlock attempt."
+ << causedBy(e) << endl;
+
+ // TODO: If our lock timeout is small, sleeping this long may be unsafe.
+ if (attempted != maxAttempts)
+ sleepsecs(1 << attempted);
}
-
- return false;
}
+ return false;
+}
}
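
For reference, a minimal sketch of the bounded retry with exponential backoff that
DistributedLock::unlock() above relies on, shown in isolation; tryUnlockOnce() is a
hypothetical stand-in for the config-server update that resets the lock document's
state to 0:

    const int maxAttempts = 3;
    for (int attempted = 1; attempted <= maxAttempts; ++attempted) {
        if (tryUnlockOnce()) {          // hypothetical helper; stands in for the $set {state: 0} update
            return true;                // lock document(s) successfully reset to unlocked
        }
        if (attempted != maxAttempts) {
            sleepsecs(1 << attempted);  // back off 2s, then 4s, before retrying
        }
    }
    return false;                       // caller later queues the handle for lazy unlock
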
diff --git a/src/mongo/s/catalog/legacy/distlock.h b/src/mongo/s/catalog/legacy/distlock.h
index b569080147e..b267be42a04 100644
--- a/src/mongo/s/catalog/legacy/distlock.h
+++ b/src/mongo/s/catalog/legacy/distlock.h
@@ -37,199 +37,198 @@
namespace mongo {
- namespace {
-
- enum TimeConstants {
- LOCK_TIMEOUT = 15 * 60 * 1000,
- LOCK_SKEW_FACTOR = 30,
- LOCK_PING = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
- MAX_LOCK_NET_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
- MAX_LOCK_CLOCK_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
- NUM_LOCK_SKEW_CHECKS = 3,
- };
-
- // The maximum clock skew we need to handle between config servers is
- // 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW.
-
- // Net effect of *this* clock being slow is effectively a multiplier on the max net skew
- // and a linear increase or decrease of the max clock skew.
- }
+namespace {
+
+enum TimeConstants {
+ LOCK_TIMEOUT = 15 * 60 * 1000,
+ LOCK_SKEW_FACTOR = 30,
+ LOCK_PING = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
+ MAX_LOCK_NET_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
+ MAX_LOCK_CLOCK_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
+ NUM_LOCK_SKEW_CHECKS = 3,
+};
+
+// The maximum clock skew we need to handle between config servers is
+// 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW.
+
+// Net effect of *this* clock being slow is effectively a multiplier on the max net skew
+// and a linear increase or decrease of the max clock skew.
+}
+
+/**
+ * Exception class to encapsulate exceptions while managing distributed locks
+ */
+class LockException : public DBException {
+public:
+ LockException(StringData msg, int code);
/**
- * Exception class to encapsulate exceptions while managing distributed locks
+ * Use this to signal that a lock with lockID needs to be unlocked. For example, in cases
+ * where the final lock acquisition was not propagated properly to all config servers.
*/
- class LockException : public DBException {
- public:
- LockException(StringData msg, int code);
+ LockException(StringData msg, int code, DistLockHandle lockID);
- /**
- * Use this to signal that a lock with lockID needs to be unlocked. For example, in cases
- * where the final lock acquisition was not propagated properly to all config servers.
- */
- LockException(StringData msg, int code, DistLockHandle lockID);
+ /**
+ * Returns the OID of the lock that needs to be unlocked.
+ */
+ DistLockHandle getMustUnlockID() const;
- /**
- * Returns the OID of the lock that needs to be unlocked.
- */
- DistLockHandle getMustUnlockID() const;
+ virtual ~LockException() = default;
- virtual ~LockException() = default;
+private:
+ // The identifier of a lock that needs to be unlocked.
+ DistLockHandle _mustUnlockID;
+};
- private:
- // The identifier of a lock that needs to be unlocked.
- DistLockHandle _mustUnlockID;
- };
+/**
+ * Indicates an error in retrieving time values from remote servers.
+ */
+class TimeNotFoundException : public LockException {
+public:
+ TimeNotFoundException(const char* msg, int code) : LockException(msg, code) {}
+ TimeNotFoundException(const std::string& msg, int code) : LockException(msg, code) {}
+ virtual ~TimeNotFoundException() = default;
+};
+
+/**
+ * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task
+ * must be identified by a unique name across the system (e.g., "balancer"). A lock is taken
+ * by writing a document in the configdb's locks collection with that name.
+ *
+ * To be maintained, each taken lock needs to be revalidated ("pinged") within a
+ * pre-established amount of time. This class does this maintenance automatically once a
+ * DistributedLock object was constructed. The ping procedure records the local time to
+ * the ping document, but that time is untrusted and is only used as a point of reference
+ * of whether the ping was refreshed or not. Ultimately, the clock of a configdb is the source
+ * of truth when determining whether a ping is still fresh or not. This is achieved by
+ * (1) remembering the ping document time along with config server time when unable to
+ * take a lock, and (2) ensuring all config servers report similar times and have similar
+ * time rates (the difference in times must start and stay small).
+ *
+ * Lock states include:
+ * 0: unlocked
+ * 1: about to be locked
+ * 2: locked
+ *
+ * Valid state transitions:
+ * 0 -> 1
+ * 1 -> 2
+ * 2 -> 0
+ *
+ * Note that at any point in time, a lock can be force unlocked if the ping for the lock
+ * becomes too stale.
+ */
+class DistributedLock {
+public:
+ static logger::LabeledLevel logLvl;
- /**
- * Indicates an error in retrieving time values from remote servers.
- */
- class TimeNotFoundException : public LockException {
+ class LastPings {
public:
- TimeNotFoundException( const char * msg , int code ) : LockException( msg, code ) {}
- TimeNotFoundException( const std::string& msg, int code ) : LockException( msg, code ) {}
- virtual ~TimeNotFoundException() = default;
+ DistLockPingInfo getLastPing(const ConnectionString& conn, const std::string& lockName);
+ void setLastPing(const ConnectionString& conn,
+ const std::string& lockName,
+ const DistLockPingInfo& pd);
+
+ stdx::mutex _mutex;
+ std::map<std::pair<std::string, std::string>, DistLockPingInfo> _lastPings;
};
+ static LastPings lastPings;
+
/**
- * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task
- * must be identified by a unique name across the system (e.g., "balancer"). A lock is taken
- * by writing a document in the configdb's locks collection with that name.
+ * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
+ * Construction does trigger a lock "pinging" mechanism, though.
*
- * To be maintained, each taken lock needs to be revalidated ("pinged") within a
- * pre-established amount of time. This class does this maintenance automatically once a
- * DistributedLock object was constructed. The ping procedure records the local time to
- * the ping document, but that time is untrusted and is only used as a point of reference
- * of whether the ping was refreshed or not. Ultimately, the clock a configdb is the source
- * of truth when determining whether a ping is still fresh or not. This is achieved by
- * (1) remembering the ping document time along with config server time when unable to
- * take a lock, and (2) ensuring all config servers report similar times and have similar
- * time rates (the difference in times must start and stay small).
+ * @param conn address of config(s) server(s)
+ * @param name identifier for the lock
+ * @param lockTimeout how long the lock can go "unpinged" before a new attempt to lock steals it (in milliseconds).
+ * @param lockPing how long to wait between lock pings
+ * @param legacy use legacy logic
*
- * Lock states include:
- * 0: unlocked
- * 1: about to be locked
- * 2: locked
- *
- * Valid state transitions:
- * 0 -> 1
- * 1 -> 2
- * 2 -> 0
+ */
+ DistributedLock(const ConnectionString& conn,
+ const std::string& name,
+ unsigned long long lockTimeout = 0,
+ bool asProcess = false);
+ ~DistributedLock(){};
+
+ /**
+ * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
+ * consider using the dist_lock_try construct to acquire this lock in an exception safe way.
*
- * Note that at any point in time, a lock can be force unlocked if the ping for the lock
- * becomes too stale.
+ * @param why human readable description of why the lock is being taken (used to log)
+ * @param other configdb's lock document that is currently holding the lock, if lock is taken, or our own lock
+ * details if not
+ * @return true if it managed to grab the lock
*/
- class DistributedLock {
- public:
+ bool lock_try(const std::string& why, BSONObj* other = 0, double timeout = 0.0);
- static logger::LabeledLevel logLvl;
-
- class LastPings {
- public:
- DistLockPingInfo getLastPing(const ConnectionString& conn,
- const std::string& lockName);
- void setLastPing(const ConnectionString& conn,
- const std::string& lockName,
- const DistLockPingInfo& pd);
-
- stdx::mutex _mutex;
- std::map< std::pair<std::string, std::string>, DistLockPingInfo > _lastPings;
- };
-
- static LastPings lastPings;
-
- /**
- * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
- * Construction does trigger a lock "pinging" mechanism, though.
- *
- * @param conn address of config(s) server(s)
- * @param name identifier for the lock
- * @param lockTimeout how long can the log go "unpinged" before a new attempt to lock steals it (in minutes).
- * @param lockPing how long to wait between lock pings
- * @param legacy use legacy logic
- *
- */
- DistributedLock( const ConnectionString& conn , const std::string& name , unsigned long long lockTimeout = 0, bool asProcess = false );
- ~DistributedLock(){};
-
- /**
- * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
- * consider using the dist_lock_try construct to acquire this lock in an exception safe way.
- *
- * @param why human readable description of why the lock is being taken (used to log)
- * @param other configdb's lock document that is currently holding the lock, if lock is taken, or our own lock
- * details if not
- * @return true if it managed to grab the lock
- */
- bool lock_try(const std::string& why, BSONObj* other = 0, double timeout = 0.0);
-
- /**
- * Returns OK if this lock is held (but does not guarantee that this owns it) and
- * it was possible to confirm that, within 'timeout' seconds, if provided, with the
- * config servers.
- */
- Status checkStatus(double timeout);
-
- /**
- * Releases a previously taken lock. Returns true on success.
- */
- bool unlock(const OID& lockID);
-
- Date_t getRemoteTime() const;
-
- bool isRemoteTimeSkewed() const;
-
- const std::string& getProcessId() const;
-
- const ConnectionString& getRemoteConnection() const;
-
- /**
- * Checks the skew among a cluster of servers and returns true if the min and max clock
- * times among the servers are within maxClockSkew.
- */
- static bool checkSkew( const ConnectionString& cluster,
- unsigned skewChecks = NUM_LOCK_SKEW_CHECKS,
- unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW,
- unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW );
-
- /**
- * Get the remote time from a server or cluster
- */
- static Date_t remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW );
-
- /**
- * Namespace for lock pings
- */
- static const std::string lockPingNS;
-
- /**
- * Namespace for locks
- */
- static const std::string locksNS;
-
- const ConnectionString _conn;
- const std::string _name;
- const std::string _processId;
-
- // Timeout for lock, usually LOCK_TIMEOUT
- const unsigned long long _lockTimeout;
- const unsigned long long _maxClockSkew;
- const unsigned long long _maxNetSkew;
- const unsigned long long _lockPing;
-
- private:
-
- void resetLastPing() {
- lastPings.setLastPing(_conn, _name, DistLockPingInfo());
- }
-
- void setLastPing(const DistLockPingInfo& pd) {
- lastPings.setLastPing(_conn, _name, pd);
- }
-
- DistLockPingInfo getLastPing() {
- return lastPings.getLastPing(_conn, _name);
- }
- };
+ /**
+ * Returns OK if this lock is held (but does not guarantee that this owns it) and
+ * it was possible to confirm that, within 'timeout' seconds, if provided, with the
+ * config servers.
+ */
+ Status checkStatus(double timeout);
-}
+ /**
+ * Releases a previously taken lock. Returns true on success.
+ */
+ bool unlock(const OID& lockID);
+
+ Date_t getRemoteTime() const;
+
+ bool isRemoteTimeSkewed() const;
+
+ const std::string& getProcessId() const;
+
+ const ConnectionString& getRemoteConnection() const;
+
+ /**
+ * Checks the skew among a cluster of servers and returns true if the min and max clock
+ * times among the servers are within maxClockSkew.
+ */
+ static bool checkSkew(const ConnectionString& cluster,
+ unsigned skewChecks = NUM_LOCK_SKEW_CHECKS,
+ unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW,
+ unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW);
+
+ /**
+ * Get the remote time from a server or cluster
+ */
+ static Date_t remoteTime(const ConnectionString& cluster,
+ unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW);
+
+ /**
+ * Namespace for lock pings
+ */
+ static const std::string lockPingNS;
+
+ /**
+ * Namespace for locks
+ */
+ static const std::string locksNS;
+
+ const ConnectionString _conn;
+ const std::string _name;
+ const std::string _processId;
+
+ // Timeout for lock, usually LOCK_TIMEOUT
+ const unsigned long long _lockTimeout;
+ const unsigned long long _maxClockSkew;
+ const unsigned long long _maxNetSkew;
+ const unsigned long long _lockPing;
+private:
+ void resetLastPing() {
+ lastPings.setLastPing(_conn, _name, DistLockPingInfo());
+ }
+
+ void setLastPing(const DistLockPingInfo& pd) {
+ lastPings.setLastPing(_conn, _name, pd);
+ }
+
+ DistLockPingInfo getLastPing() {
+ return lastPings.getLastPing(_conn, _name);
+ }
+};
+}
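
For reference, a minimal usage sketch of the DistributedLock API declared above,
assuming a valid config-server connection string (configConnStr and the lock name
are placeholders); on success, lock_try() fills 'other' with our own lock document,
whose lockID is what unlock() needs later:

    DistributedLock balancerLock(configConnStr, "balancer");
    BSONObj lockDoc;
    if (balancerLock.lock_try("rebalancing chunks", &lockDoc)) {
        // ... perform the work protected by the lock ...
        balancerLock.unlock(lockDoc[LocksType::lockID()].OID());
    } else {
        // lockDoc now describes the current holder of the lock
    }
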
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp
index 18cc7d537cf..e375c792f28 100644
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp
+++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp
@@ -39,187 +39,176 @@
namespace mongo {
- using std::string;
- using std::unique_ptr;
- using stdx::chrono::milliseconds;
+using std::string;
+using std::unique_ptr;
+using stdx::chrono::milliseconds;
namespace {
- const stdx::chrono::seconds kDefaultSocketTimeout(30);
- const milliseconds kDefaultPingInterval(30 * 1000);
-} // unnamed namespace
-
- LegacyDistLockManager::LegacyDistLockManager(ConnectionString configServer):
- _configServer(std::move(configServer)),
- _isStopped(false),
- _pingerEnabled(true) {
- }
+const stdx::chrono::seconds kDefaultSocketTimeout(30);
+const milliseconds kDefaultPingInterval(30 * 1000);
+} // unnamed namespace
- void LegacyDistLockManager::startUp() {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- invariant(!_pinger);
- _pinger = stdx::make_unique<LegacyDistLockPinger>();
- }
+LegacyDistLockManager::LegacyDistLockManager(ConnectionString configServer)
+ : _configServer(std::move(configServer)), _isStopped(false), _pingerEnabled(true) {}
- void LegacyDistLockManager::shutDown() {
- stdx::unique_lock<stdx::mutex> sl(_mutex);
- _isStopped = true;
+void LegacyDistLockManager::startUp() {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ invariant(!_pinger);
+ _pinger = stdx::make_unique<LegacyDistLockPinger>();
+}
- while (!_lockMap.empty()) {
- _noLocksCV.wait(sl);
- }
+void LegacyDistLockManager::shutDown() {
+ stdx::unique_lock<stdx::mutex> sl(_mutex);
+ _isStopped = true;
- if (_pinger) {
- _pinger->shutdown();
- }
+ while (!_lockMap.empty()) {
+ _noLocksCV.wait(sl);
}
- StatusWith<DistLockManager::ScopedDistLock> LegacyDistLockManager::lock(
- StringData name,
- StringData whyMessage,
- milliseconds waitFor,
- milliseconds lockTryInterval) {
-
- auto distLock = stdx::make_unique<DistributedLock>(_configServer, name.toString());
+ if (_pinger) {
+ _pinger->shutdown();
+ }
+}
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
+StatusWith<DistLockManager::ScopedDistLock> LegacyDistLockManager::lock(
+ StringData name, StringData whyMessage, milliseconds waitFor, milliseconds lockTryInterval) {
+ auto distLock = stdx::make_unique<DistributedLock>(_configServer, name.toString());
- if (_isStopped) {
- return Status(ErrorCodes::LockBusy, "legacy distlock manager is stopped");
- }
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (_pingerEnabled) {
- auto pingStatus = _pinger->startPing(*(distLock.get()), kDefaultPingInterval);
- if (!pingStatus.isOK()) {
- return pingStatus;
- }
- }
+ if (_isStopped) {
+ return Status(ErrorCodes::LockBusy, "legacy distlock manager is stopped");
}
- auto lastStatus = Status(ErrorCodes::LockBusy,
- str::stream() << "timed out waiting for " << name);
-
- Timer timer;
- Timer msgTimer;
- while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) {
- bool acquired = false;
- BSONObj lockDoc;
- try {
- acquired = distLock->lock_try(whyMessage.toString(),
- &lockDoc,
- kDefaultSocketTimeout.count());
-
- if (!acquired) {
- lastStatus = Status(ErrorCodes::LockBusy,
- str::stream() << "Lock for " << whyMessage
- << " is taken.");
- }
+ if (_pingerEnabled) {
+ auto pingStatus = _pinger->startPing(*(distLock.get()), kDefaultPingInterval);
+ if (!pingStatus.isOK()) {
+ return pingStatus;
}
- catch (const LockException& lockExcep) {
- OID needUnlockID(lockExcep.getMustUnlockID());
- if (needUnlockID.isSet()) {
- _pinger->addUnlockOID(needUnlockID);
- }
+ }
+ }
- lastStatus = lockExcep.toStatus();
+ auto lastStatus =
+ Status(ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name);
+
+ Timer timer;
+ Timer msgTimer;
+ while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) {
+ bool acquired = false;
+ BSONObj lockDoc;
+ try {
+ acquired =
+ distLock->lock_try(whyMessage.toString(), &lockDoc, kDefaultSocketTimeout.count());
+
+ if (!acquired) {
+ lastStatus = Status(ErrorCodes::LockBusy,
+ str::stream() << "Lock for " << whyMessage << " is taken.");
}
- catch (...) {
- lastStatus = exceptionToStatus();
+ } catch (const LockException& lockExcep) {
+ OID needUnlockID(lockExcep.getMustUnlockID());
+ if (needUnlockID.isSet()) {
+ _pinger->addUnlockOID(needUnlockID);
}
- if (acquired) {
- verify(!lockDoc.isEmpty());
+ lastStatus = lockExcep.toStatus();
+ } catch (...) {
+ lastStatus = exceptionToStatus();
+ }
- LocksType lock;
- string errMsg;
- if (!lock.parseBSON(lockDoc, &errMsg)) {
- return StatusWith<ScopedDistLock>(
- ErrorCodes::UnsupportedFormat,
- str::stream() << "error while parsing lock document: " << errMsg);
- }
+ if (acquired) {
+ verify(!lockDoc.isEmpty());
- dassert(lock.isLockIDSet());
+ LocksType lock;
+ string errMsg;
+ if (!lock.parseBSON(lockDoc, &errMsg)) {
+ return StatusWith<ScopedDistLock>(
+ ErrorCodes::UnsupportedFormat,
+ str::stream() << "error while parsing lock document: " << errMsg);
+ }
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _lockMap.insert(std::make_pair(lock.getLockID(), std::move(distLock)));
- }
+ dassert(lock.isLockIDSet());
- return ScopedDistLock(lock.getLockID(), this);
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _lockMap.insert(std::make_pair(lock.getLockID(), std::move(distLock)));
}
- if (waitFor == milliseconds::zero()) break;
+ return ScopedDistLock(lock.getLockID(), this);
+ }
- if (lastStatus != ErrorCodes::LockBusy) {
- return lastStatus;
- }
+ if (waitFor == milliseconds::zero())
+ break;
- // Periodically message for debugging reasons
- if (msgTimer.seconds() > 10) {
- log() << "waited " << timer.seconds() << "s for distributed lock " << name
- << " for " << whyMessage << ": " << lastStatus.toString();
+ if (lastStatus != ErrorCodes::LockBusy) {
+ return lastStatus;
+ }
- msgTimer.reset();
- }
+ // Periodically message for debugging reasons
+ if (msgTimer.seconds() > 10) {
+ log() << "waited " << timer.seconds() << "s for distributed lock " << name << " for "
+ << whyMessage << ": " << lastStatus.toString();
- milliseconds timeRemaining =
- std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis()));
- sleepFor(std::min(lockTryInterval, timeRemaining));
+ msgTimer.reset();
}
- return lastStatus;
+ milliseconds timeRemaining =
+ std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis()));
+ sleepFor(std::min(lockTryInterval, timeRemaining));
}
- void LegacyDistLockManager::unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT {
- unique_ptr<DistributedLock> distLock;
+ return lastStatus;
+}
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- auto iter = _lockMap.find(lockHandle);
- invariant(iter != _lockMap.end());
+void LegacyDistLockManager::unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT {
+ unique_ptr<DistributedLock> distLock;
- distLock = std::move(iter->second);
- _lockMap.erase(iter);
- }
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ auto iter = _lockMap.find(lockHandle);
+ invariant(iter != _lockMap.end());
- if (!distLock->unlock(lockHandle)) {
- _pinger->addUnlockOID(lockHandle);
- }
+ distLock = std::move(iter->second);
+ _lockMap.erase(iter);
+ }
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (_lockMap.empty()) {
- _noLocksCV.notify_all();
- }
- }
+ if (!distLock->unlock(lockHandle)) {
+ _pinger->addUnlockOID(lockHandle);
}
- Status LegacyDistLockManager::checkStatus(const DistLockHandle& lockHandle) {
- // Note: this should not happen when locks are managed through ScopedDistLock.
- if (_pinger->willUnlockOID(lockHandle)) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "lock " << lockHandle << " is not held and "
- << "is currently being scheduled for lazy unlock");
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ if (_lockMap.empty()) {
+ _noLocksCV.notify_all();
}
+ }
+}
- DistributedLock* distLock = nullptr;
+Status LegacyDistLockManager::checkStatus(const DistLockHandle& lockHandle) {
+ // Note: this should not happen when locks are managed through ScopedDistLock.
+ if (_pinger->willUnlockOID(lockHandle)) {
+ return Status(ErrorCodes::LockFailed,
+ str::stream() << "lock " << lockHandle << " is not held and "
+ << "is currently being scheduled for lazy unlock");
+ }
- {
- // Assumption: lockHandles are never shared across threads.
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- auto iter = _lockMap.find(lockHandle);
- invariant(iter != _lockMap.end());
+ DistributedLock* distLock = nullptr;
- distLock = iter->second.get();
- }
+ {
+ // Assumption: lockHandles are never shared across threads.
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ auto iter = _lockMap.find(lockHandle);
+ invariant(iter != _lockMap.end());
- return distLock->checkStatus(kDefaultSocketTimeout.count());
+ distLock = iter->second.get();
}
- void LegacyDistLockManager::enablePinger(bool enable) {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _pingerEnabled = enable;
- }
+ return distLock->checkStatus(kDefaultSocketTimeout.count());
+}
+void LegacyDistLockManager::enablePinger(bool enable) {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _pingerEnabled = enable;
+}
}
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h
index 50a6088c333..238989a3cdb 100644
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h
+++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h
@@ -41,46 +41,43 @@
namespace mongo {
- class DistributedLock;
+class DistributedLock;
- class LegacyDistLockManager: public DistLockManager {
- public:
- explicit LegacyDistLockManager(ConnectionString configServer);
+class LegacyDistLockManager : public DistLockManager {
+public:
+ explicit LegacyDistLockManager(ConnectionString configServer);
- virtual ~LegacyDistLockManager() = default;
+ virtual ~LegacyDistLockManager() = default;
- virtual void startUp() override;
- virtual void shutDown() override;
+ virtual void startUp() override;
+ virtual void shutDown() override;
- virtual StatusWith<DistLockManager::ScopedDistLock> lock(
- StringData name,
- StringData whyMessage,
- stdx::chrono::milliseconds waitFor,
- stdx::chrono::milliseconds lockTryInterval) override;
+ virtual StatusWith<DistLockManager::ScopedDistLock> lock(
+ StringData name,
+ StringData whyMessage,
+ stdx::chrono::milliseconds waitFor,
+ stdx::chrono::milliseconds lockTryInterval) override;
- // For testing only.
- void enablePinger(bool enable);
+ // For testing only.
+ void enablePinger(bool enable);
- protected:
+protected:
+ virtual void unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT override;
- virtual void unlock(const DistLockHandle& lockHandle) BOOST_NOEXCEPT override;
+ virtual Status checkStatus(const DistLockHandle& lockHandle) override;
- virtual Status checkStatus(const DistLockHandle& lockHandle) override;
+private:
+ const ConnectionString _configServer;
- private:
+ stdx::mutex _mutex;
+ stdx::condition_variable _noLocksCV;
+ std::map<DistLockHandle, std::unique_ptr<DistributedLock>> _lockMap;
- const ConnectionString _configServer;
+ std::unique_ptr<LegacyDistLockPinger> _pinger;
- stdx::mutex _mutex;
- stdx::condition_variable _noLocksCV;
- std::map<DistLockHandle, std::unique_ptr<DistributedLock>> _lockMap;
-
- std::unique_ptr<LegacyDistLockPinger> _pinger;
-
- bool _isStopped;
-
- // For testing only.
- bool _pingerEnabled;
- };
+ bool _isStopped;
+ // For testing only.
+ bool _pingerEnabled;
+};
}
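
For reference, a minimal sketch of taking a lock through the manager interface above,
assuming the usual DistLockManager::ScopedDistLock RAII semantics from the base class
(the lock is released when the ScopedDistLock is destroyed); the lock name, message,
and durations are placeholders:

    LegacyDistLockManager mgr(configConnStr);
    mgr.startUp();
    {
        auto scopedLock = mgr.lock("balancer",
                                   "rebalancing chunks",
                                   stdx::chrono::milliseconds(30000),  // wait up to 30s for the lock
                                   stdx::chrono::milliseconds(500));   // retry every 500ms
        if (scopedLock.isOK()) {
            // ... protected work; the lock is released when scopedLock goes out of scope ...
        }
    }  // release (if held) happens here, before shutDown() waits for outstanding locks
    mgr.shutDown();
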
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp
index 5e5f03b0864..ef5299fd558 100644
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp
+++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp
@@ -42,318 +42,304 @@
namespace mongo {
- using std::string;
+using std::string;
namespace {
- string pingThreadId(const ConnectionString& conn, const string& processId) {
- return conn.toString() + "/" + processId;
- }
+string pingThreadId(const ConnectionString& conn, const string& processId) {
+ return conn.toString() + "/" + processId;
+}
}
- void LegacyDistLockPinger::_distLockPingThread(ConnectionString addr,
- const string& process,
- Milliseconds sleepTime) {
- setThreadName("LockPinger");
-
- string pingId = pingThreadId(addr, process);
-
- LOG(0) << "creating distributed lock ping thread for " << addr
- << " and process " << process << " (sleeping for " << sleepTime.count() << "ms)";
-
- static int loops = 0;
- Date_t lastPingTime = jsTime();
- while (!shouldStopPinging(addr, process)) {
+void LegacyDistLockPinger::_distLockPingThread(ConnectionString addr,
+ const string& process,
+ Milliseconds sleepTime) {
+ setThreadName("LockPinger");
- LOG(3) << "distributed lock pinger '" << pingId << "' about to ping.";
+ string pingId = pingThreadId(addr, process);
- Date_t pingTime;
+ LOG(0) << "creating distributed lock ping thread for " << addr << " and process " << process
+ << " (sleeping for " << sleepTime.count() << "ms)";
- try {
- ScopedDbConnection conn(addr.toString(), 30.0);
+ static int loops = 0;
+ Date_t lastPingTime = jsTime();
+ while (!shouldStopPinging(addr, process)) {
+ LOG(3) << "distributed lock pinger '" << pingId << "' about to ping.";
- pingTime = jsTime();
- const auto elapsed = pingTime - lastPingTime;
- if (elapsed > 10 * sleepTime) {
- warning() << "Lock pinger for addr: " << addr
- << ", proc: " << process
- << " was inactive for " << elapsed;
- }
+ Date_t pingTime;
- lastPingTime = pingTime;
+ try {
+ ScopedDbConnection conn(addr.toString(), 30.0);
- // Refresh the entry corresponding to this process in the lockpings collection.
- conn->update(LockpingsType::ConfigNS,
- BSON(LockpingsType::process(process)),
- BSON("$set" << BSON(LockpingsType::ping(pingTime))),
- true);
+ pingTime = jsTime();
+ const auto elapsed = pingTime - lastPingTime;
+ if (elapsed > 10 * sleepTime) {
+ warning() << "Lock pinger for addr: " << addr << ", proc: " << process
+ << " was inactive for " << elapsed;
+ }
- string err = conn->getLastError();
- if (!err.empty()) {
- warning() << "pinging failed for distributed lock pinger '" << pingId << "'."
- << causedBy(err);
- conn.done();
+ lastPingTime = pingTime;
- if (!shouldStopPinging(addr, process)) {
- waitTillNextPingTime(sleepTime);
- }
- continue;
- }
+ // Refresh the entry corresponding to this process in the lockpings collection.
+ conn->update(LockpingsType::ConfigNS,
+ BSON(LockpingsType::process(process)),
+ BSON("$set" << BSON(LockpingsType::ping(pingTime))),
+ true);
- // Remove really old entries from the lockpings collection if they're not
- // holding a lock. This may happen if an instance of a process was taken down
- // and no new instance came up to replace it for a quite a while.
- // NOTE this is NOT the same as the standard take-over mechanism, which forces
- // the lock entry.
- BSONObj fieldsToReturn = BSON(LocksType::state() << 1
- << LocksType::process() << 1);
- auto activeLocks =
- conn->query(LocksType::ConfigNS,
- BSON(LocksType::state() << NE << LocksType::UNLOCKED));
-
- uassert(16060,
- str::stream() << "cannot query locks collection on config server "
- << conn.getHost(),
- activeLocks.get());
-
- std::set<string> pids;
- while (activeLocks->more()) {
- BSONObj lock = activeLocks->nextSafe();
-
- if (!lock[LocksType::process()].eoo()) {
- pids.insert(lock[LocksType::process()].str());
- }
- else {
- warning() << "found incorrect lock document during lock ping cleanup: "
- << lock.toString();
- }
- }
+ string err = conn->getLastError();
+ if (!err.empty()) {
+ warning() << "pinging failed for distributed lock pinger '" << pingId << "'."
+ << causedBy(err);
+ conn.done();
- // This can potentially delete ping entries that are actually active (if the clock
- // of another pinger is too skewed). This is still fine as the lock logic only
- // checks if there is a change in the ping document and the document going away
- // is a valid change.
- Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24};
- conn->remove(LockpingsType::ConfigNS,
- BSON(LockpingsType::process() << NIN << pids
- << LockpingsType::ping() << LT << fourDays));
- err = conn->getLastError();
-
- if (!err.empty()) {
- warning() << "ping cleanup for distributed lock pinger '" << pingId
- << " failed." << causedBy(err);
- conn.done();
-
- if (!shouldStopPinging(addr, process)) {
- waitTillNextPingTime(sleepTime);
- }
- continue;
+ if (!shouldStopPinging(addr, process)) {
+ waitTillNextPingTime(sleepTime);
}
+ continue;
+ }
- LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr
- << " pinged successfully at " << pingTime
- << " by distributed lock pinger '" << pingId
- << "', sleeping for " << sleepTime.count() << "ms";
-
- // Remove old locks, if possible
- // Make sure no one else is adding to this list at the same time
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- int numOldLocks = _unlockList.size();
- if (numOldLocks > 0) {
- LOG(0) << "trying to delete " << _unlockList.size()
- << " old lock entries for process " << process;
+ // Remove really old entries from the lockpings collection if they're not
+ // holding a lock. This may happen if an instance of a process was taken down
+            // and no new instance came up to replace it for quite a while.
+ // NOTE this is NOT the same as the standard take-over mechanism, which forces
+ // the lock entry.
+ BSONObj fieldsToReturn = BSON(LocksType::state() << 1 << LocksType::process() << 1);
+ auto activeLocks = conn->query(LocksType::ConfigNS,
+ BSON(LocksType::state() << NE << LocksType::UNLOCKED));
+
+ uassert(16060,
+ str::stream() << "cannot query locks collection on config server "
+ << conn.getHost(),
+ activeLocks.get());
+
+ std::set<string> pids;
+ while (activeLocks->more()) {
+ BSONObj lock = activeLocks->nextSafe();
+
+ if (!lock[LocksType::process()].eoo()) {
+ pids.insert(lock[LocksType::process()].str());
+ } else {
+ warning() << "found incorrect lock document during lock ping cleanup: "
+ << lock.toString();
}
+ }
- bool removed = false;
- for (auto iter = _unlockList.begin(); iter != _unlockList.end();
- iter = (removed ? _unlockList.erase(iter) : ++iter)) {
- removed = false;
- try {
- // Got DistLockHandle from lock, so we don't need to specify _id again
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::lockID(*iter)),
- BSON("$set" << BSON( LocksType::state(LocksType::UNLOCKED))));
-
- // Either the update went through or it didn't,
- // either way we're done trying to unlock.
- LOG(0) << "handled late remove of old distributed lock with ts " << *iter;
- removed = true;
- }
- catch (UpdateNotTheSame&) {
- LOG(0) << "partially removed old distributed lock with ts " << *iter;
- removed = true;
- }
- catch (std::exception& e) {
- warning() << "could not remove old distributed lock with ts " << *iter
- << causedBy(e);
- }
+ // This can potentially delete ping entries that are actually active (if the clock
+ // of another pinger is too skewed). This is still fine as the lock logic only
+ // checks if there is a change in the ping document and the document going away
+ // is a valid change.
+ Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24};
+ conn->remove(LockpingsType::ConfigNS,
+ BSON(LockpingsType::process() << NIN << pids << LockpingsType::ping() << LT
+ << fourDays));
+ err = conn->getLastError();
+
+ if (!err.empty()) {
+                warning() << "ping cleanup for distributed lock pinger '" << pingId << "' failed."
+ << causedBy(err);
+ conn.done();
+ if (!shouldStopPinging(addr, process)) {
+ waitTillNextPingTime(sleepTime);
}
+ continue;
+ }
- if (numOldLocks > 0 && _unlockList.size() > 0) {
- LOG(0) << "not all old lock entries could be removed for process " << process;
- }
+ LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr << " pinged successfully at "
+ << pingTime << " by distributed lock pinger '"
+ << pingId << "', sleeping for " << sleepTime.count()
+ << "ms";
- conn.done();
+ // Remove old locks, if possible
+ // Make sure no one else is adding to this list at the same time
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ int numOldLocks = _unlockList.size();
+ if (numOldLocks > 0) {
+ LOG(0) << "trying to delete " << _unlockList.size()
+ << " old lock entries for process " << process;
}
- catch (std::exception& e) {
- warning() << "distributed lock pinger '" << pingId
- << "' detected an exception while pinging." << causedBy(e);
+
+ bool removed = false;
+ for (auto iter = _unlockList.begin(); iter != _unlockList.end();
+ iter = (removed ? _unlockList.erase(iter) : ++iter)) {
+ removed = false;
+ try {
+ // Got DistLockHandle from lock, so we don't need to specify _id again
+ conn->update(LocksType::ConfigNS,
+ BSON(LocksType::lockID(*iter)),
+ BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
+
+ // Either the update went through or it didn't,
+ // either way we're done trying to unlock.
+ LOG(0) << "handled late remove of old distributed lock with ts " << *iter;
+ removed = true;
+ } catch (UpdateNotTheSame&) {
+ LOG(0) << "partially removed old distributed lock with ts " << *iter;
+ removed = true;
+ } catch (std::exception& e) {
+ warning() << "could not remove old distributed lock with ts " << *iter
+ << causedBy(e);
+ }
}
- if (!shouldStopPinging(addr, process)) {
- waitTillNextPingTime(sleepTime);
+ if (numOldLocks > 0 && _unlockList.size() > 0) {
+ LOG(0) << "not all old lock entries could be removed for process " << process;
}
- }
- warning() << "removing distributed lock ping thread '" << pingId << "'";
+ conn.done();
- if (shouldStopPinging(addr, process)) {
- acknowledgeStopPing(addr, process);
+ } catch (std::exception& e) {
+ warning() << "distributed lock pinger '" << pingId
+ << "' detected an exception while pinging." << causedBy(e);
}
- }
- void LegacyDistLockPinger::distLockPingThread(ConnectionString addr,
- long long clockSkew,
- const std::string& processId,
- stdx::chrono::milliseconds sleepTime) {
- try {
- jsTimeVirtualThreadSkew(clockSkew);
- _distLockPingThread(addr, processId, sleepTime);
- }
- catch (std::exception& e) {
- error() << "unexpected error while running distributed lock pinger for " << addr
- << ", process " << processId << causedBy(e);
- }
- catch (...) {
- error() << "unknown error while running distributed lock pinger for " << addr
- << ", process " << processId;
+ if (!shouldStopPinging(addr, process)) {
+ waitTillNextPingTime(sleepTime);
}
}
- Status LegacyDistLockPinger::startPing(const DistributedLock& lock,
- stdx::chrono::milliseconds sleepTime) {
- const ConnectionString& conn = lock.getRemoteConnection();
- const string& processId = lock.getProcessId();
- string pingID = pingThreadId(conn, processId);
+ warning() << "removing distributed lock ping thread '" << pingId << "'";
- {
- // Make sure we don't start multiple threads for a process id.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (shouldStopPinging(addr, process)) {
+ acknowledgeStopPing(addr, process);
+ }
+}
- if (_inShutdown) {
- return Status(ErrorCodes::ShutdownInProgress,
- "shutting down, will not start ping");
- }
+void LegacyDistLockPinger::distLockPingThread(ConnectionString addr,
+ long long clockSkew,
+ const std::string& processId,
+ stdx::chrono::milliseconds sleepTime) {
+ try {
+ jsTimeVirtualThreadSkew(clockSkew);
+ _distLockPingThread(addr, processId, sleepTime);
+ } catch (std::exception& e) {
+ error() << "unexpected error while running distributed lock pinger for " << addr
+ << ", process " << processId << causedBy(e);
+ } catch (...) {
+ error() << "unknown error while running distributed lock pinger for " << addr
+ << ", process " << processId;
+ }
+}
- // Ignore if we already have a pinging thread for this process.
- if (_seen.count(pingID) > 0) {
- return Status::OK();
- }
+Status LegacyDistLockPinger::startPing(const DistributedLock& lock,
+ stdx::chrono::milliseconds sleepTime) {
+ const ConnectionString& conn = lock.getRemoteConnection();
+ const string& processId = lock.getProcessId();
+ string pingID = pingThreadId(conn, processId);
- // Check the config server clock skew.
- if (lock.isRemoteTimeSkewed()) {
- return Status(ErrorCodes::DistributedClockSkewed,
- str::stream() << "clock skew of the cluster " << conn.toString()
- << " is too far out of bounds "
- << "to allow distributed locking.");
- }
+ {
+ // Make sure we don't start multiple threads for a process id.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_inShutdown) {
+ return Status(ErrorCodes::ShutdownInProgress, "shutting down, will not start ping");
}
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- stdx::thread thread(stdx::bind(&LegacyDistLockPinger::distLockPingThread,
- this,
- conn,
- getJSTimeVirtualThreadSkew(),
- processId,
- sleepTime));
- _pingThreads.insert(std::make_pair(pingID, std::move(thread)));
-
- _seen.insert(pingID);
+ // Ignore if we already have a pinging thread for this process.
+ if (_seen.count(pingID) > 0) {
+ return Status::OK();
}
- return Status::OK();
+ // Check the config server clock skew.
+ if (lock.isRemoteTimeSkewed()) {
+ return Status(ErrorCodes::DistributedClockSkewed,
+ str::stream() << "clock skew of the cluster " << conn.toString()
+ << " is too far out of bounds "
+ << "to allow distributed locking.");
+ }
}
- void LegacyDistLockPinger::addUnlockOID(const DistLockHandle& lockID) {
- // Modifying the lock from some other thread
+ {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _unlockList.push_back(lockID);
+ stdx::thread thread(stdx::bind(&LegacyDistLockPinger::distLockPingThread,
+ this,
+ conn,
+ getJSTimeVirtualThreadSkew(),
+ processId,
+ sleepTime));
+ _pingThreads.insert(std::make_pair(pingID, std::move(thread)));
+
+ _seen.insert(pingID);
}
- bool LegacyDistLockPinger::willUnlockOID(const DistLockHandle& lockID) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return find(_unlockList.begin(), _unlockList.end(), lockID) != _unlockList.end();
- }
+ return Status::OK();
+}
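
// A self-contained sketch (not MongoDB code) of the registration guard that startPing()
// implements: the _seen set makes thread creation idempotent per ping id, so repeated
// calls for the same config string/process pair spawn at most one pinger thread. The
// class and names below are hypothetical.
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>

class PingRegistry {
public:
    // Returns true only when a new thread was actually started for this id.
    bool startOnce(const std::string& pingId) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (!_seen.insert(pingId).second) {
            return false;  // a pinger for this id already exists
        }
        _threads.emplace(pingId, std::thread([] { /* the ping loop would run here */ }));
        return true;
    }

    ~PingRegistry() {
        for (auto& idAndThread : _threads) {
            if (idAndThread.second.joinable())
                idAndThread.second.join();
        }
    }

private:
    std::mutex _mutex;
    std::map<std::string, std::thread> _threads;  // pingID -> thread
    std::set<std::string> _seen;                  // ids that already have a thread
};

int main() {
    PingRegistry registry;
    std::cout << registry.startOnce("cfg1,cfg2,cfg3/process-1") << '\n';  // 1
    std::cout << registry.startOnce("cfg1,cfg2,cfg3/process-1") << '\n';  // 0
}
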
- void LegacyDistLockPinger::stopPing(const ConnectionString& conn, const string& processId) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void LegacyDistLockPinger::addUnlockOID(const DistLockHandle& lockID) {
+    // The unlock list can be modified from threads other than the ping thread,
+    // so guard it with the mutex.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _unlockList.push_back(lockID);
+}
- string pingId = pingThreadId(conn, processId);
+bool LegacyDistLockPinger::willUnlockOID(const DistLockHandle& lockID) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return find(_unlockList.begin(), _unlockList.end(), lockID) != _unlockList.end();
+}
- verify(_seen.count(pingId) > 0);
- _kill.insert(pingId);
- _pingStoppedCV.notify_all();
- }
- }
+void LegacyDistLockPinger::stopPing(const ConnectionString& conn, const string& processId) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- void LegacyDistLockPinger::shutdown() {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _pingStoppedCV.notify_all();
- }
+ string pingId = pingThreadId(conn, processId);
- // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read
- // _pingThreads since it cannot be modified once _shutdown is true.
- for (auto& idToThread : _pingThreads) {
- if (idToThread.second.joinable()) {
- idToThread.second.join();
- }
- }
+ verify(_seen.count(pingId) > 0);
+ _kill.insert(pingId);
+ _pingStoppedCV.notify_all();
}
+}
- bool LegacyDistLockPinger::shouldStopPinging(const ConnectionString& conn,
- const string& processId) {
- if (inShutdown()) {
- return true;
- }
-
+void LegacyDistLockPinger::shutdown() {
+ {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _pingStoppedCV.notify_all();
+ }
- if (_inShutdown) {
- return true;
+    // Don't grab _mutex here, otherwise we will deadlock trying to join. It is safe to
+    // read _pingThreads since it cannot be modified once _inShutdown is true.
+ for (auto& idToThread : _pingThreads) {
+ if (idToThread.second.joinable()) {
+ idToThread.second.join();
}
+ }
+}
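
// A self-contained sketch (not MongoDB code) of the shutdown pattern above: the flag is
// flipped and the condition variable notified while holding the mutex, but join() happens
// with the mutex released, because the worker re-acquires the same mutex in its timed wait
// (the analogue of waitTillNextPingTime()) and joining under the lock would deadlock.
// All names below are hypothetical.
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class PingWorker {
public:
    void start() {
        _thread = std::thread([this] { run(); });
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _inShutdown = true;
            _cv.notify_all();  // wake the worker out of its timed wait
        }
        if (_thread.joinable()) {
            _thread.join();  // join only after the mutex has been released
        }
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(_mutex);
        while (!_inShutdown) {
            // Sleep between "pings", but wake up early when shutdown() is requested.
            _cv.wait_for(lk, std::chrono::milliseconds(100));
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::thread _thread;
    bool _inShutdown = false;
};

int main() {
    PingWorker worker;
    worker.start();
    worker.shutdown();
}
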
- return _kill.count(pingThreadId(conn, processId)) > 0;
+bool LegacyDistLockPinger::shouldStopPinging(const ConnectionString& conn,
+ const string& processId) {
+ if (inShutdown()) {
+ return true;
}
- void LegacyDistLockPinger::acknowledgeStopPing(const ConnectionString& addr,
- const string& processId) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- string pingId = pingThreadId(addr, processId);
+ if (_inShutdown) {
+ return true;
+ }
- _kill.erase(pingId);
- _seen.erase(pingId);
- }
+ return _kill.count(pingThreadId(conn, processId)) > 0;
+}
- try {
- ScopedDbConnection conn(addr.toString(), 30.0);
- conn->remove(LockpingsType::ConfigNS, BSON(LockpingsType::process(processId)));
- }
- catch (const DBException& ex) {
- warning() << "Error encountered while stopping ping on " << processId
- << causedBy(ex);
- }
+void LegacyDistLockPinger::acknowledgeStopPing(const ConnectionString& addr,
+ const string& processId) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ string pingId = pingThreadId(addr, processId);
+
+ _kill.erase(pingId);
+ _seen.erase(pingId);
}
- void LegacyDistLockPinger::waitTillNextPingTime(stdx::chrono::milliseconds duration) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _pingStoppedCV.wait_for(lk, duration);
+ try {
+ ScopedDbConnection conn(addr.toString(), 30.0);
+ conn->remove(LockpingsType::ConfigNS, BSON(LockpingsType::process(processId)));
+ } catch (const DBException& ex) {
+ warning() << "Error encountered while stopping ping on " << processId << causedBy(ex);
}
}
+
+void LegacyDistLockPinger::waitTillNextPingTime(stdx::chrono::milliseconds duration) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _pingStoppedCV.wait_for(lk, duration);
+}
+}
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h
index 777cb39314b..41874a59d13 100644
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h
+++ b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h
@@ -43,98 +43,97 @@
namespace mongo {
- class DistributedLock;
-
- class LegacyDistLockPinger {
- public:
- LegacyDistLockPinger() = default;
-
- /**
- * Starts pinging the process id for the given lock.
- */
- Status startPing(const DistributedLock& lock, Milliseconds sleepTime);
-
- /**
- * Adds a distributed lock that has the given id to the unlock list. The unlock list
- * contains the list of locks that this pinger will repeatedly attempt to unlock until
- * it succeeds.
- */
- void addUnlockOID(const DistLockHandle& lockID);
-
- /**
- * Returns true if the given lock id is currently in the unlock queue.
- */
- bool willUnlockOID(const DistLockHandle& lockID);
-
- /**
- * For testing only: non-blocking call to stop pinging the given process id.
- */
- void stopPing(const ConnectionString& conn, const std::string& processId);
-
- /**
- * Kills all ping threads and wait for them to cleanup.
- */
- void shutdown();
-
- private:
- /**
- * Helper method for calling _distLockPingThread.
- */
- void distLockPingThread(ConnectionString addr,
- long long clockSkew,
- const std::string& processId,
- Milliseconds sleepTime);
-
- /**
- * Function for repeatedly pinging the process id. Also attempts to unlock all the
- * locks in the unlock list.
- */
- void _distLockPingThread(ConnectionString addr,
- const std::string& process,
- Milliseconds sleepTime);
-
- /**
- * Returns true if a request has been made to stop pinging the give process id.
- */
- bool shouldStopPinging(const ConnectionString& conn, const std::string& processId);
-
- /**
- * Acknowledge the stop ping request and performs the necessary cleanup.
- */
- void acknowledgeStopPing(const ConnectionString& conn, const std::string& processId);
-
- /**
- * Blocks until duration has elapsed or if the ping thread is interrupted.
- */
- void waitTillNextPingTime(Milliseconds duration);
-
- //
- // All member variables are labeled with one of the following codes indicating the
- // synchronization rules for accessing them.
- //
- // (M) Must hold _mutex for access.
-
- stdx::mutex _mutex;
-
- // Triggered everytime a pinger thread is stopped.
- stdx::condition_variable _pingStoppedCV; // (M)
-
- // pingID -> thread
- // This can contain multiple elements in tests, but in tne normal case, this will
- // contain only a single element.
- // Note: can be safely read when _inShutdown is true.
- std::map<std::string, stdx::thread> _pingThreads; // (M*)
-
- // Contains the list of process id to stopPing.
- std::set<std::string> _kill; // (M)
-
- // Contains all of the process id to ping.
- std::set<std::string> _seen; // (M)
-
- // Contains all lock ids to keeping on retrying to unlock until success.
- std::list<DistLockHandle> _unlockList; // (M)
-
- bool _inShutdown = false; // (M)
- };
-
+class DistributedLock;
+
+class LegacyDistLockPinger {
+public:
+ LegacyDistLockPinger() = default;
+
+ /**
+ * Starts pinging the process id for the given lock.
+ */
+ Status startPing(const DistributedLock& lock, Milliseconds sleepTime);
+
+ /**
+     * Adds the distributed lock with the given id to the unlock list. The unlock list
+     * contains the locks that this pinger will repeatedly attempt to unlock until the
+     * unlock succeeds.
+ */
+ void addUnlockOID(const DistLockHandle& lockID);
+
+ /**
+ * Returns true if the given lock id is currently in the unlock queue.
+ */
+ bool willUnlockOID(const DistLockHandle& lockID);
+
+ /**
+ * For testing only: non-blocking call to stop pinging the given process id.
+ */
+ void stopPing(const ConnectionString& conn, const std::string& processId);
+
+ /**
+     * Kills all ping threads and waits for them to clean up.
+ */
+ void shutdown();
+
+private:
+ /**
+ * Helper method for calling _distLockPingThread.
+ */
+ void distLockPingThread(ConnectionString addr,
+ long long clockSkew,
+ const std::string& processId,
+ Milliseconds sleepTime);
+
+ /**
+ * Function for repeatedly pinging the process id. Also attempts to unlock all the
+ * locks in the unlock list.
+ */
+ void _distLockPingThread(ConnectionString addr,
+ const std::string& process,
+ Milliseconds sleepTime);
+
+ /**
+     * Returns true if a request has been made to stop pinging the given process id.
+ */
+ bool shouldStopPinging(const ConnectionString& conn, const std::string& processId);
+
+ /**
+     * Acknowledges the stop ping request and performs the necessary cleanup.
+ */
+ void acknowledgeStopPing(const ConnectionString& conn, const std::string& processId);
+
+ /**
+     * Blocks until the duration has elapsed or the ping thread is interrupted.
+ */
+ void waitTillNextPingTime(Milliseconds duration);
+
+ //
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+    // (M)  Must hold _mutex for access.
+    // (M*) Must hold _mutex for writes; may be read without it once _inShutdown is true.
+
+ stdx::mutex _mutex;
+
+    // Triggered every time a pinger thread is stopped.
+ stdx::condition_variable _pingStoppedCV; // (M)
+
+ // pingID -> thread
+    // This can contain multiple elements in tests, but in the normal case, this will
+ // contain only a single element.
+ // Note: can be safely read when _inShutdown is true.
+ std::map<std::string, stdx::thread> _pingThreads; // (M*)
+
+    // Contains the ping ids for which stopPing has been requested.
+ std::set<std::string> _kill; // (M)
+
+    // Contains all of the ping ids that have had a ping thread started.
+ std::set<std::string> _seen; // (M)
+
+    // Contains all lock ids that will be retried for unlock until the unlock succeeds.
+ std::list<DistLockHandle> _unlockList; // (M)
+
+ bool _inShutdown = false; // (M)
+};
}
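
// A usage sketch of the interface declared above (hypothetical call site, not taken from
// the MongoDB sources). Here `lock` is assumed to be a DistributedLock against the config
// servers `configConn`, `processId` its process id, and `lockHandle` a DistLockHandle from
// an earlier lock attempt; all four are placeholders.
LegacyDistLockPinger pinger;

// Start the background ping thread for this lock's process id. The call is idempotent per
// ping id and returns DistributedClockSkewed or ShutdownInProgress errors when applicable.
Status pingStatus = pinger.startPing(lock, Milliseconds(30 * 1000));

if (pingStatus.isOK()) {
    // If an unlock attempt failed elsewhere, queue the handle so the pinger keeps retrying.
    pinger.addUnlockOID(lockHandle);
    bool pendingUnlock = pinger.willUnlockOID(lockHandle);  // true until the retry succeeds
}

// Tests may stop a single ping thread; shutdown() stops and joins all of them.
pinger.stopPing(configConn, processId);
pinger.shutdown();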