diff options
author | Eric Milkie <milkie@10gen.com> | 2016-04-28 10:39:38 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2016-05-02 10:50:13 -0400 |
commit | c52c530428fbbe0cae1293ad6605c3ab7be2a281 (patch) | |
tree | b4e007e87a4ee76b645bc27ed20924c6ddad7504 /src | |
parent | 1c5be329f5e3903d5cd4e9d106022733507b5e3f (diff) | |
download | mongo-c52c530428fbbe0cae1293ad6605c3ab7be2a281.tar.gz |
SERVER-23919 gather all collection names at the start of initial sync
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/cloner.cpp | 172 | ||||
-rw-r--r-- | src/mongo/db/cloner.h | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 126 |
3 files changed, 198 insertions, 125 deletions
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index cc09104ab10..635381749f0 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -34,8 +34,8 @@ #include "mongo/db/cloner.h" - #include "mongo/base/status.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/bson/util/builder.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/auth/authorization_manager.h" @@ -51,7 +51,6 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" -#include "mongo/db/service_context.h" #include "mongo/db/index_builder.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" @@ -59,6 +58,7 @@ #include "mongo/db/repl/isself.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" #include "mongo/db/storage/storage_options.h" #include "mongo/s/grid.h" #include "mongo/util/assert_util.h" @@ -426,11 +426,81 @@ bool Cloner::copyCollection(OperationContext* txn, return true; } +StatusWith<std::vector<BSONObj>> Cloner::filterCollectionsForClone( + const CloneOptions& opts, const std::list<BSONObj>& initialCollections) { + std::vector<BSONObj> finalCollections; + for (auto&& collection : initialCollections) { + LOG(2) << "\t cloner got " << collection; + + BSONElement collectionOptions = collection["options"]; + if (collectionOptions.isABSONObj()) { + auto parseOptionsStatus = CollectionOptions().parse(collectionOptions.Obj()); + if (!parseOptionsStatus.isOK()) { + return parseOptionsStatus; + } + } + + std::string collectionName; + auto status = bsonExtractStringField(collection, "name", &collectionName); + if (!status.isOK()) { + return status; + } + + const NamespaceString ns(opts.fromDB, collectionName.c_str()); + + if (ns.isSystem()) { + if (legalClientSystemNS(ns.ns(), true) == 0) { + LOG(2) << "\t\t not cloning because system collection" << endl; + continue; + } + } + if (!ns.isNormal()) { + LOG(2) << "\t\t not cloning because has $ "; + continue; + } + if (opts.collsToIgnore.find(ns.ns()) != opts.collsToIgnore.end()) { + LOG(2) << "\t\t ignoring collection " << ns; + continue; + } else { + LOG(2) << "\t\t not ignoring collection " << ns; + } + + finalCollections.push_back(collection.getOwned()); + } + return finalCollections; +} + +Status Cloner::createCollectionsForDb(OperationContext* txn, + const std::vector<BSONObj>& collections, + const std::string& dbName) { + Database* db = dbHolder().openDb(txn, dbName); + for (auto&& collection : collections) { + BSONObj options = collection.getObjectField("options"); + const NamespaceString nss(dbName, collection["name"].valuestr()); + + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + txn->checkForInterrupt(); + WriteUnitOfWork wunit(txn); + + // we defer building id index for performance - building it in batch is much faster + Status createStatus = userCreateNS(txn, db, nss.ns(), options, false); + if (!createStatus.isOK()) { + return createStatus; + } + + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nss.ns()); + } + return Status::OK(); +} + Status Cloner::copyDb(OperationContext* txn, const std::string& toDBName, const string& masterHost, const CloneOptions& opts, - set<string>* clonedColls) { + set<string>* clonedColls, + std::vector<BSONObj> collectionsToClone) { massert(10289, "useReplAuth is not written to replication log", !opts.useReplAuth || !txn->writesAreReplicated()); @@ -483,67 +553,23 @@ Status Cloner::copyDb(OperationContext* txn, } // Gather the list of collections to clone - list<BSONObj> toClone; + std::vector<BSONObj> toClone; if (clonedColls) { clonedColls->clear(); } - - { + if (opts.createCollections) { // getCollectionInfos may make a remote call, which may block indefinitely, so release // the global lock that we are entering with. Lock::TempRelease tempRelease(txn->lockState()); - - list<BSONObj> raw = _conn->getCollectionInfos(opts.fromDB); - for (list<BSONObj>::iterator it = raw.begin(); it != raw.end(); ++it) { - BSONObj collection = *it; - - LOG(2) << "\t cloner got " << collection << endl; - - BSONElement collectionOptions = collection["options"]; - if (collectionOptions.isABSONObj()) { - Status parseOptionsStatus = CollectionOptions().parse(collectionOptions.Obj()); - if (!parseOptionsStatus.isOK()) { - return parseOptionsStatus; - } - } - - BSONElement e = collection.getField("name"); - if (e.eoo()) { - string s = "bad collection object " + collection.toString(); - massert(10290, s.c_str(), false); - } - verify(!e.eoo()); - verify(e.type() == String); - - const NamespaceString ns(opts.fromDB, e.valuestr()); - - if (ns.isSystem()) { - /* system.users and s.js is cloned -- but nothing else from system. - * system.indexes is handled specially at the end*/ - if (legalClientSystemNS(ns.ns(), true) == 0) { - LOG(2) << "\t\t not cloning because system collection" << endl; - continue; - } - } - if (!ns.isNormal()) { - LOG(2) << "\t\t not cloning because has $ "; - continue; - } - - if (opts.collsToIgnore.find(ns.ns()) != opts.collsToIgnore.end()) { - LOG(2) << "\t\t ignoring collection " << ns; - continue; - } else { - LOG(2) << "\t\t not ignoring collection " << ns; - } - - if (clonedColls) { - clonedColls->insert(ns.ns()); - } - - toClone.push_back(collection.getOwned()); + std::list<BSONObj> initialCollections = _conn->getCollectionInfos(opts.fromDB); + auto status = filterCollectionsForClone(opts, initialCollections); + if (!status.isOK()) { + return status.getStatus(); } + toClone = status.getValue(); + } else { + toClone = collectionsToClone; } uassert(ErrorCodes::NotMaster, @@ -553,32 +579,23 @@ Status Cloner::copyDb(OperationContext* txn, repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(toDBName)); if (opts.syncData) { - for (list<BSONObj>::iterator i = toClone.begin(); i != toClone.end(); i++) { - BSONObj collection = *i; + if (opts.createCollections) { + Status status = createCollectionsForDb(txn, toClone, toDBName); + if (!status.isOK()) { + return status; + } + } + for (auto&& collection : toClone) { LOG(2) << " really will clone: " << collection << endl; + const char* collectionName = collection["name"].valuestr(); BSONObj options = collection.getObjectField("options"); const NamespaceString from_name(opts.fromDB, collectionName); const NamespaceString to_name(toDBName, collectionName); - Database* db = dbHolder().openDb(txn, toDBName); - - { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - txn->checkForInterrupt(); - WriteUnitOfWork wunit(txn); - - // we defer building id index for performance - building it in batch is much - // faster - Status createStatus = userCreateNS(txn, db, to_name.ns(), options, false); - if (!createStatus.isOK()) { - return createStatus; - } - - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_name.ns()); + if (clonedColls) { + clonedColls->insert(from_name.ns()); } LOG(1) << "\t\t cloning " << from_name << " -> " << to_name << endl; @@ -591,7 +608,7 @@ Status Cloner::copyDb(OperationContext* txn, // Copy releases the lock, so we need to re-load the database. This should // probably throw if the database has changed in between, but for now preserve // the existing behaviour. - db = dbHolder().get(txn, toDBName); + Database* db = dbHolder().get(txn, toDBName); uassert(18645, str::stream() << "database " << toDBName << " dropped during clone", db); Collection* c = db->getCollection(to_name); @@ -635,8 +652,7 @@ Status Cloner::copyDb(OperationContext* txn, // now build the secondary indexes if (opts.syncIndexes) { - for (list<BSONObj>::iterator i = toClone.begin(); i != toClone.end(); i++) { - BSONObj collection = *i; + for (auto&& collection : toClone) { log() << "copying indexes for: " << collection; const char* collectionName = collection["name"].valuestr(); diff --git a/src/mongo/db/cloner.h b/src/mongo/db/cloner.h index 8a5cb691375..7b091e4f29b 100644 --- a/src/mongo/db/cloner.h +++ b/src/mongo/db/cloner.h @@ -30,6 +30,9 @@ #pragma once +#include <vector> +#include <string> + #include "mongo/base/disallow_copying.h" #include "mongo/client/dbclientinterface.h" #include "mongo/s/catalog/catalog_manager.h" @@ -54,12 +57,18 @@ public: /** * Copies an entire database from the specified host. + * clonedColls: when not-null, the function will return with this populated with a list of + * the collections that were cloned. This is for the user-facing clone command. + * collectionsToClone: When opts.createCollections is false, this list reflects the collections + * that are cloned. When opts.createCollections is true, this parameter is + * ignored and the collection list is fetched from the remote via _conn. */ Status copyDb(OperationContext* txn, const std::string& toDBName, const std::string& masterHost, const CloneOptions& opts, - std::set<std::string>* clonedColls); + std::set<std::string>* clonedColls, + std::vector<BSONObj> collectionsToClone = std::vector<BSONObj>()); bool copyCollection(OperationContext* txn, const std::string& ns, @@ -67,6 +76,17 @@ public: std::string& errmsg, bool copyIndexes); + // Filters a database's collection list and removes collections that should not be cloned. + // CloneOptions should be populated with a fromDB and a list of collections to ignore, which + // will be filtered out. + StatusWith<std::vector<BSONObj>> filterCollectionsForClone( + const CloneOptions& opts, const std::list<BSONObj>& initialCollections); + + // Executes 'createCollection' for each collection specified in 'collections', in 'dbName'. + Status createCollectionsForDb(OperationContext* txn, + const std::vector<BSONObj>& collections, + const std::string& dbName); + private: void copy(OperationContext* txn, const std::string& toDBName, @@ -99,6 +119,8 @@ private: * holding a distributed lock (such as movePrimary). Indicates that we need to * be periodically checking to see if the catalog manager has swapped and fail * if it has so that we don't block the mongos that initiated the command. + * createCollections - When 'true', will fetch a list of collections from the remote and create + * them. When 'false', assumes collections have already been created ahead of time. */ struct CloneOptions { std::string fromDB; @@ -111,6 +133,7 @@ struct CloneOptions { bool syncData = true; bool syncIndexes = true; bool checkForCatalogChange = false; + bool createCollections = true; }; } // namespace mongo diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index cb4cd2826d5..15a5657eed5 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -166,42 +166,40 @@ void checkAdminDatabasePostClone(OperationContext* txn, Database* adminDb) { bool _initialSyncClone(OperationContext* txn, Cloner& cloner, const std::string& host, - const list<string>& dbs, + const std::string& db, + std::vector<BSONObj>& collections, bool dataPass) { - for (list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++) { - const string db = *i; - if (db == "local") - continue; - - if (dataPass) - log() << "initial sync cloning db: " << db; - else - log() << "initial sync cloning indexes for : " << db; - - CloneOptions options; - options.fromDB = db; - options.slaveOk = true; - options.useReplAuth = true; - options.snapshot = false; - options.syncData = dataPass; - options.syncIndexes = !dataPass; - - // Make database stable - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWrite(txn->lockState(), db, MODE_X); - - Status status = cloner.copyDb(txn, db, host, options, NULL); - if (!status.isOK()) { - log() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db - << ". " << status.toString(); - return false; - } + if (db == "local") + return true; - if (db == "admin") { - checkAdminDatabasePostClone(txn, dbHolder().get(txn, db)); - } + if (dataPass) + log() << "initial sync cloning db: " << db; + else + log() << "initial sync cloning indexes for : " << db; + + CloneOptions options; + options.fromDB = db; + options.slaveOk = true; + options.useReplAuth = true; + options.snapshot = false; + options.syncData = dataPass; + options.syncIndexes = !dataPass; + options.createCollections = false; + + // Make database stable + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbWrite(txn->lockState(), db, MODE_X); + + Status status = cloner.copyDb(txn, db, host, options, nullptr, collections); + if (!status.isOK()) { + log() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db + << ". " << status.toString(); + return false; } + if (dataPass && (db == "admin")) { + checkAdminDatabasePostClone(txn, dbHolder().get(txn, db)); + } return true; } @@ -366,11 +364,42 @@ Status _initialSync() { if (admin != dbs.end()) { dbs.splice(dbs.begin(), dbs, admin); } + // Ignore local db + dbs.erase(std::remove(dbs.begin(), dbs.end(), "local"), dbs.end()); } Cloner cloner; - if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) { - return Status(ErrorCodes::InitialSyncFailure, "initial sync failed data cloning"); + std::map<std::string, std::vector<BSONObj>> collectionsPerDb; + for (auto&& db : dbs) { + CloneOptions options; + options.fromDB = db; + log() << "fetching and creating collections for " << db; + std::list<BSONObj> initialCollections = + r.conn()->getCollectionInfos(options.fromDB); // may uassert + auto fetchStatus = cloner.filterCollectionsForClone(options, initialCollections); + if (!fetchStatus.isOK()) { + return fetchStatus.getStatus(); + } + auto collections = fetchStatus.getValue(); + + ScopedTransaction transaction(&txn, MODE_IX); + Lock::DBLock dbWrite(txn.lockState(), db, MODE_X); + + auto createStatus = cloner.createCollectionsForDb(&txn, collections, db); + if (!createStatus.isOK()) { + return createStatus; + } + collectionsPerDb.emplace(db, std::move(collections)); + } + for (auto&& dbCollsPair : collectionsPerDb) { + if (!_initialSyncClone(&txn, + cloner, + r.conn()->getServerAddress(), + dbCollsPair.first, + dbCollsPair.second, + true)) { + return Status(ErrorCodes::InitialSyncFailure, "initial sync failed data cloning"); + } } log() << "initial sync data copy, starting syncup"; @@ -388,9 +417,9 @@ Status _initialSync() { str::stream() << "initial sync failed: " << msg); } - // Now we sync to the latest op on the sync target _again_, as we may have recloned ops - // that were "from the future" from the data clone. During this second application, - // nothing should need to be recloned. + // Now we sync to the latest op on the sync target _again_, as we may have recloned ops that + // were "from the future" from the data clone. During this second application, nothing should + // need to be recloned. // TODO: replace with "tail" instance below, since we don't need to retry/reclone missing docs. msg = "oplog sync 2 of 3"; log() << msg; @@ -402,15 +431,21 @@ Status _initialSync() { msg = "initial sync building indexes"; log() << msg; - if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) { - return Status(ErrorCodes::InitialSyncFailure, - str::stream() << "initial sync failed: " << msg); + for (auto&& dbCollsPair : collectionsPerDb) { + if (!_initialSyncClone(&txn, + cloner, + r.conn()->getServerAddress(), + dbCollsPair.first, + dbCollsPair.second, + false)) { + return Status(ErrorCodes::InitialSyncFailure, + str::stream() << "initial sync failed: " << msg); + } } - // WARNING: If the 3rd oplog sync step is removed we must reset minValid - // to the last entry on the source server so that we don't come - // out of recovering until we get there (since the previous steps - // could have fetched newer document than the oplog entry we were applying from). + // WARNING: If the 3rd oplog sync step is removed we must reset minValid to the last entry on + // the source server so that we don't come out of recovering until we get there (since the + // previous steps could have fetched newer document than the oplog entry we were applying from). msg = "oplog sync 3 of 3"; log() << msg; @@ -436,8 +471,7 @@ Status _initialSync() { OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastAppliedOpTime()); log() << "set minValid=" << lastOpTimeWritten; - // Initial sync is now complete. Flag this by setting minValid to the last thing - // we synced. + // Initial sync is now complete. Flag this by setting minValid to the last thing we synced. StorageInterface::get(&txn)->setMinValid(&txn, lastOpTimeWritten, DurableRequirement::None); BackgroundSync::get()->setInitialSyncRequestedFlag(false); } |