/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/s/config_upgrade_helpers.h"
#include "mongo/client/connpool.h"
#include "mongo/db/field_parser.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/cluster_client_internal.h"
#include "mongo/s/type_config_version.h"
#include "mongo/util/timer.h"

namespace mongo {

using mongoutils::str::stream;

// Custom field used in upgrade state to determine if/where we failed on last upgrade
const BSONField<bool> inCriticalSectionField("inCriticalSection", false);
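
// Verifies that two collections contain documents with identical _id values, walking both
// collections in _id order and comparing the documents pairwise.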
Status checkIdsTheSame(const ConnectionString& configLoc, const string& nsA, const string& nsB)
{
    scoped_ptr<ScopedDbConnection> connPtr;
    auto_ptr<DBClientCursor> cursor;

    try {
        connPtr.reset(new ScopedDbConnection(configLoc, 30));
        ScopedDbConnection& conn = *connPtr;

        scoped_ptr<DBClientCursor> cursorA(_safeCursor(conn->query(nsA,
                                                                   Query().sort(BSON("_id" << 1)))));
        scoped_ptr<DBClientCursor> cursorB(_safeCursor(conn->query(nsB,
                                                                   Query().sort(BSON("_id" << 1)))));

        while (cursorA->more() && cursorB->more()) {

            BSONObj nextA = cursorA->nextSafe();
            BSONObj nextB = cursorB->nextSafe();

            if (nextA["_id"] != nextB["_id"]) {
                connPtr->done();
                return Status(ErrorCodes::RemoteValidationError,
                              stream() << "document " << nextA << " is not the same as "
                                       << nextB);
            }
        }

        if (cursorA->more() != cursorB->more()) {
            connPtr->done();
            return Status(ErrorCodes::RemoteValidationError,
                          stream() << "collection " << (cursorA->more() ? nsA : nsB)
                                   << " has more documents than "
                                   << (cursorA->more() ? nsB : nsA));
        }
    }
    catch (const DBException& e) {
        return e.toStatus();
    }

    connPtr->done();
    return Status::OK();
}
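
// Pulls the hash for the named collection out of a dbHash command result, or returns ""
// if no hash is present for that collection.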
string _extractHashFor(const BSONObj& dbHashResult, const StringData& collName) {

    if (dbHashResult["collections"].type() != Object
        || dbHashResult["collections"].Obj()[collName].type() != String)
    {
        return "";
    }

    return dbHashResult["collections"].Obj()[collName].String();
}
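
// Verifies that two collections hold the same data by comparing document counts and then the
// per-collection hashes reported by the dbHash command on each collection's database.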
Status checkHashesTheSame(const ConnectionString& configLoc,
                          const string& nsA,
                          const string& nsB)
{
    //
    // Check the sizes first, b/c if one collection is empty the hash check will fail
    //

    unsigned long long countA;
    unsigned long long countB;

    try {
        ScopedDbConnection conn(configLoc, 30);
        countA = conn->count(nsA, BSONObj());
        countB = conn->count(nsB, BSONObj());
        conn.done();
    }
    catch (const DBException& e) {
        return e.toStatus();
    }

    if (countA == 0 && countB == 0) {
        return Status::OK();
    }
    else if (countA != countB) {
        return Status(ErrorCodes::RemoteValidationError,
                      stream() << "collection " << nsA << " has " << countA << " documents but "
                               << nsB << " has " << countB << " documents");
    }
    verify(countA == countB);

    //
    // Find hash for nsA
    //

    bool resultOk;
    BSONObj result;
    NamespaceString nssA(nsA);

    try {
        ScopedDbConnection conn(configLoc, 30);
        resultOk = conn->runCommand(nssA.db().toString(), BSON("dbHash" << true), result);
        conn.done();
    }
    catch (const DBException& e) {
        return e.toStatus();
    }

    if (!resultOk) {
        return Status(ErrorCodes::UnknownError,
                      stream() << "could not run dbHash command on " << nssA.db() << " db"
                               << causedBy(result.toString()));
    }

    string hashResultA = _extractHashFor(result, nssA.coll());
    if (hashResultA == "") {
        return Status(ErrorCodes::RemoteValidationError,
                      stream() << "could not find hash for collection " << nsA << " in "
                               << result.toString());
    }

    //
    // Find hash for nsB
    //

    NamespaceString nssB(nsB);

    try {
        ScopedDbConnection conn(configLoc, 30);
        resultOk = conn->runCommand(nssB.db().toString(), BSON("dbHash" << true), result);
        conn.done();
    }
    catch (const DBException& e) {
        return e.toStatus();
    }

    if (!resultOk) {
        return Status(ErrorCodes::UnknownError,
                      stream() << "could not run dbHash command on " << nssB.db() << " db"
                               << causedBy(result.toString()));
    }

    string hashResultB = _extractHashFor(result, nssB.coll());
    if (hashResultB == "") {
        return Status(ErrorCodes::RemoteValidationError,
                      stream() << "could not find hash for collection " << nsB << " in "
                               << result.toString());
    }

    if (hashResultA != hashResultB) {
        return Status(ErrorCodes::RemoteValidationError,
                      stream() << "collection hashes for collection " << nsA << " and " << nsB
                               << " do not match");
    }

    return Status::OK();
}
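
// Copies a collection (assumed not to be receiving writes) and its indexes to a new namespace
// on the config servers, then validates the copy against the original via _id and dbHash checks.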
Status copyFrozenCollection(const ConnectionString& configLoc,
                            const string& fromNS,
                            const string& toNS)
{
    scoped_ptr<ScopedDbConnection> connPtr;
    auto_ptr<DBClientCursor> cursor;

    // Create new collection
    bool resultOk;
    BSONObj createResult;

    try {
        connPtr.reset(new ScopedDbConnection(configLoc, 30));
        ScopedDbConnection& conn = *connPtr;

        resultOk = conn->createCollection(toNS, 0, false, 0, &createResult);
    }
    catch (const DBException& e) {
        return e.toStatus("could not create new collection");
    }

    if (!resultOk) {
        return Status(ErrorCodes::UnknownError,
                      stream() << DBClientWithCommands::getLastErrorString(createResult)
                               << causedBy(createResult.toString()));
    }

    NamespaceString fromNSS(fromNS);
    NamespaceString toNSS(toNS);

    // Copy indexes over
    try {
        ScopedDbConnection& conn = *connPtr;
        verify(fromNSS.isValid());

        // TODO: EnsureIndex at some point, if it becomes easier?
        string indexesNS = fromNSS.db().toString() + ".system.indexes";

        scoped_ptr<DBClientCursor> cursor(_safeCursor(conn->query(indexesNS,
                                                                  BSON("ns" << fromNS))));

        while (cursor->more()) {

            BSONObj next = cursor->nextSafe();

            BSONObjBuilder newIndexDesc;
            newIndexDesc.append("ns", toNS);
            newIndexDesc.appendElementsUnique(next);

            conn->insert(toNSS.db().toString() + ".system.indexes", newIndexDesc.done());
            _checkGLE(conn);
        }
    }
    catch (const DBException& e) {
        return e.toStatus("could not create indexes in new collection");
    }

    //
    // Copy data over in batches. The batch size here is much smaller than the maximum BSON
    // object size. We want to copy efficiently, but we don't need to maximize the batch size
    // to do so.
    //

    Timer t;
    int64_t docCount = 0;
    const int32_t maxBatchSize = BSONObjMaxUserSize / 16;
    try {
        log() << "About to copy " << fromNS << " to " << toNS << endl;

        // Use a smaller batchSize for the query so that we issue getMore()'s more frequently.
        // The rationale is that, if the config server is extremely slow for some reason, we
        // don't want this cursor to time out.
        ScopedDbConnection& conn = *connPtr;
        scoped_ptr<DBClientCursor> cursor(_safeCursor(conn->query(fromNS,
                                                                  BSONObj(),
                                                                  0 /* nToReturn */,
                                                                  0 /* nToSkip */,
                                                                  NULL /* fieldsToReturn */,
                                                                  0 /* queryOptions */,
                                                                  1024 /* batchSize */)));

        vector<BSONObj> insertBatch;
        int32_t insertSize = 0;
        while (cursor->more()) {

            BSONObj next = cursor->nextSafe().getOwned();
            ++docCount;

            insertBatch.push_back(next);
            insertSize += next.objsize();

            if (insertSize > maxBatchSize) {
                conn->insert(toNS, insertBatch);
                _checkGLE(conn);
                insertBatch.clear();
                insertSize = 0;
            }

            if (t.seconds() >= 10) {
                t.reset();
                log() << "Copied " << docCount << " documents so far from "
                      << fromNS << " to " << toNS << endl;
            }
        }

        if (!insertBatch.empty()) {
            conn->insert(toNS, insertBatch);
            _checkGLE(conn);
        }

        log() << "Finished copying " << docCount << " documents from "
              << fromNS << " to " << toNS << endl;
    }
    catch (const DBException& e) {
        return e.toStatus("could not copy data into new collection");
    }

    connPtr->done();

    // Verify indices haven't changed
    Status indexStatus = checkIdsTheSame(configLoc,
                                         fromNSS.db().toString() + ".system.indexes",
                                         toNSS.db().toString() + ".system.indexes");
    if (!indexStatus.isOK()) {
        return indexStatus;
    }

    // Verify data hasn't changed
    return checkHashesTheSame(configLoc, fromNS, toNS);
}
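
// Replaces the contents of overwriteNS with fromNS by renaming fromNS over it, using the
// renameCollection command with dropTarget:true.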
Status overwriteCollection(const ConnectionString& configLoc,
                           const string& fromNS,
                           const string& overwriteNS)
{
    // TODO: Also a bit awkward to deal with command results
    bool resultOk;
    BSONObj renameResult;

    // Rename the source collection over the target, dropping the old target
    try {
        ScopedDbConnection conn(configLoc, 30);

        BSONObjBuilder bob;
        bob.append("renameCollection", fromNS);
        bob.append("to", overwriteNS);
        bob.append("dropTarget", true);
        BSONObj renameCommand = bob.obj();

        resultOk = conn->runCommand("admin", renameCommand, renameResult);
        conn.done();
    }
    catch (const DBException& e) {
        return e.toStatus();
    }

    if (!resultOk) {
        return Status(ErrorCodes::UnknownError,
                      stream() << DBClientWithCommands::getLastErrorString(renameResult)
                               << causedBy(renameResult.toString()));
    }

    return Status::OK();
}
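
// Suffixes appended to collection names to form the working copy and backup namespaces used
// during a config upgrade, tagged with the id of the upgrade that created them.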
string genWorkingSuffix(const OID& lastUpgradeId) {
    return "-upgrade-" + lastUpgradeId.toString();
}

string genBackupSuffix(const OID& lastUpgradeId) {
    return "-backup-" + lastUpgradeId.toString();
}
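
// Checks whether it is safe to begin a config upgrade: fails if a previous upgrade died inside
// its critical section (manual intervention required), then verifies that all mongo processes
// in the cluster meet the minimum required version.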
Status preUpgradeCheck(const ConnectionString& configServer,
                       const VersionType& lastVersionInfo,
                       string minMongosVersion) {

    if (lastVersionInfo.isUpgradeIdSet() && lastVersionInfo.getUpgradeId().isSet()) {

        //
        // Another upgrade failed, so cleanup may be necessary
        //

        BSONObj lastUpgradeState = lastVersionInfo.getUpgradeState();

        bool inCriticalSection;
        string errMsg;
        if (!FieldParser::extract(lastUpgradeState,
                                  inCriticalSectionField,
                                  &inCriticalSection,
                                  &errMsg)) {
            return Status(ErrorCodes::FailedToParse, causedBy(errMsg));
        }

        if (inCriticalSection) {
            // Note: custom message must be supplied by caller
            return Status(ErrorCodes::ManualInterventionRequired, "");
        }
    }

    //
    // Check the versions of other mongo processes in the cluster before upgrade.
    // We can't upgrade if there are active pre-v2.4 processes in the cluster.
    //

    return checkClusterMongoVersions(configServer, string(minMongosVersion));
}
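
// Records the id and (empty) state of a new upgrade attempt in the config version document,
// so that a later failed attempt can be detected and cleaned up.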
Status startConfigUpgrade(const string& configServer,
                          int currentVersion,
                          const OID& upgradeID) {

    BSONObjBuilder setUpgradeIdObj;
    setUpgradeIdObj << VersionType::upgradeId(upgradeID);
    setUpgradeIdObj << VersionType::upgradeState(BSONObj());

    try {
        ScopedDbConnection conn(configServer, 30);
        conn->update(VersionType::ConfigNS,
                     BSON("_id" << 1 << VersionType::currentVersion(currentVersion)),
                     BSON("$set" << setUpgradeIdObj.done()));
        _checkGLE(conn);
        conn.done();
    }
    catch (const DBException& e) {
        return e.toStatus("could not initialize version info for upgrade");
    }

    return Status::OK();
}
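
// Marks the in-progress upgrade as having entered its critical section by setting the
// inCriticalSection flag in the version document's upgrade state.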
Status enterConfigUpgradeCriticalSection(const string& configServer, int currentVersion) {

    BSONObjBuilder setUpgradeStateObj;
    setUpgradeStateObj.append(VersionType::upgradeState(), BSON(inCriticalSectionField(true)));

    try {
        ScopedDbConnection conn(configServer, 30);
        conn->update(VersionType::ConfigNS,
                     BSON("_id" << 1 << VersionType::currentVersion(currentVersion)),
                     BSON("$set" << setUpgradeStateObj.done()));
        _checkGLE(conn);
        conn.done();
    }
    catch (const DBException& e) {
        // No cleanup message here since we're not sure whether we wrote or not, and it's not
        // dangerous either way except to prevent further updates (at which point the message
        // is printed)
        return e.toStatus("could not update version info to enter critical update section");
    }

    log() << "entered critical section for config upgrade" << endl;

    return Status::OK();
}
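
// Finishes an upgrade by writing the new version information to the config version document
// and clearing the upgrade id and state, which also exits the critical section.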
Status commitConfigUpgrade(const string& configServer,
                           int currentVersion,
                           int minCompatibleVersion,
                           int newVersion) {

    // Note: DO NOT CLEAR the config version unless bumping the minCompatibleVersion;
    // we want to save the excludes that were set.

    BSONObjBuilder setObj;
    setObj << VersionType::minCompatibleVersion(minCompatibleVersion);
    setObj << VersionType::version_DEPRECATED(minCompatibleVersion);
    setObj << VersionType::currentVersion(newVersion);

    BSONObjBuilder unsetObj;
    unsetObj.append(VersionType::upgradeId(), 1);
    unsetObj.append(VersionType::upgradeState(), 1);

    try {
        ScopedDbConnection conn(configServer, 30);
        conn->update(VersionType::ConfigNS,
                     BSON("_id" << 1 << VersionType::currentVersion(currentVersion)),
                     BSON("$set" << setObj.done() << "$unset" << unsetObj.done()));
        _checkGLE(conn);
        conn.done();
    }
    catch (const DBException& e) {
        return e.toStatus("could not write new version info and "
                          "exit critical upgrade section");
    }

    return Status::OK();
}

} // namespace mongo