// cloner.cpp - copy a database (export/import basically) /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/base/init.h" #include "mongo/base/status.h" #include "mongo/bson/util/builder.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/resource_pattern.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/cloner.h" #include "mongo/db/commands.h" #include "mongo/db/commands/copydb.h" #include "mongo/db/commands/rename_collection.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/index_builder.h" #include "mongo/db/instance.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/storage_options.h" #include "mongo/util/log.h" namespace mongo { MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kStorage); BSONElement getErrField(const BSONObj& o); /* for index info object: { "name" : "name_1" , "ns" : "foo.index3" , "key" : { "name" : 1.0 } } we need to fix up the value in the "ns" parameter so that the name prefix is correct on a copy to a new name. */ BSONObj fixindex(const string& newDbName, BSONObj o) { BSONObjBuilder b; BSONObjIterator i(o); while ( i.moreWithEOO() ) { BSONElement e = i.next(); if ( e.eoo() ) break; // for now, skip the "v" field so that v:0 indexes will be upgraded to v:1 if ( string("v") == e.fieldName() ) { continue; } if ( string("ns") == e.fieldName() ) { uassert( 10024 , "bad ns field for index during dbcopy", e.type() == String); const char *p = strchr(e.valuestr(), '.'); uassert( 10025 , "bad ns field for index during dbcopy [2]", p); string newname = newDbName + p; b.append("ns", newname); } else { b.append(e); } } BSONObj res= b.obj(); return res; } Cloner::Cloner() { } struct Cloner::Fun { Fun(OperationContext* txn, const string& dbName) :lastLog(0), txn(txn), _dbName(dbName) {} void operator()( DBClientCursorBatchIterator &i ) { invariant(from_collection.coll() != "system.indexes"); // XXX: can probably take dblock instead Lock::GlobalWrite lk(txn->lockState()); // Make sure database still exists after we resume from the temp release bool unused; Database* db = dbHolder().getOrCreate(txn, _dbName, unused); bool createdCollection = false; Collection* collection = NULL; collection = db->getCollection( txn, to_collection ); if ( !collection ) { massert( 17321, str::stream() << "collection dropped during clone [" << to_collection.ns() << "]", !createdCollection ); WriteUnitOfWork wunit(txn->recoveryUnit()); createdCollection = true; collection = db->createCollection( txn, to_collection.ns() ); verify( collection ); wunit.commit(); } while( i.moreInCurrentBatch() ) { if ( numSeen % 128 == 127 ) { time_t now = time(0); if( now - lastLog >= 60 ) { // report progress if( lastLog ) log() << "clone " << to_collection << ' ' << numSeen << endl; lastLog = now; } } BSONObj tmp = i.nextSafe(); /* assure object is valid. note this will slow us down a little. */ const Status status = validateBSON(tmp.objdata(), tmp.objsize()); if (!status.isOK()) { log() << "Cloner: skipping corrupt object from " << from_collection << ": " << status.reason(); continue; } ++numSeen; WriteUnitOfWork wunit(txn->recoveryUnit()); BSONObj js = tmp; StatusWith loc = collection->insertDocument( txn, js, true ); if ( !loc.isOK() ) { error() << "error: exception cloning object in " << from_collection << ' ' << loc.toString() << " obj:" << js; } uassertStatusOK( loc.getStatus() ); if (logForRepl) repl::logOp(txn, "i", to_collection.ns().c_str(), js); wunit.commit(); txn->recoveryUnit()->commitIfNeeded(); RARELY if ( time( 0 ) - saveLast > 60 ) { log() << numSeen << " objects cloned so far from collection " << from_collection; saveLast = time( 0 ); } } } time_t lastLog; OperationContext* txn; const string _dbName; int64_t numSeen; NamespaceString from_collection; NamespaceString to_collection; time_t saveLast; list *indexesToBuild; // deferred query results (e.g. index insert/build) bool logForRepl; bool _mayYield; bool _mayBeInterrupted; }; /* copy the specified collection isindex - if true, this is system.indexes collection, in which we do some transformation when copying. */ void Cloner::copy(OperationContext* txn, const string& toDBName, const NamespaceString& from_collection, const NamespaceString& to_collection, bool logForRepl, bool masterSameProcess, bool slaveOk, bool mayYield, bool mayBeInterrupted, Query query) { list indexesToBuild; LOG(2) << "\t\tcloning collection " << from_collection << " to " << to_collection << " on " << _conn->getServerAddress() << " with filter " << query.toString() << endl; Fun f(txn, toDBName); f.numSeen = 0; f.from_collection = from_collection; f.to_collection = to_collection; f.saveLast = time( 0 ); f.indexesToBuild = &indexesToBuild; f.logForRepl = logForRepl; f._mayYield = mayYield; f._mayBeInterrupted = mayBeInterrupted; int options = QueryOption_NoCursorTimeout | ( slaveOk ? QueryOption_SlaveOk : 0 ); { Lock::TempRelease tempRelease(txn->lockState()); _conn->query(stdx::function(f), from_collection, query, 0, options); } // We are under lock here again, so reload the database in case it may have disappeared // during the temp release bool unused; Database* db = dbHolder().getOrCreate(txn, toDBName, unused); if ( indexesToBuild.size() ) { for (list::const_iterator i = indexesToBuild.begin(); i != indexesToBuild.end(); ++i) { BSONObj spec = *i; string ns = spec["ns"].String(); // this was fixed when pulled off network Collection* collection = db->getCollection( txn, ns ); if ( !collection ) { collection = db->createCollection( txn, ns ); verify( collection ); } Status status = collection->getIndexCatalog()->createIndex(txn, spec, mayBeInterrupted); if ( status.code() == ErrorCodes::IndexAlreadyExists ) { // no-op } else if ( !status.isOK() ) { error() << "error creating index when cloning spec: " << spec << " error: " << status.toString(); uassertStatusOK( status ); } if (logForRepl) repl::logOp(txn, "i", to_collection.ns().c_str(), spec); txn->recoveryUnit()->commitIfNeeded(); } } } void Cloner::copyIndexes(OperationContext* txn, const string& toDBName, const NamespaceString& from_collection, const NamespaceString& to_collection, bool logForRepl, bool masterSameProcess, bool slaveOk, bool mayYield, bool mayBeInterrupted) { LOG(2) << "\t\t copyIndexes " << from_collection << " to " << to_collection << " on " << _conn->getServerAddress(); list indexesToBuild; { Lock::TempRelease tempRelease(txn->lockState()); indexesToBuild = _conn->getIndexSpecs( from_collection, slaveOk ? QueryOption_SlaveOk : 0 ); } // We are under lock here again, so reload the database in case it may have disappeared // during the temp release bool unused; Database* db = dbHolder().getOrCreate(txn, toDBName, unused); if ( indexesToBuild.size() ) { for (list::const_iterator i = indexesToBuild.begin(); i != indexesToBuild.end(); ++i) { BSONObj spec = fixindex( to_collection.db().toString(), *i ); string ns = spec["ns"].String(); Collection* collection = db->getCollection( txn, ns ); if ( !collection ) { collection = db->createCollection( txn, ns ); verify( collection ); } Status status = collection->getIndexCatalog()->createIndex(txn, spec, mayBeInterrupted); if ( status.code() == ErrorCodes::IndexAlreadyExists ) { // no-op } else if ( !status.isOK() ) { error() << "error creating index when cloning spec: " << spec << " error: " << status.toString(); uassertStatusOK( status ); } if (logForRepl) repl::logOp(txn, "i", to_collection.ns().c_str(), spec); txn->recoveryUnit()->commitIfNeeded(); } } } /** * validate the cloner query was successful * @param cur Cursor the query was executed on * @param errCode out Error code encountered during the query * @param errmsg out Error message encountered during the query */ bool validateQueryResults(const auto_ptr& cur, int32_t* errCode, string& errmsg) { if ( cur.get() == 0 ) return false; if ( cur->more() ) { BSONObj first = cur->next(); BSONElement errField = getErrField(first); if(!errField.eoo()) { errmsg = errField.str(); if (errCode) *errCode = first.getIntField("code"); return false; } cur->putBack(first); } return true; } bool Cloner::copyCollection(OperationContext* txn, const string& ns, const BSONObj& query, string& errmsg, bool mayYield, bool mayBeInterrupted, bool shouldCopyIndexes, bool logForRepl) { const NamespaceString nss(ns); Lock::DBWrite dbWrite(txn->lockState(), nss.db()); WriteUnitOfWork wunit(txn->recoveryUnit()); const string dbName = nss.db().toString(); bool unused; Database* db = dbHolder().getOrCreate(txn, dbName, unused); // config string temp = dbName + ".system.namespaces"; BSONObj config = _conn->findOne(temp , BSON("name" << ns)); if (config["options"].isABSONObj()) { Status status = userCreateNS(txn, db, ns, config["options"].Obj(), logForRepl, 0); if ( !status.isOK() ) { errmsg = status.toString(); return false; } } // main data copy(txn, dbName, nss, nss, logForRepl, false, true, mayYield, mayBeInterrupted, Query(query).snapshot()); /* TODO : copyIndexes bool does not seem to be implemented! */ if(!shouldCopyIndexes) { log() << "ERROR copy collection shouldCopyIndexes not implemented? " << ns << endl; } // indexes copyIndexes(txn, dbName, NamespaceString(ns), NamespaceString(ns), logForRepl, false, true, mayYield, mayBeInterrupted); wunit.commit(); txn->recoveryUnit()->commitIfNeeded(); return true; } extern bool inDBRepair; bool Cloner::go(OperationContext* txn, const std::string& toDBName, const string& masterHost, const CloneOptions& opts, set* clonedColls, string& errmsg, int* errCode) { if ( errCode ) { *errCode = 0; } massert( 10289 , "useReplAuth is not written to replication log", !opts.useReplAuth || !opts.logForRepl ); const ConnectionString cs = ConnectionString::parse(masterHost, errmsg); if (!cs.isValid()) { if (errCode) *errCode = ErrorCodes::FailedToParse; return false; } bool masterSameProcess = false; std::vector csServers = cs.getServers(); for (std::vector::const_iterator iter = csServers.begin(); iter != csServers.end(); ++iter) { #if !defined(_WIN32) && !defined(__sunos__) // isSelf() only does the necessary comparisons on os x and linux (SERVER-14165) if (!repl::isSelf(*iter)) continue; #else if (iter->port() != serverGlobalParams.port) continue; if (iter->host() != "localhost" && iter->host() != "127.0.0.1") continue; #endif masterSameProcess = true; break; } if (masterSameProcess) { if (opts.fromDB == toDBName) { // guard against an "infinite" loop /* if you are replicating, the local.sources config may be wrong if you get this */ errmsg = "can't clone from self (localhost)."; return false; } } { // setup connection if (_conn.get()) { // nothing to do } else if ( !masterSameProcess ) { auto_ptr con( cs.connect( errmsg )); if ( !con.get() ) return false; if (!repl::replAuthenticate(con.get())) return false; _conn = con; } else { _conn.reset(new DBDirectClient(txn)); } } list toClone; if ( clonedColls ) clonedColls->clear(); { /* todo: we can put these releases inside dbclient or a dbclient specialization. or just wait until we get rid of global lock anyway. */ Lock::TempRelease tempRelease(txn->lockState()); list raw = _conn->getCollectionInfos( opts.fromDB ); for ( list::iterator it = raw.begin(); it != raw.end(); ++it ) { BSONObj collection = *it; LOG(2) << "\t cloner got " << collection << endl; BSONElement collectionOptions = collection["options"]; if ( collectionOptions.isABSONObj() ) { Status parseOptionsStatus = CollectionOptions().parse(collectionOptions.Obj()); if ( !parseOptionsStatus.isOK() ) { errmsg = str::stream() << "invalid collection options: " << collection << ", reason: " << parseOptionsStatus.reason(); return false; } } BSONElement e = collection.getField("name"); if ( e.eoo() ) { string s = "bad collection object " + collection.toString(); massert( 10290 , s.c_str(), false); } verify( !e.eoo() ); verify( e.type() == String ); NamespaceString ns( opts.fromDB, e.valuestr() ); if( ns.isSystem() ) { /* system.users and s.js is cloned -- but nothing else from system. * system.indexes is handled specially at the end*/ if( legalClientSystemNS( ns.ns() , true ) == 0 ) { LOG(2) << "\t\t not cloning because system collection" << endl; continue; } } if( !ns.isNormal() ) { LOG(2) << "\t\t not cloning because has $ "; continue; } if( opts.collsToIgnore.find( ns.ns() ) != opts.collsToIgnore.end() ){ LOG(2) << "\t\t ignoring collection " << ns; continue; } else { LOG(2) << "\t\t not ignoring collection " << ns; } if ( clonedColls ) clonedColls->insert( ns.ns() ); toClone.push_back( collection.getOwned() ); } } if ( opts.syncData ) { for ( list::iterator i=toClone.begin(); i != toClone.end(); i++ ) { BSONObj collection = *i; LOG(2) << " really will clone: " << collection << endl; const char* collectionName = collection["name"].valuestr(); BSONObj options = collection.getObjectField("options"); NamespaceString from_name( opts.fromDB, collectionName ); NamespaceString to_name( toDBName, collectionName ); // Copy releases the lock, so we need to re-load the database. This should probably // throw if the database has changed in between, but for now preserve the existing // behaviour. bool unused; Database* db = dbHolder().getOrCreate(txn, toDBName, unused); /* we defer building id index for performance - building it in batch is much faster */ Status createStatus = userCreateNS( txn, db, to_name.ns(), options, opts.logForRepl, false ); if ( !createStatus.isOK() ) { errmsg = str::stream() << "failed to create collection \"" << to_name.ns() << "\": " << createStatus.reason(); return false; } LOG(1) << "\t\t cloning " << from_name << " -> " << to_name << endl; Query q; if( opts.snapshot ) q.snapshot(); copy(txn, toDBName, from_name, to_name, opts.logForRepl, masterSameProcess, opts.slaveOk, opts.mayYield, opts.mayBeInterrupted, q); { /* we need dropDups to be true as we didn't do a true snapshot and this is before applying oplog operations that occur during the initial sync. inDBRepair makes dropDups be true. */ bool old = inDBRepair; try { inDBRepair = true; Collection* c = db->getCollection( txn, to_name ); if ( c ) c->getIndexCatalog()->ensureHaveIdIndex(txn); inDBRepair = old; } catch(...) { inDBRepair = old; throw; } } } } // now build the indexes if ( opts.syncIndexes ) { for ( list::iterator i=toClone.begin(); i != toClone.end(); i++ ) { BSONObj collection = *i; log() << "copying indexes for: " << collection; const char* collectionName = collection["name"].valuestr(); NamespaceString from_name( opts.fromDB, collectionName ); NamespaceString to_name( toDBName, collectionName ); copyIndexes(txn, toDBName, from_name, to_name, opts.logForRepl, masterSameProcess, opts.slaveOk, opts.mayYield, opts.mayBeInterrupted ); } } return true; } } // namespace mongo