diff options
-rw-r--r-- | src/mongo/db/cloner.cpp | 273 | ||||
-rw-r--r-- | src/mongo/db/cloner.h | 14 | ||||
-rw-r--r-- | src/mongo/db/s/clone_catalog_data_command.cpp | 3 |
3 files changed, 106 insertions, 184 deletions
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 69626a3e4b7..646685701bf 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -74,9 +74,6 @@ namespace mongo { -using std::string; -using std::unique_ptr; - using IndexVersion = IndexDescriptor::IndexVersion; MONGO_FAIL_POINT_DEFINE(movePrimaryFailPoint); @@ -98,68 +95,47 @@ BSONObj Cloner::_getIdIndexSpec(const std::list<BSONObj>& indexSpecs) { Cloner::Cloner() {} struct Cloner::Fun { - Fun(OperationContext* opCtx, const string& dbName) + Fun(OperationContext* opCtx, const std::string& dbName) : lastLog(0), opCtx(opCtx), _dbName(dbName) {} void operator()(DBClientCursorBatchIterator& i) { boost::optional<Lock::DBLock> dbLock; dbLock.emplace(opCtx, _dbName, MODE_X); - uassert( - ErrorCodes::NotMaster, - str::stream() << "Not primary while cloning collection " << from_collection.ns() - << " to " << to_collection.ns(), - !opCtx->writesAreReplicated() || - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection)); + uassert(ErrorCodes::NotMaster, + str::stream() << "Not primary while cloning collection " << nss, + !opCtx->writesAreReplicated() || + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); // Make sure database still exists after we resume from the temp release auto databaseHolder = DatabaseHolder::get(opCtx); auto db = databaseHolder->openDb(opCtx, _dbName); - - bool createdCollection = false; - Collection* collection = nullptr; - - collection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection); + auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); if (!collection) { - massert(17321, - str::stream() << "collection dropped during clone [" << to_collection.ns() - << "]", - !createdCollection); - writeConflictRetry(opCtx, "createCollection", to_collection.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { opCtx->checkForInterrupt(); WriteUnitOfWork wunit(opCtx); const bool createDefaultIndexes = true; CollectionOptions collectionOptions = uassertStatusOK(CollectionOptions::parse( from_options, CollectionOptions::ParseKind::parseForCommand)); - invariant(db->userCreateNS(opCtx, - to_collection, - collectionOptions, - createDefaultIndexes, - from_id_index), - str::stream() << "collection creation failed during clone [" - << to_collection.ns() << "]"); + invariant(db->userCreateNS( + opCtx, nss, collectionOptions, createDefaultIndexes, from_id_index), + str::stream() + << "collection creation failed during clone [" << nss << "]"); wunit.commit(); - collection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection); + collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); invariant(collection, - str::stream() - << "Missing collection during clone [" << to_collection.ns() << "]"); + str::stream() << "Missing collection during clone [" << nss << "]"); }); } - const bool isSystemViewsClone = to_collection.isSystemDotViews(); - while (i.moreInCurrentBatch()) { if (numSeen % 128 == 127) { time_t now = time(nullptr); if (now - lastLog >= 60) { // report progress if (lastLog) - LOGV2(20412, - "clone {to_collection} {numSeen}", - "to_collection"_attr = to_collection, - "numSeen"_attr = numSeen); + LOGV2(20412, "clone", "ns"_attr = nss, "numSeen"_attr = numSeen); lastLog = now; } opCtx->checkForInterrupt(); @@ -172,11 +148,10 @@ struct Cloner::Fun { // Check if everything is still all right. if (opCtx->writesAreReplicated()) { - uassert(ErrorCodes::PrimarySteppedDown, - str::stream() << "Cannot write to ns: " << to_collection.ns() - << " after yielding", - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor( - opCtx, to_collection)); + uassert( + ErrorCodes::PrimarySteppedDown, + str::stream() << "Cannot write to ns: " << nss << " after yielding", + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); } db = databaseHolder->getDb(opCtx, _dbName); @@ -184,41 +159,21 @@ struct Cloner::Fun { str::stream() << "Database " << _dbName << " dropped while cloning", db != nullptr); - collection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection); + collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); uassert(28594, - str::stream() - << "Collection " << to_collection.ns() << " dropped while cloning", + str::stream() << "Collection " << nss << " dropped while cloning", collection != nullptr); } BSONObj tmp = i.nextSafe(); - // If copying the system.views collection to a database with a different name, then any - // view definitions must be modified to refer to the 'to' database. - if (isSystemViewsClone && from_collection.db() != to_collection.db()) { - BSONObjBuilder bob; - for (auto&& item : tmp) { - if (item.fieldNameStringData() == "_id") { - auto viewNss = NamespaceString(item.checkAndGetStringData()); - - bob.append("_id", - NamespaceString(to_collection.db(), viewNss.coll()).toString()); - } else { - bob.append(item); - } - } - tmp = bob.obj(); - } - /* assure object is valid. note this will slow us down a little. */ // Use the latest BSON validation version. We allow cloning of collections containing // decimal data even if decimal is disabled. const Status status = validateBSON(tmp.objdata(), tmp.objsize(), BSONVersion::kLatest); if (!status.isOK()) { str::stream ss; - ss << "Cloner: found corrupt document in " << from_collection.toString() << ": " - << redact(status); + ss << "Cloner: found corrupt document in " << nss << ": " << redact(status); if (gSkipCorruptDocumentsWhenCloning.load()) { LOGV2_WARNING(20423, "{ss_ss_str}; skipping", "ss_ss_str"_attr = ss.ss.str()); continue; @@ -229,7 +184,7 @@ struct Cloner::Fun { verify(collection); ++numSeen; - writeConflictRetry(opCtx, "cloner insert", to_collection.ns(), [&] { + writeConflictRetry(opCtx, "cloner insert", nss.ns(), [&] { opCtx->checkForInterrupt(); WriteUnitOfWork wunit(opCtx); @@ -239,12 +194,11 @@ struct Cloner::Fun { Status status = collection->insertDocument(opCtx, InsertStatement(doc), nullOpDebug, true); if (!status.isOK() && status.code() != ErrorCodes::DuplicateKey) { - LOGV2_ERROR( - 20424, - "error: exception cloning object in {from_collection} {status} obj:{doc}", - "from_collection"_attr = from_collection, - "status"_attr = redact(status), - "doc"_attr = redact(doc)); + LOGV2_ERROR(20424, + "error: exception cloning object", + "ns"_attr = nss, + "status"_attr = redact(status), + "doc"_attr = redact(doc)); uassertStatusOK(status); } if (status.isOK()) { @@ -255,9 +209,9 @@ struct Cloner::Fun { static Rarely sampler; if (sampler.tick() && (time(nullptr) - saveLast > 60)) { LOGV2(20413, - "{numSeen} objects cloned so far from collection {from_collection}", + "objects cloned so far from collection", "numSeen"_attr = numSeen, - "from_collection"_attr = from_collection); + "ns"_attr = nss); saveLast = time(nullptr); } } @@ -265,79 +219,73 @@ struct Cloner::Fun { time_t lastLog; OperationContext* opCtx; - const string _dbName; + const std::string _dbName; int64_t numSeen; - NamespaceString from_collection; + NamespaceString nss; BSONObj from_options; BSONObj from_id_index; - NamespaceString to_collection; time_t saveLast; }; /* copy the specified collection */ void Cloner::_copy(OperationContext* opCtx, - const string& toDBName, - const NamespaceString& from_collection, + const std::string& toDBName, + const NamespaceString& nss, const BSONObj& from_opts, const BSONObj& from_id_index, - const NamespaceString& to_collection, - Query query) { + Query query, + DBClientBase* conn) { LOGV2_DEBUG(20414, 2, - "\t\tcloning collection {from_collection} to {to_collection} on " - "{conn_getServerAddress} with filter {query}", - "from_collection"_attr = from_collection, - "to_collection"_attr = to_collection, - "conn_getServerAddress"_attr = _conn->getServerAddress(), + "\t\tcloning collection with filter", + "ns"_attr = nss, + "conn_getServerAddress"_attr = conn->getServerAddress(), "query"_attr = redact(query.toString())); Fun f(opCtx, toDBName); f.numSeen = 0; - f.from_collection = from_collection; + f.nss = nss; f.from_options = from_opts; f.from_id_index = from_id_index; - f.to_collection = to_collection; f.saveLast = time(nullptr); int options = QueryOption_NoCursorTimeout | QueryOption_Exhaust; { Lock::TempRelease tempRelease(opCtx->lockState()); - _conn->query(std::function<void(DBClientCursorBatchIterator&)>(f), - from_collection, - query, - nullptr, - options, - 0 /* batchSize */, - repl::ReadConcernArgs::kImplicitDefault); + conn->query(std::function<void(DBClientCursorBatchIterator&)>(f), + nss, + query, + nullptr, + options, + 0 /* batchSize */, + repl::ReadConcernArgs::kImplicitDefault); } uassert(ErrorCodes::PrimarySteppedDown, - str::stream() << "Not primary while cloning collection " << from_collection.ns() - << " to " << to_collection.ns() << " with filter " << query.toString(), + str::stream() << "Not primary while cloning collection " << nss.ns() << " with filter " + << query.toString(), !opCtx->writesAreReplicated() || - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); } void Cloner::_copyIndexes(OperationContext* opCtx, - const string& toDBName, - const NamespaceString& from_collection, + const std::string& toDBName, + const NamespaceString& nss, const BSONObj& from_opts, const std::list<BSONObj>& from_indexes, - const NamespaceString& to_collection) { + DBClientBase* conn) { LOGV2_DEBUG(20415, 2, - "\t\t copyIndexes {from_collection} to {to_collection} on {conn_getServerAddress}", - "from_collection"_attr = from_collection, - "to_collection"_attr = to_collection, - "conn_getServerAddress"_attr = _conn->getServerAddress()); + "\t\t copyIndexes", + "ns"_attr = nss, + "conn_getServerAddress"_attr = conn->getServerAddress()); uassert(ErrorCodes::PrimarySteppedDown, - str::stream() << "Not primary while copying indexes from " << from_collection.ns() - << " to " << to_collection.ns() << " (Cloner)", + str::stream() << "Not primary while copying indexes from " << nss << " (Cloner)", !opCtx->writesAreReplicated() || - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); if (from_indexes.empty()) return; @@ -347,10 +295,9 @@ void Cloner::_copyIndexes(OperationContext* opCtx, auto databaseHolder = DatabaseHolder::get(opCtx); auto db = databaseHolder->openDb(opCtx, toDBName); - Collection* collection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection); + Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); if (!collection) { - writeConflictRetry(opCtx, "createCollection", to_collection.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { opCtx->checkForInterrupt(); WriteUnitOfWork wunit(opCtx); @@ -358,18 +305,15 @@ void Cloner::_copyIndexes(OperationContext* opCtx, CollectionOptions::parse(from_opts, CollectionOptions::ParseKind::parseForCommand)); const bool createDefaultIndexes = true; invariant(db->userCreateNS(opCtx, - to_collection, + nss, collectionOptions, createDefaultIndexes, _getIdIndexSpec(from_indexes)), - str::stream() - << "Collection creation failed while copying indexes from " - << from_collection.ns() << " to " << to_collection.ns() << " (Cloner)"); + str::stream() << "Collection creation failed while copying indexes from " + << nss << " (Cloner)"); wunit.commit(); - collection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection); - invariant(collection, - str::stream() << "Missing collection " << to_collection.ns() << " (Cloner)"); + collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); + invariant(collection, str::stream() << "Missing collection " << nss << " (Cloner)"); }); } @@ -413,7 +357,7 @@ void Cloner::_copyIndexes(OperationContext* opCtx, uassertStatusOK(indexbuildentryhelpers::addIndexBuildEntry(opCtx, indexbuildEntry)); opObserver->onStartIndexBuild( - opCtx, to_collection, collection->uuid(), *buildUUID, specs, fromMigrate); + opCtx, nss, collection->uuid(), *buildUUID, specs, fromMigrate); return Status::OK(); }; } else { @@ -581,12 +525,11 @@ Status Cloner::_createCollectionsForDb( } Status Cloner::copyDb(OperationContext* opCtx, - const std::string& toDBName, + const std::string& dBName, const std::string& masterHost, - const std::string& fromDBName, const std::vector<NamespaceString>& shardedColls, std::set<std::string>* clonedColls) { - invariant(clonedColls, str::stream() << masterHost << ":" << toDBName); + invariant(clonedColls, str::stream() << masterHost << ":" << dBName); auto statusWithMasterHost = ConnectionString::parse(masterHost); if (!statusWithMasterHost.isOK()) { @@ -607,33 +550,21 @@ Status Cloner::copyDb(OperationContext* opCtx, } if (masterSameProcess) { - if (fromDBName == toDBName) { - // Guard against re-entrance - return Status(ErrorCodes::IllegalOperation, "can't clone from self (localhost)"); - } + // Guard against re-entrance + return Status(ErrorCodes::IllegalOperation, "can't clone from self (localhost)"); } - { - // setup connection - if (_conn.get()) { - // nothing to do - } else if (!masterSameProcess) { - std::string errmsg; - unique_ptr<DBClientBase> con(cs.connect(StringData(), errmsg)); - if (!con.get()) { - return Status(ErrorCodes::HostUnreachable, errmsg); - } - - if (auth::isInternalAuthSet()) { - auto authStatus = con->authenticateInternalUser(); - if (!authStatus.isOK()) { - return authStatus; - } - } + // Set up connection. + std::string errmsg; + std::unique_ptr<DBClientBase> conn(cs.connect(StringData(), errmsg)); + if (!conn.get()) { + return Status(ErrorCodes::HostUnreachable, errmsg); + } - _conn = std::move(con); - } else { - _conn.reset(new DBDirectClient(opCtx)); + if (auth::isInternalAuthSet()) { + auto authStatus = conn->authenticateInternalUser(); + if (!authStatus.isOK()) { + return authStatus; } } @@ -646,10 +577,10 @@ Status Cloner::copyDb(OperationContext* opCtx, // the global lock that we are entering with. Lock::TempRelease tempRelease(opCtx->lockState()); - std::list<BSONObj> initialCollections = _conn->getCollectionInfos( - fromDBName, ListCollectionsFilter::makeTypeCollectionFilter()); + std::list<BSONObj> initialCollections = + conn->getCollectionInfos(dBName, ListCollectionsFilter::makeTypeCollectionFilter()); - auto status = _filterCollectionsForClone(fromDBName, initialCollections); + auto status = _filterCollectionsForClone(dBName, initialCollections); if (!status.isOK()) { return status.getStatus(); } @@ -665,7 +596,7 @@ Status Cloner::copyDb(OperationContext* opCtx, params.idIndexSpec = idIndex.Obj(); } - const NamespaceString ns(fromDBName, params.collectionName); + const NamespaceString ns(dBName, params.collectionName); if (std::find(shardedColls.begin(), shardedColls.end(), ns) != shardedColls.end()) { params.shardedColl = true; } @@ -677,8 +608,8 @@ Status Cloner::copyDb(OperationContext* opCtx, { Lock::TempRelease tempRelease(opCtx->lockState()); for (auto&& params : createCollectionParams) { - const NamespaceString nss(fromDBName, params.collectionName); - auto indexSpecs = _conn->getIndexSpecs(nss); + const NamespaceString nss(dBName, params.collectionName); + auto indexSpecs = conn->getIndexSpecs(nss); collectionIndexSpecs[params.collectionName] = indexSpecs; @@ -690,12 +621,12 @@ Status Cloner::copyDb(OperationContext* opCtx, uassert( ErrorCodes::NotMaster, - str::stream() << "Not primary while cloning database " << fromDBName + str::stream() << "Not primary while cloning database " << dBName << " (after getting list of collections to clone)", !opCtx->writesAreReplicated() || - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, toDBName)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, dBName)); - auto status = _createCollectionsForDb(opCtx, createCollectionParams, toDBName); + auto status = _createCollectionsForDb(opCtx, createCollectionParams, dBName); if (!status.isOK()) { return status; } @@ -710,24 +641,19 @@ Status Cloner::copyDb(OperationContext* opCtx, " really will clone: {params_collectionInfo}", "params_collectionInfo"_attr = params.collectionInfo); - const NamespaceString from_name(fromDBName, params.collectionName); - const NamespaceString to_name(toDBName, params.collectionName); + const NamespaceString nss(dBName, params.collectionName); - clonedColls->insert(from_name.ns()); + clonedColls->insert(nss.ns()); - LOGV2_DEBUG(20421, - 1, - "\t\t cloning {from_name} -> {to_name}", - "from_name"_attr = from_name, - "to_name"_attr = to_name); + LOGV2_DEBUG(20421, 1, "\t\t cloning", "ns"_attr = nss, "host"_attr = masterHost); _copy(opCtx, - toDBName, - from_name, + dBName, + nss, params.collectionInfo["options"].Obj(), params.idIndexSpec, - to_name, - Query()); + Query(), + conn.get()); } // now build the secondary indexes @@ -736,16 +662,15 @@ Status Cloner::copyDb(OperationContext* opCtx, "copying indexes for: {params_collectionInfo}", "params_collectionInfo"_attr = params.collectionInfo); - const NamespaceString from_name(fromDBName, params.collectionName); - const NamespaceString to_name(toDBName, params.collectionName); + const NamespaceString nss(dBName, params.collectionName); _copyIndexes(opCtx, - toDBName, - from_name, + dBName, + nss, params.collectionInfo["options"].Obj(), collectionIndexSpecs[params.collectionName], - to_name); + conn.get()); } return Status::OK(); diff --git a/src/mongo/db/cloner.h b/src/mongo/db/cloner.h index cc3ee06fdeb..0760355a3ff 100644 --- a/src/mongo/db/cloner.h +++ b/src/mongo/db/cloner.h @@ -61,9 +61,8 @@ public: * ignored and the collection list is fetched from the remote via _conn. */ Status copyDb(OperationContext* opCtx, - const std::string& toDBName, + const std::string& dBName, const std::string& masterHost, - const std::string& fromDBName, const std::vector<NamespaceString>& shardedColls, std::set<std::string>* clonedColls); @@ -93,21 +92,20 @@ private: void _copy(OperationContext* opCtx, const std::string& toDBName, - const NamespaceString& from_ns, + const NamespaceString& nss, const BSONObj& from_opts, const BSONObj& from_id_index, - const NamespaceString& to_ns, - Query q); + Query q, + DBClientBase* conn); void _copyIndexes(OperationContext* opCtx, const std::string& toDBName, - const NamespaceString& from_ns, + const NamespaceString& nss, const BSONObj& from_opts, const std::list<BSONObj>& from_indexes, - const NamespaceString& to_ns); + DBClientBase* conn); struct Fun; - std::unique_ptr<DBClientBase> _conn; }; } // namespace mongo diff --git a/src/mongo/db/s/clone_catalog_data_command.cpp b/src/mongo/db/s/clone_catalog_data_command.cpp index 4d127bda847..7159838120d 100644 --- a/src/mongo/db/s/clone_catalog_data_command.cpp +++ b/src/mongo/db/s/clone_catalog_data_command.cpp @@ -127,8 +127,7 @@ public: Lock::DBLock dbXLock(opCtx, dbname, MODE_X); Cloner cloner; - uassertStatusOK( - cloner.copyDb(opCtx, dbname, from.toString(), dbname, shardedColls, &clonedColls)); + uassertStatusOK(cloner.copyDb(opCtx, dbname, from.toString(), shardedColls, &clonedColls)); { BSONArrayBuilder cloneBarr = result.subarrayStart("clonedColls"); cloneBarr.append(clonedColls); |