diff options
Diffstat (limited to 'src/mongo/db/cloner.cpp')
-rw-r--r-- | src/mongo/db/cloner.cpp | 144 |
1 files changed, 73 insertions, 71 deletions
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 6aa2f1cdb33..439d3ac574c 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -137,24 +137,25 @@ BSONObj Cloner::getIdIndexSpec(const std::list<BSONObj>& indexSpecs) { Cloner::Cloner() {} struct Cloner::Fun { - Fun(OperationContext* txn, const string& dbName) : lastLog(0), txn(txn), _dbName(dbName) {} + Fun(OperationContext* opCtx, const string& dbName) + : lastLog(0), opCtx(opCtx), _dbName(dbName) {} void operator()(DBClientCursorBatchIterator& i) { invariant(from_collection.coll() != "system.indexes"); // XXX: can probably take dblock instead - unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_X)); - unique_ptr<Lock::GlobalWrite> globalWriteLock(new Lock::GlobalWrite(txn->lockState())); + unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(opCtx, MODE_X)); + unique_ptr<Lock::GlobalWrite> globalWriteLock(new Lock::GlobalWrite(opCtx->lockState())); uassert( ErrorCodes::NotMaster, str::stream() << "Not primary while cloning collection " << from_collection.ns() << " to " << to_collection.ns(), - !txn->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, to_collection)); + !opCtx->writesAreReplicated() || + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection)); // Make sure database still exists after we resume from the temp release - Database* db = dbHolder().openDb(txn, _dbName); + Database* db = dbHolder().openDb(opCtx, _dbName); bool createdCollection = false; Collection* collection = NULL; @@ -166,10 +167,10 @@ struct Cloner::Fun { << "]", !createdCollection); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); - WriteUnitOfWork wunit(txn); - Status s = userCreateNS(txn, + WriteUnitOfWork wunit(opCtx); + Status s = userCreateNS(opCtx, db, to_collection.toString(), from_options, @@ -179,7 +180,7 @@ struct Cloner::Fun { wunit.commit(); collection = db->getCollection(to_collection); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", to_collection.ns()); } const bool isSystemViewsClone = to_collection.isSystemDotViews(); @@ -193,27 +194,27 @@ struct Cloner::Fun { log() << "clone " << to_collection << ' ' << numSeen; lastLog = now; } - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); scopedXact.reset(); globalWriteLock.reset(); - CurOp::get(txn)->yielded(); + CurOp::get(opCtx)->yielded(); - scopedXact.reset(new ScopedTransaction(txn, MODE_X)); - globalWriteLock.reset(new Lock::GlobalWrite(txn->lockState())); + scopedXact.reset(new ScopedTransaction(opCtx, MODE_X)); + globalWriteLock.reset(new Lock::GlobalWrite(opCtx->lockState())); // Check if everything is still all right. - if (txn->writesAreReplicated()) { + if (opCtx->writesAreReplicated()) { uassert(28592, str::stream() << "Cannot write to ns: " << to_collection.ns() << " after yielding", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor( - txn, to_collection)); + opCtx, to_collection)); } // TODO: SERVER-16598 abort if original db or collection is gone. - db = dbHolder().get(txn, _dbName); + db = dbHolder().get(opCtx, _dbName); uassert(28593, str::stream() << "Database " << _dbName << " dropped while cloning", db != NULL); @@ -262,13 +263,13 @@ struct Cloner::Fun { verify(collection); ++numSeen; MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); - WriteUnitOfWork wunit(txn); + WriteUnitOfWork wunit(opCtx); BSONObj doc = tmp; OpDebug* const nullOpDebug = nullptr; - Status status = collection->insertDocument(txn, doc, nullOpDebug, true); + Status status = collection->insertDocument(opCtx, doc, nullOpDebug, true); if (!status.isOK() && status.code() != ErrorCodes::DuplicateKey) { error() << "error: exception cloning object in " << from_collection << ' ' << redact(status) << " obj:" << redact(doc); @@ -278,7 +279,7 @@ struct Cloner::Fun { wunit.commit(); } } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "cloner insert", to_collection.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "cloner insert", to_collection.ns()); RARELY if (time(0) - saveLast > 60) { log() << numSeen << " objects cloned so far from collection " << from_collection; saveLast = time(0); @@ -299,7 +300,7 @@ struct Cloner::Fun { } time_t lastLog; - OperationContext* txn; + OperationContext* opCtx; const string _dbName; int64_t numSeen; @@ -313,7 +314,7 @@ struct Cloner::Fun { /* copy the specified collection */ -void Cloner::copy(OperationContext* txn, +void Cloner::copy(OperationContext* opCtx, const string& toDBName, const NamespaceString& from_collection, const BSONObj& from_opts, @@ -324,7 +325,7 @@ void Cloner::copy(OperationContext* txn, LOG(2) << "\t\tcloning collection " << from_collection << " to " << to_collection << " on " << _conn->getServerAddress() << " with filter " << redact(query.toString()); - Fun f(txn, toDBName); + Fun f(opCtx, toDBName); f.numSeen = 0; f.from_collection = from_collection; f.from_options = from_opts; @@ -335,7 +336,7 @@ void Cloner::copy(OperationContext* txn, int options = QueryOption_NoCursorTimeout | (opts.slaveOk ? QueryOption_SlaveOk : 0); { - Lock::TempRelease tempRelease(txn->lockState()); + Lock::TempRelease tempRelease(opCtx->lockState()); _conn->query(stdx::function<void(DBClientCursorBatchIterator&)>(f), from_collection.ns(), query, @@ -349,11 +350,11 @@ void Cloner::copy(OperationContext* txn, << to_collection.ns() << " with filter " << query.toString(), - !txn->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, to_collection)); + !opCtx->writesAreReplicated() || + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection)); } -void Cloner::copyIndexes(OperationContext* txn, +void Cloner::copyIndexes(OperationContext* opCtx, const string& toDBName, const NamespaceString& from_collection, const BSONObj& from_opts, @@ -372,8 +373,8 @@ void Cloner::copyIndexes(OperationContext* txn, << " to " << to_collection.ns() << " (Cloner)", - !txn->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, to_collection)); + !opCtx->writesAreReplicated() || + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection)); if (indexesToBuild.empty()) @@ -381,16 +382,16 @@ void Cloner::copyIndexes(OperationContext* txn, // We are under lock here again, so reload the database in case it may have disappeared // during the temp release - Database* db = dbHolder().openDb(txn, toDBName); + Database* db = dbHolder().openDb(opCtx, toDBName); Collection* collection = db->getCollection(to_collection); if (!collection) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); - WriteUnitOfWork wunit(txn); + WriteUnitOfWork wunit(opCtx); Status s = userCreateNS( - txn, + opCtx, db, to_collection.toString(), from_opts, @@ -401,7 +402,7 @@ void Cloner::copyIndexes(OperationContext* txn, invariant(collection); wunit.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", to_collection.ns()); } // TODO pass the MultiIndexBlock when inserting into the collection rather than building the @@ -409,7 +410,7 @@ void Cloner::copyIndexes(OperationContext* txn, // from creation to completion without yielding to ensure the index and the collection // matches. It also wouldn't work on non-empty collections so we would need both // implementations anyway as long as that is supported. - MultiIndexBlock indexer(txn, collection); + MultiIndexBlock indexer(opCtx, collection); indexer.allowInterruption(); indexer.removeExistingIndexes(&indexesToBuild); @@ -419,20 +420,20 @@ void Cloner::copyIndexes(OperationContext* txn, auto indexInfoObjs = uassertStatusOK(indexer.init(indexesToBuild)); uassertStatusOK(indexer.insertAllDocumentsInCollection()); - WriteUnitOfWork wunit(txn); + WriteUnitOfWork wunit(opCtx); indexer.commit(); - if (txn->writesAreReplicated()) { + if (opCtx->writesAreReplicated()) { const string targetSystemIndexesCollectionName = to_collection.getSystemIndexesCollection(); const char* createIndexNs = targetSystemIndexesCollectionName.c_str(); for (auto&& infoObj : indexInfoObjs) { getGlobalServiceContext()->getOpObserver()->onCreateIndex( - txn, createIndexNs, infoObj, false); + opCtx, createIndexNs, infoObj, false); } } wunit.commit(); } -bool Cloner::copyCollection(OperationContext* txn, +bool Cloner::copyCollection(OperationContext* opCtx, const string& ns, const BSONObj& query, string& errmsg, @@ -474,22 +475,22 @@ bool Cloner::copyCollection(OperationContext* txn, auto sourceIndexes = _conn->getIndexSpecs(nss.ns(), QueryOption_SlaveOk); auto idIndexSpec = getIdIndexSpec(sourceIndexes); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWrite(txn->lockState(), dbname, MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock dbWrite(opCtx->lockState(), dbname, MODE_X); uassert(ErrorCodes::PrimarySteppedDown, str::stream() << "Not primary while copying collection " << ns << " (Cloner)", - !txn->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, nss)); + !opCtx->writesAreReplicated() || + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss)); - Database* db = dbHolder().openDb(txn, dbname); + Database* db = dbHolder().openDb(opCtx, dbname); if (shouldCreateCollection) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); - WriteUnitOfWork wunit(txn); - Status status = userCreateNS(txn, db, ns, options, true, idIndexSpec); + WriteUnitOfWork wunit(opCtx); + Status status = userCreateNS(opCtx, db, ns, options, true, idIndexSpec); if (!status.isOK()) { errmsg = status.toString(); // abort write unit of work @@ -497,7 +498,7 @@ bool Cloner::copyCollection(OperationContext* txn, } wunit.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", ns); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", ns); } else { LOG(1) << "No collection info found for ns:" << nss.toString() << ", host:" << _conn->getServerAddress(); @@ -506,7 +507,7 @@ bool Cloner::copyCollection(OperationContext* txn, // main data CloneOptions opts; opts.slaveOk = true; - copy(txn, dbname, nss, options, idIndexSpec, nss, opts, Query(query).snapshot()); + copy(opCtx, dbname, nss, options, idIndexSpec, nss, opts, Query(query).snapshot()); /* TODO : copyIndexes bool does not seem to be implemented! */ if (!shouldCopyIndexes) { @@ -514,7 +515,7 @@ bool Cloner::copyCollection(OperationContext* txn, } // indexes - copyIndexes(txn, dbname, NamespaceString(ns), options, sourceIndexes, NamespaceString(ns)); + copyIndexes(opCtx, dbname, NamespaceString(ns), options, sourceIndexes, NamespaceString(ns)); return true; } @@ -564,21 +565,21 @@ StatusWith<std::vector<BSONObj>> Cloner::filterCollectionsForClone( } Status Cloner::createCollectionsForDb( - OperationContext* txn, + OperationContext* opCtx, const std::vector<CreateCollectionParams>& createCollectionParams, const std::string& dbName) { - Database* db = dbHolder().openDb(txn, dbName); + Database* db = dbHolder().openDb(opCtx, dbName); for (auto&& params : createCollectionParams) { auto options = params.collectionInfo["options"].Obj(); const NamespaceString nss(dbName, params.collectionName); uassertStatusOK(userAllowedCreateNS(dbName, params.collectionName)); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - txn->checkForInterrupt(); - WriteUnitOfWork wunit(txn); + opCtx->checkForInterrupt(); + WriteUnitOfWork wunit(opCtx); Status createStatus = - userCreateNS(txn, + userCreateNS(opCtx, db, nss.ns(), options, @@ -590,12 +591,12 @@ Status Cloner::createCollectionsForDb( wunit.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", nss.ns()); } return Status::OK(); } -Status Cloner::copyDb(OperationContext* txn, +Status Cloner::copyDb(OperationContext* opCtx, const std::string& toDBName, const string& masterHost, const CloneOptions& opts, @@ -603,7 +604,7 @@ Status Cloner::copyDb(OperationContext* txn, std::vector<BSONObj> collectionsToClone) { massert(10289, "useReplAuth is not written to replication log", - !opts.useReplAuth || !txn->writesAreReplicated()); + !opts.useReplAuth || !opCtx->writesAreReplicated()); auto statusWithMasterHost = ConnectionString::parse(masterHost); if (!statusWithMasterHost.isOK()) { @@ -616,7 +617,7 @@ Status Cloner::copyDb(OperationContext* txn, std::vector<HostAndPort> csServers = cs.getServers(); for (std::vector<HostAndPort>::const_iterator iter = csServers.begin(); iter != csServers.end(); ++iter) { - if (!repl::isSelf(*iter, txn->getServiceContext())) + if (!repl::isSelf(*iter, opCtx->getServiceContext())) continue; masterSameProcess = true; @@ -648,7 +649,7 @@ Status Cloner::copyDb(OperationContext* txn, _conn = std::move(con); } else { - _conn.reset(new DBDirectClient(txn)); + _conn.reset(new DBDirectClient(opCtx)); } } @@ -661,7 +662,7 @@ Status Cloner::copyDb(OperationContext* txn, if (opts.createCollections) { // getCollectionInfos may make a remote call, which may block indefinitely, so release // the global lock that we are entering with. - Lock::TempRelease tempRelease(txn->lockState()); + Lock::TempRelease tempRelease(opCtx->lockState()); std::list<BSONObj> initialCollections = _conn->getCollectionInfos( opts.fromDB, ListCollectionsFilter::makeTypeCollectionFilter()); auto status = filterCollectionsForClone(opts, initialCollections); @@ -687,7 +688,7 @@ Status Cloner::copyDb(OperationContext* txn, // Get index specs for each collection. std::map<StringData, std::list<BSONObj>> collectionIndexSpecs; { - Lock::TempRelease tempRelease(txn->lockState()); + Lock::TempRelease tempRelease(opCtx->lockState()); for (auto&& params : createCollectionParams) { const NamespaceString nss(opts.fromDB, params.collectionName); auto indexSpecs = @@ -701,15 +702,16 @@ Status Cloner::copyDb(OperationContext* txn, } } - uassert(ErrorCodes::NotMaster, - str::stream() << "Not primary while cloning database " << opts.fromDB - << " (after getting list of collections to clone)", - !txn->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(txn, toDBName)); + uassert( + ErrorCodes::NotMaster, + str::stream() << "Not primary while cloning database " << opts.fromDB + << " (after getting list of collections to clone)", + !opCtx->writesAreReplicated() || + repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(opCtx, toDBName)); if (opts.syncData) { if (opts.createCollections) { - Status status = createCollectionsForDb(txn, createCollectionParams, toDBName); + Status status = createCollectionsForDb(opCtx, createCollectionParams, toDBName); if (!status.isOK()) { return status; } @@ -729,7 +731,7 @@ Status Cloner::copyDb(OperationContext* txn, if (opts.snapshot) q.snapshot(); - copy(txn, + copy(opCtx, toDBName, from_name, params.collectionInfo["options"].Obj(), @@ -749,7 +751,7 @@ Status Cloner::copyDb(OperationContext* txn, const NamespaceString to_name(toDBName, params.collectionName); - copyIndexes(txn, + copyIndexes(opCtx, toDBName, from_name, params.collectionInfo["options"].Obj(), |