/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/s/config_upgrade.h"
#include
#include "mongo/base/init.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/s/cluster_client_internal.h"
#include "mongo/s/distlock.h"
#include "mongo/s/mongo_version_range.h"
#include "mongo/s/type_config_version.h"
#include "mongo/s/type_database.h"
#include "mongo/s/type_settings.h"
#include "mongo/s/type_shard.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/version.h"
namespace mongo {
using boost::scoped_ptr;
using mongoutils::str::stream;
//
// BEGIN CONFIG UPGRADE REGISTRATION
//
struct VersionRange {
VersionRange(int _minCompatibleVersion, int _currentVersion) :
minCompatibleVersion(_minCompatibleVersion), currentVersion(_currentVersion)
{
}
bool operator==(const VersionRange& other) const {
return (other.minCompatibleVersion == minCompatibleVersion)
&& (other.currentVersion == currentVersion);
}
bool operator!=(const VersionRange& other) const {
return !(*this == other);
}
int minCompatibleVersion;
int currentVersion;
};
/**
* Encapsulates the information needed to register a config upgrade.
*/
struct Upgrade {
typedef stdx::function UpgradeCallback;
Upgrade(int _fromVersion,
const VersionRange& _toVersionRange,
UpgradeCallback _upgradeCallback) :
fromVersion(_fromVersion),
toVersionRange(_toVersionRange),
upgradeCallback(_upgradeCallback)
{
}
// The config version we're upgrading from
int fromVersion;
// The config version we're upgrading to and the min compatible config version (min, to)
VersionRange toVersionRange;
// The upgrade callback which performs the actual upgrade
UpgradeCallback upgradeCallback;
};
typedef map ConfigUpgradeRegistry;
void validateRegistry(const ConfigUpgradeRegistry& registry);
/**
* Creates a registry of config upgrades used by the code below.
*
* MODIFY THIS CODE HERE TO CREATE A NEW UPGRADE PATH FROM X to Y
* YOU MUST ALSO MODIFY THE VERSION DECLARATIONS IN config_upgrade.h
*
* Caveats:
* - All upgrade paths must eventually lead to the exact same version range of
* min and max compatible versions.
* - This resulting version range must be equal to:
* make_pair(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION)
* - There must always be an upgrade path from the empty version (0) to the latest
* config version.
*
* If any of the above is false, we fassert and fail to start.
*/
ConfigUpgradeRegistry createRegistry() {
ConfigUpgradeRegistry registry;
// v0 to v6
Upgrade v0ToV6(0, VersionRange(5, 6), doUpgradeV0ToV6);
registry.insert(make_pair(v0ToV6.fromVersion, v0ToV6));
// v5 to v6
Upgrade v5ToV6(5, VersionRange(5, 6), doUpgradeV5ToV6);
registry.insert(make_pair(v5ToV6.fromVersion, v5ToV6));
validateRegistry(registry);
return registry;
}
/**
* Does a sanity-check validation of the registry ensuring three things:
* 1. All upgrade paths lead to the same minCompatible/currentVersion
* 2. Our constants match this final version pair
* 3. There is a zero-version upgrade path
*/
void validateRegistry(const ConfigUpgradeRegistry& registry) {
VersionRange maxCompatibleConfigVersionRange(-1, -1);
bool hasZeroVersionUpgrade = false;
for (ConfigUpgradeRegistry::const_iterator it = registry.begin(); it != registry.end();
++it)
{
const Upgrade& upgrade = it->second;
if (upgrade.fromVersion == 0) hasZeroVersionUpgrade = true;
if (maxCompatibleConfigVersionRange.currentVersion
< upgrade.toVersionRange.currentVersion)
{
maxCompatibleConfigVersionRange = upgrade.toVersionRange;
}
else if (maxCompatibleConfigVersionRange.currentVersion
== upgrade.toVersionRange.currentVersion)
{
// Make sure all max upgrade paths end up with same version and compatibility
fassert(16621, maxCompatibleConfigVersionRange == upgrade.toVersionRange);
}
}
// Make sure we have a zero-version upgrade
fassert(16622, hasZeroVersionUpgrade);
// Make sure our max registered range is the same as our constants
fassert(16623,
maxCompatibleConfigVersionRange
== VersionRange(MIN_COMPATIBLE_CONFIG_VERSION, CURRENT_CONFIG_VERSION));
}
//
// END CONFIG UPGRADE REGISTRATION
//
// Gets the config version information from the config server
Status getConfigVersion(const ConnectionString& configLoc, VersionType* versionInfo) {
try {
versionInfo->clear();
ScopedDbConnection conn(configLoc, 30);
scoped_ptr cursor(_safeCursor(conn->query("config.version",
BSONObj())));
bool hasConfigData = conn->count(ShardType::ConfigNS)
|| conn->count(DatabaseType::ConfigNS)
|| conn->count(CollectionType::ConfigNS);
if (!cursor->more()) {
// Version is 1 if we have data, 0 if we're completely empty
if (hasConfigData) {
versionInfo->setMinCompatibleVersion(UpgradeHistory_UnreportedVersion);
versionInfo->setCurrentVersion(UpgradeHistory_UnreportedVersion);
}
else {
versionInfo->setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
versionInfo->setCurrentVersion(UpgradeHistory_EmptyVersion);
}
conn.done();
return Status::OK();
}
BSONObj versionDoc = cursor->next();
string errMsg;
if (!versionInfo->parseBSON(versionDoc, &errMsg) || !versionInfo->isValid(&errMsg)) {
conn.done();
return Status(ErrorCodes::UnsupportedFormat,
stream() << "invalid config version document " << versionDoc
<< causedBy(errMsg));
}
if (cursor->more()) {
conn.done();
return Status(ErrorCodes::RemoteValidationError,
stream() << "should only have 1 document "
<< "in config.version collection");
}
conn.done();
}
catch (const DBException& e) {
return e.toStatus();
}
return Status::OK();
}
// Checks version compatibility with our version
VersionStatus isConfigVersionCompatible(const VersionType& versionInfo, string* whyNot) {
string dummy;
if (!whyNot) whyNot = &dummy;
// Check if we're empty
if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) {
return VersionStatus_NeedUpgrade;
}
// Check that we aren't too old
if (CURRENT_CONFIG_VERSION < versionInfo.getMinCompatibleVersion()) {
*whyNot = stream() << "the config version " << CURRENT_CONFIG_VERSION
<< " of our process is too old "
<< "for the detected config version "
<< versionInfo.getMinCompatibleVersion();
return VersionStatus_Incompatible;
}
// Check that the mongo version of this process hasn't been excluded from the cluster
vector excludedRanges;
if (versionInfo.isExcludingMongoVersionsSet() &&
!MongoVersionRange::parseBSONArray(versionInfo.getExcludingMongoVersions(),
&excludedRanges,
whyNot))
{
*whyNot = stream() << "could not understand excluded version ranges"
<< causedBy(whyNot);
return VersionStatus_Incompatible;
}
// versionString is the global version of this process
if (isInMongoVersionRanges(versionString, excludedRanges)) {
// Cast needed here for MSVC compiler issue
*whyNot = stream() << "not compatible with current config version, version "
<< reinterpret_cast(versionString)
<< "has been excluded.";
return VersionStatus_Incompatible;
}
// Check if we need to upgrade
if (versionInfo.getCurrentVersion() >= CURRENT_CONFIG_VERSION) {
return VersionStatus_Compatible;
}
return VersionStatus_NeedUpgrade;
}
// Returns true if we can confirm the balancer is stopped
bool _isBalancerStopped(const ConnectionString& configLoc, string* errMsg) {
// Get the balancer information
BSONObj balancerDoc;
try {
ScopedDbConnection conn(configLoc, 30);
balancerDoc = conn->findOne(SettingsType::ConfigNS,
BSON(SettingsType::key("balancer")));
conn.done();
}
catch (const DBException& e) {
*errMsg = e.toString();
return false;
}
return balancerDoc[SettingsType::balancerStopped()].trueValue();
}
// Checks that all config servers are online
bool _checkConfigServersAlive(const ConnectionString& configLoc, string* errMsg) {
bool resultOk;
BSONObj result;
try {
ScopedDbConnection conn(configLoc, 30);
if (conn->type() == ConnectionString::SYNC) {
// TODO: Dynamic cast is bad, we need a better way of managing this op
// via the heirarchy (or not)
SyncClusterConnection* scc = dynamic_cast(conn.get());
fassert(16729, scc != NULL);
return scc->prepare(*errMsg);
}
else {
resultOk = conn->runCommand("admin", BSON( "fsync" << 1 ), result);
}
conn.done();
}
catch (const DBException& e) {
*errMsg = e.toString();
return false;
}
if (!resultOk) {
*errMsg = DBClientWithCommands::getLastErrorString(result);
return false;
}
return true;
}
// Dispatches upgrades based on version to the upgrades registered in the upgrade registry
bool _nextUpgrade(const ConnectionString& configLoc,
const ConfigUpgradeRegistry& registry,
const VersionType& lastVersionInfo,
VersionType* upgradedVersionInfo,
string* errMsg)
{
int fromVersion = lastVersionInfo.getCurrentVersion();
ConfigUpgradeRegistry::const_iterator foundIt = registry.find(fromVersion);
if (foundIt == registry.end()) {
*errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION
<< " of mongo config metadata is required, " << "current version is "
<< fromVersion << ", "
<< "don't know how to upgrade from this version";
return false;
}
const Upgrade& upgrade = foundIt->second;
int toVersion = upgrade.toVersionRange.currentVersion;
log() << "starting next upgrade step from v" << fromVersion << " to v" << toVersion << endl;
// Log begin to config.changelog
Status logStatus = logConfigChange(configLoc,
"",
VersionType::ConfigNS,
"starting upgrade of config database",
BSON("from" << fromVersion << "to" << toVersion));
if (!logStatus.isOK()) {
*errMsg = stream() << "could not write initial changelog entry for upgrade"
<< causedBy(logStatus);
return false;
}
if (!upgrade.upgradeCallback(configLoc, lastVersionInfo, errMsg)) {
*errMsg = stream() << "error upgrading config database from v" << fromVersion << " to v"
<< toVersion << causedBy(errMsg);
return false;
}
// Get the config version we've upgraded to and make sure it's sane
Status verifyConfigStatus = getConfigVersion(configLoc, upgradedVersionInfo);
if (!verifyConfigStatus.isOK()) {
*errMsg = stream() << "failed to validate v" << fromVersion << " config version upgrade"
<< causedBy(verifyConfigStatus);
return false;
}
// Log end to config.changelog
logStatus = logConfigChange(configLoc,
"",
VersionType::ConfigNS,
"finished upgrade of config database",
BSON("from" << fromVersion << "to" << toVersion));
if (!logStatus.isOK()) {
*errMsg = stream() << "could not write final changelog entry for upgrade"
<< causedBy(logStatus);
return false;
}
return true;
}
// Upgrades the config server
bool checkAndUpgradeConfigVersion(const ConnectionString& configLoc,
bool upgrade,
VersionType* initialVersionInfo,
VersionType* versionInfo,
string* errMsg)
{
string dummy;
if (!errMsg) errMsg = &dummy;
//
// Check compatibility of config version
//
Status getConfigStatus = getConfigVersion(configLoc, versionInfo);
if (!getConfigStatus.isOK()) {
*errMsg = stream() << "could not load config version for upgrade"
<< causedBy(getConfigStatus);
return false;
}
versionInfo->cloneTo(initialVersionInfo);
VersionStatus comp = isConfigVersionCompatible(*versionInfo, errMsg);
if (comp == VersionStatus_Incompatible) return false;
if (comp == VersionStatus_Compatible) return true;
verify(comp == VersionStatus_NeedUpgrade);
//
// Our current config version is now greater than the current version, so we should upgrade
// if possible.
//
// The first empty version is technically an upgrade, but has special semantics
bool isEmptyVersion = versionInfo->getCurrentVersion() == UpgradeHistory_EmptyVersion;
// First check for the upgrade flag (but no flag is needed if we're upgrading from empty)
if (!isEmptyVersion && !upgrade) {
*errMsg = stream() << "newer version " << CURRENT_CONFIG_VERSION
<< " of mongo config metadata is required, " << "current version is "
<< versionInfo->getCurrentVersion() << ", "
<< "need to run mongos with --upgrade";
return false;
}
// Contact the config servers to make sure all are online - otherwise we wait a long time
// for locks.
if (!_checkConfigServersAlive(configLoc, errMsg)) {
if (isEmptyVersion) {
*errMsg = stream() << "all config servers must be reachable for initial"
<< " config database creation" << causedBy(errMsg);
}
else {
*errMsg = stream() << "all config servers must be reachable for config upgrade"
<< causedBy(errMsg);
}
return false;
}
// Check whether or not the balancer is online, if it is online we will not upgrade
// (but we will initialize the config server)
if (!isEmptyVersion && !_isBalancerStopped(configLoc, errMsg)) {
*errMsg = stream() << "balancer must be stopped for config upgrade"
<< causedBy(errMsg);
return false;
}
//
// Acquire a lock for the upgrade process.
//
// We want to ensure that only a single mongo process is upgrading the config server at a
// time.
//
ScopedDistributedLock upgradeLock(configLoc, "configUpgrade");
upgradeLock.setLockMessage(stream() << "upgrading config database to new format v"
<< CURRENT_CONFIG_VERSION);
Status acquisitionStatus = upgradeLock.acquire(20 * 60 * 1000);
if (!acquisitionStatus.isOK()) {
*errMsg = acquisitionStatus.toString();
return false;
}
//
// Double-check compatibility inside the upgrade lock
// Another process may have won the lock earlier and done the upgrade for us, check
// if this is the case.
//
getConfigStatus = getConfigVersion(configLoc, versionInfo);
if (!getConfigStatus.isOK()) {
*errMsg = stream() << "could not reload config version for upgrade"
<< causedBy(getConfigStatus);
return false;
}
versionInfo->cloneTo(initialVersionInfo);
comp = isConfigVersionCompatible(*versionInfo, errMsg);
if (comp == VersionStatus_Incompatible) return false;
if (comp == VersionStatus_Compatible) return true;
verify(comp == VersionStatus_NeedUpgrade);
//
// Run through the upgrade steps necessary to bring our config version to the current
// version
//
log() << "starting upgrade of config server from v" << versionInfo->getCurrentVersion()
<< " to v" << CURRENT_CONFIG_VERSION << endl;
ConfigUpgradeRegistry registry(createRegistry());
while (versionInfo->getCurrentVersion() < CURRENT_CONFIG_VERSION) {
int fromVersion = versionInfo->getCurrentVersion();
//
// Run the next upgrade process and replace versionInfo with the result of the
// upgrade.
//
if (!_nextUpgrade(configLoc, registry, *versionInfo, versionInfo, errMsg)) {
return false;
}
// Ensure we're making progress here
if (versionInfo->getCurrentVersion() <= fromVersion) {
*errMsg = stream() << "bad v" << fromVersion << " config version upgrade, "
<< "version did not increment and is now "
<< versionInfo->getCurrentVersion();
return false;
}
}
verify(versionInfo->getCurrentVersion() == CURRENT_CONFIG_VERSION);
log() << "upgrade of config server to v" << versionInfo->getCurrentVersion()
<< " successful" << endl;
return true;
}
}