/** * Copyright (C) 2012 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "mongo/s/config_upgrade_helpers.h" #include "mongo/client/connpool.h" #include "mongo/db/namespacestring.h" #include "mongo/s/cluster_client_internal.h" namespace mongo { using mongoutils::str::stream; Status checkIdsTheSame(const ConnectionString& configLoc, const string& nsA, const string& nsB) { scoped_ptr connPtr; auto_ptr cursor; try { connPtr.reset(ScopedDbConnection::getInternalScopedDbConnection(configLoc, 30)); ScopedDbConnection& conn = *connPtr; scoped_ptr cursorA(_safeCursor(conn->query(nsA, Query().sort(BSON("_id" << 1))))); scoped_ptr cursorB(_safeCursor(conn->query(nsB, Query().sort(BSON("_id" << 1))))); while (cursorA->more() && cursorB->more()) { BSONObj nextA = cursorA->nextSafe(); BSONObj nextB = cursorB->nextSafe(); if (nextA["_id"] != nextB["_id"]) { connPtr->done(); return Status(ErrorCodes::RemoteValidationError, stream() << "document " << nextA << " is not the same as " << nextB); } } if (cursorA->more() != cursorB->more()) { connPtr->done(); return Status(ErrorCodes::RemoteValidationError, stream() << "collection " << (cursorA->more() ? nsA : nsB) << " has more documents than " << (cursorA->more() ? nsB : nsA)); } } catch (const DBException& e) { return e.toStatus(); } connPtr->done(); return Status::OK(); } string _extractHashFor(const BSONObj& dbHashResult, const string& collName) { if (dbHashResult["collections"].type() != Object || dbHashResult["collections"].Obj()[collName].type() != String) { return ""; } return dbHashResult["collections"].Obj()[collName].String(); } Status checkHashesTheSame(const ConnectionString& configLoc, const string& nsA, const string& nsB) { // // Check the sizes first, b/c if one collection is empty the hash check will fail // scoped_ptr connPtr; unsigned long long countA; unsigned long long countB; try { connPtr.reset(ScopedDbConnection::getInternalScopedDbConnection(configLoc, 30)); ScopedDbConnection& conn = *connPtr; countA = conn->count(nsA, BSONObj()); countB = conn->count(nsB, BSONObj()); } catch (const DBException& e) { return e.toStatus(); } connPtr->done(); if (countA == 0 && countB == 0) { return Status::OK(); } else if (countA != countB) { return Status(ErrorCodes::RemoteValidationError, stream() << "collection " << nsA << " has " << countA << " documents but " << nsB << " has " << countB << "documents"); } verify(countA == countB); // // Find hash for nsA // bool resultOk; BSONObj result; NamespaceString nssA(nsA); try { connPtr.reset(ScopedDbConnection::getInternalScopedDbConnection(configLoc, 30)); ScopedDbConnection& conn = *connPtr; resultOk = conn->runCommand(nssA.db, BSON("dbHash" << true), result); } catch (const DBException& e) { return e.toStatus(); } connPtr->done(); if (!resultOk) { return Status(ErrorCodes::UnknownError, stream() << "could not run dbHash command on " << nssA.db << " db" << causedBy(result.toString())); } string hashResultA = _extractHashFor(result, nssA.coll); if (hashResultA == "") { return Status(ErrorCodes::RemoteValidationError, stream() << "could not find hash for collection " << nsA << " in " << result.toString()); } // // Find hash for nsB // NamespaceString nssB(nsB); try { connPtr.reset(ScopedDbConnection::getInternalScopedDbConnection(configLoc, 30)); ScopedDbConnection& conn = *connPtr; resultOk = conn->runCommand(nssB.db, BSON("dbHash" << true), result); } catch (const DBException& e) { return e.toStatus(); } connPtr->done(); if (!resultOk) { return Status(ErrorCodes::UnknownError, stream() << "could not run dbHash command on " << nssB.db << " db" << causedBy(result.toString())); } string hashResultB = _extractHashFor(result, nssB.coll); if (hashResultB == "") { return Status(ErrorCodes::RemoteValidationError, stream() << "could not find hash for collection " << nsB << " in " << result.toString()); } if (hashResultA != hashResultB) { return Status(ErrorCodes::RemoteValidationError, stream() << "collection hashes for collection " << nsA << " and " << nsB << " do not match"); } return Status::OK(); } Status copyFrozenCollection(const ConnectionString& configLoc, const string& fromNS, const string& toNS) { scoped_ptr connPtr; auto_ptr cursor; // Create new collection bool resultOk; BSONObj createResult; try { connPtr.reset(ScopedDbConnection::getInternalScopedDbConnection(configLoc, 30)); ScopedDbConnection& conn = *connPtr; resultOk = conn->createCollection(toNS, 0, false, 0, &createResult); } catch (const DBException& e) { return e.toStatus("could not create new collection"); } if (!resultOk) { return Status(ErrorCodes::UnknownError, stream() << DBClientWithCommands::getLastErrorString(createResult) << causedBy(createResult.toString())); } NamespaceString fromNSS(fromNS); NamespaceString toNSS(toNS); // Copy indexes over try { ScopedDbConnection& conn = *connPtr; verify(fromNSS.isValid()); string indexesNS = fromNSS.db + ".system.indexes"; scoped_ptr cursor(_safeCursor(conn->query(indexesNS, BSON("ns" << fromNS)))); while (cursor->more()) { BSONObj next = cursor->nextSafe(); BSONObjBuilder newIndexDesc; newIndexDesc.append("ns", toNS); newIndexDesc.appendElementsUnique(next); conn->insert(toNSS.db + ".system.indexes", newIndexDesc.done()); _checkGLE(conn); } } catch (const DBException& e) { return e.toStatus("could not create indexes in new collection"); } // Copy data over try { ScopedDbConnection& conn = *connPtr; scoped_ptr cursor(_safeCursor(conn->query(fromNS, BSONObj()))); while (cursor->more()) { BSONObj next = cursor->nextSafe(); conn->insert(toNS, next); _checkGLE(conn); } } catch (const DBException& e) { return e.toStatus("could not copy data into new collection"); } connPtr->done(); // Verify indices haven't changed Status indexStatus = checkIdsTheSame(configLoc, fromNSS.db + ".system.indexes", toNSS.db + ".system.indexes"); if (!indexStatus.isOK()) { return indexStatus; } // Verify data hasn't changed return checkHashesTheSame(configLoc, fromNS, toNS); } Status overwriteCollection(const ConnectionString& configLoc, const string& fromNS, const string& overwriteNS) { scoped_ptr connPtr; // TODO: Also a bit awkward to deal with command results bool resultOk; BSONObj renameResult; // Create new collection try { connPtr.reset(ScopedDbConnection::getInternalScopedDbConnection(configLoc, 30)); ScopedDbConnection& conn = *connPtr; BSONObjBuilder bob; bob.append("renameCollection", fromNS); bob.append("to", overwriteNS); bob.append("dropTarget", true); BSONObj renameCommand = bob.obj(); resultOk = conn->runCommand("admin", renameCommand, renameResult); } catch (const DBException& e) { return e.toStatus(); } connPtr->done(); if (!resultOk) { return Status(ErrorCodes::UnknownError, stream() << DBClientWithCommands::getLastErrorString(renameResult) << causedBy(renameResult.toString())); } return Status::OK(); } string genWorkingSuffix(const OID& lastUpgradeId) { return "-upgrade-" + lastUpgradeId.toString(); } string genBackupSuffix(const OID& lastUpgradeId) { return "-backup-" + lastUpgradeId.toString(); } }