// rename_collection.cpp

/**
*    Copyright (C) 2013 10gen Inc.
*
*    This program is free software: you can redistribute it and/or modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*    As a special exception, the copyright holders give permission to link the
*    code of portions of this program with the OpenSSL library under certain
*    conditions as described in each individual source file and distribute
*    linked combinations including the program with the OpenSSL library. You
*    must comply with the GNU Affero General Public License in all respects for
*    all of the code used other than as permitted herein. If you modify file(s)
*    with this exception, you may extend this exception to your version of the
*    file(s), but you are not obligated to do so. If you do not wish to do so,
*    delete this exception statement from your version. If you delete this
*    exception statement from all source files in the program, then also delete
*    it in the license file.
*/ #include "mongo/client/dbclientcursor.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/commands.h" #include "mongo/db/commands/rename_collection.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/index_builder.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/instance.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/insert.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/storage/mmap_v1/dur_transaction.h" namespace mongo { class CmdRenameCollection : public Command { public: CmdRenameCollection() : Command( "renameCollection" ) {} virtual bool adminOnly() const { return true; } virtual bool slaveOk() const { return false; } virtual bool isWriteCommandForConfigServer() const { return true; } virtual Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { return rename_collection::checkAuthForRenameCollectionCommand(client, dbname, cmdObj); } virtual void help( stringstream &help ) const { help << " example: { renameCollection: foo.a, to: bar.b }"; } virtual std::vector stopIndexBuilds(Database* db, const BSONObj& cmdObj) { string source = cmdObj.getStringField( name.c_str() ); string target = cmdObj.getStringField( "to" ); IndexCatalog::IndexKillCriteria criteria; criteria.ns = source; std::vector prelim = IndexBuilder::killMatchingIndexBuilds(db->getCollection(source), criteria); std::vector indexes; for (int i = 0; i < static_cast(prelim.size()); i++) { // Change the ns BSONObj stripped = prelim[i].removeField("ns"); BSONObjBuilder builder; builder.appendElements(stripped); builder.append("ns", target); indexes.push_back(builder.obj()); } return indexes; } virtual void restoreIndexBuildsOnSource(std::vector indexesInProg, std::string source) { IndexBuilder::restoreIndexes( indexesInProg ); } virtual bool run(TransactionExperiment* txn, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, 
bool fromRepl) { Lock::GlobalWrite globalWriteLock; bool ok = wrappedRun(txn, dbname, cmdObj, errmsg, result, fromRepl); if (ok && !fromRepl) logOp(txn, "c",(dbname + ".$cmd").c_str(), cmdObj); return ok; } virtual bool wrappedRun(TransactionExperiment* txn, const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { string source = cmdObj.getStringField( name.c_str() ); string target = cmdObj.getStringField( "to" ); if ( !NamespaceString::validCollectionComponent(target.c_str()) ) { errmsg = "invalid collection name: " + target; return false; } if ( source.empty() || target.empty() ) { errmsg = "invalid command syntax"; return false; } if (!fromRepl) { // If it got through on the master, need to allow it here too Status sourceStatus = userAllowedWriteNS(source); if (!sourceStatus.isOK()) { errmsg = "error with source namespace: " + sourceStatus.reason(); return false; } Status targetStatus = userAllowedWriteNS(target); if (!targetStatus.isOK()) { errmsg = "error with target namespace: " + targetStatus.reason(); return false; } } string sourceDB = nsToDatabase(source); string targetDB = nsToDatabase(target); bool capped = false; long long size = 0; std::vector indexesInProg; { Client::Context srcCtx( source ); Collection* sourceColl = srcCtx.db()->getCollection( source ); if ( !sourceColl ) { errmsg = "source namespace does not exist"; return false; } // Ensure that collection name does not exceed maximum length. // Ensure that index names do not push the length over the max. // Iterator includes unfinished indexes. 
IndexCatalog::IndexIterator sourceIndIt = sourceColl->getIndexCatalog()->getIndexIterator( true ); int longestIndexNameLength = 0; while ( sourceIndIt.more() ) { int thisLength = sourceIndIt.next()->indexName().length(); if ( thisLength > longestIndexNameLength ) longestIndexNameLength = thisLength; } unsigned int longestAllowed = min(int(Namespace::MaxNsColletionLen), int(Namespace::MaxNsLen) - 2/*strlen(".$")*/ - longestIndexNameLength); if (target.size() > longestAllowed) { StringBuilder sb; sb << "collection name length of " << target.size() << " exceeds maximum length of " << longestAllowed << ", allowing for index names"; errmsg = sb.str(); return false; } { indexesInProg = stopIndexBuilds( srcCtx.db(), cmdObj ); capped = sourceColl->isCapped(); if ( capped ) { size = sourceColl->getRecordStore()->storageSize(); } } } { Client::Context ctx( target ); // Check if the target namespace exists and if dropTarget is true. // If target exists and dropTarget is not true, return false. if ( ctx.db()->getCollection( target ) ) { if ( !cmdObj["dropTarget"].trueValue() ) { errmsg = "target namespace exists"; return false; } Status s = ctx.db()->dropCollection( txn, target ); if ( !s.isOK() ) { errmsg = s.toString(); restoreIndexBuildsOnSource( indexesInProg, source ); return false; } } // If we are renaming in the same database, just // rename the namespace and we're done. if ( sourceDB == targetDB ) { Status s = ctx.db()->renameCollection( txn, source, target, cmdObj["stayTemp"].trueValue() ); if ( !s.isOK() ) { errmsg = s.toString(); restoreIndexBuildsOnSource( indexesInProg, source ); return false; } return true; } // Otherwise, we are enaming across databases, so we must copy all // the data and then remove the source collection. // Create the target collection. 
Collection* targetColl = NULL; if ( capped ) { CollectionOptions options; options.capped = true; options.cappedSize = size; options.setNoIdIndex(); targetColl = ctx.db()->createCollection( txn, target, options ); } else { CollectionOptions options; options.setNoIdIndex(); // No logOp necessary because the entire renameCollection command is one logOp. targetColl = ctx.db()->createCollection( txn, target, options ); } if ( !targetColl ) { errmsg = "Failed to create target collection."; restoreIndexBuildsOnSource( indexesInProg, source ); return false; } } // Copy over all the data from source collection to target collection. bool insertSuccessful = true; boost::scoped_ptr sourceIt; Collection* sourceColl = NULL; { Client::Context srcCtx( source ); sourceColl = srcCtx.db()->getCollection( source ); sourceIt.reset( sourceColl->getIterator( DiskLoc(), false, CollectionScanParams::FORWARD ) ); } Collection* targetColl = NULL; while ( !sourceIt->isEOF() ) { BSONObj o; { Client::Context srcCtx( source ); o = sourceColl->docFor(sourceIt->getNext()); } // Insert and check return status of insert. { Client::Context ctx( target ); if ( !targetColl ) targetColl = ctx.db()->getCollection( target ); // No logOp necessary because the entire renameCollection command is one logOp. Status s = targetColl->insertDocument( txn, o, true ).getStatus(); if ( !s.isOK() ) { insertSuccessful = false; errmsg = s.toString(); break; } } } // If inserts were unsuccessful, drop the target collection and return false. if ( !insertSuccessful ) { Client::Context ctx( target ); Status s = ctx.db()->dropCollection( txn, target ); if ( !s.isOK() ) errmsg = s.toString(); restoreIndexBuildsOnSource( indexesInProg, source ); return false; } // Copy over the indexes to temp storage and then to the target.. 
vector copiedIndexes; bool indexSuccessful = true; { Client::Context srcCtx( source ); IndexCatalog::IndexIterator sourceIndIt = sourceColl->getIndexCatalog()->getIndexIterator( true ); while ( sourceIndIt.more() ) { BSONObj currIndex = sourceIndIt.next()->infoObj(); // Process the source index. BSONObjBuilder b; BSONObjIterator i( currIndex ); while( i.moreWithEOO() ) { BSONElement e = i.next(); if ( e.eoo() ) break; else if ( strcmp( e.fieldName(), "ns" ) == 0 ) b.append( "ns", target ); else b.append( e ); } BSONObj newIndex = b.obj(); copiedIndexes.push_back( newIndex ); } } { Client::Context ctx( target ); if ( !targetColl ) targetColl = ctx.db()->getCollection( target ); for ( vector::iterator it = copiedIndexes.begin(); it != copiedIndexes.end(); ++it ) { Status s = targetColl->getIndexCatalog()->createIndex(txn, *it, true ); if ( !s.isOK() ) { indexSuccessful = false; errmsg = s.toString(); break; } } // If indexes were unsuccessful, drop the target collection and return false. if ( !indexSuccessful ) { Status s = ctx.db()->dropCollection( txn, target ); if ( !s.isOK() ) errmsg = s.toString(); restoreIndexBuildsOnSource( indexesInProg, source ); return false; } } // Drop the source collection. { Client::Context srcCtx( source ); Status s = srcCtx.db()->dropCollection( txn, source ); if ( !s.isOK() ) { errmsg = s.toString(); restoreIndexBuildsOnSource( indexesInProg, source ); return false; } } return true; } } cmdrenamecollection; }