author     Eric Milkie <milkie@10gen.com>    2016-04-28 10:39:38 -0400
committer  Eric Milkie <milkie@10gen.com>    2016-05-02 10:50:13 -0400
commit     c52c530428fbbe0cae1293ad6605c3ab7be2a281 (patch)
tree       b4e007e87a4ee76b645bc27ed20924c6ddad7504 /src/mongo
parent     1c5be329f5e3903d5cd4e9d106022733507b5e3f (diff)
download   mongo-c52c530428fbbe0cae1293ad6605c3ab7be2a281.tar.gz
SERVER-23919 gather all collection names at the start of initial sync
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/cloner.cpp               | 172
-rw-r--r--  src/mongo/db/cloner.h                 |  25
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp  | 126
3 files changed, 198 insertions(+), 125 deletions(-)
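
For orientation, a minimal standalone sketch (stand-in types, not part of the patch or of MongoDB's real APIs) of the structure this commit builds: each database's collection list is fetched, filtered, and recorded once at the start of initial sync, and every later pass reads from that snapshot instead of re-querying the sync source.

    #include <map>
    #include <string>
    #include <vector>

    // Stand-in for the listCollections metadata the cloner keeps per collection.
    struct CollectionInfo {
        std::string name;
    };

    // Snapshot taken at the start of initial sync: database name -> filtered
    // collection list. Both the data pass and the index pass later read from
    // this map instead of listing collections on the sync source again.
    using CollectionsPerDb = std::map<std::string, std::vector<CollectionInfo>>;

    int main() {
        CollectionsPerDb collectionsPerDb;
        collectionsPerDb["test"] = {{"foo"}, {"bar"}};  // filled once, up front
        collectionsPerDb["admin"] = {{"users"}};
        // ... later passes iterate collectionsPerDb rather than calling listCollections again.
        return 0;
    }
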
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index cc09104ab10..635381749f0 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -34,8 +34,8 @@
#include "mongo/db/cloner.h"
-
#include "mongo/base/status.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/bson/util/builder.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/auth/authorization_manager.h"
@@ -51,7 +51,6 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
-#include "mongo/db/service_context.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
@@ -59,6 +58,7 @@
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
@@ -426,11 +426,81 @@ bool Cloner::copyCollection(OperationContext* txn,
return true;
}
+StatusWith<std::vector<BSONObj>> Cloner::filterCollectionsForClone(
+ const CloneOptions& opts, const std::list<BSONObj>& initialCollections) {
+ std::vector<BSONObj> finalCollections;
+ for (auto&& collection : initialCollections) {
+ LOG(2) << "\t cloner got " << collection;
+
+ BSONElement collectionOptions = collection["options"];
+ if (collectionOptions.isABSONObj()) {
+ auto parseOptionsStatus = CollectionOptions().parse(collectionOptions.Obj());
+ if (!parseOptionsStatus.isOK()) {
+ return parseOptionsStatus;
+ }
+ }
+
+ std::string collectionName;
+ auto status = bsonExtractStringField(collection, "name", &collectionName);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ const NamespaceString ns(opts.fromDB, collectionName.c_str());
+
+ if (ns.isSystem()) {
+ if (legalClientSystemNS(ns.ns(), true) == 0) {
+ LOG(2) << "\t\t not cloning because system collection" << endl;
+ continue;
+ }
+ }
+ if (!ns.isNormal()) {
+ LOG(2) << "\t\t not cloning because has $ ";
+ continue;
+ }
+ if (opts.collsToIgnore.find(ns.ns()) != opts.collsToIgnore.end()) {
+ LOG(2) << "\t\t ignoring collection " << ns;
+ continue;
+ } else {
+ LOG(2) << "\t\t not ignoring collection " << ns;
+ }
+
+ finalCollections.push_back(collection.getOwned());
+ }
+ return finalCollections;
+}
+
+Status Cloner::createCollectionsForDb(OperationContext* txn,
+ const std::vector<BSONObj>& collections,
+ const std::string& dbName) {
+ Database* db = dbHolder().openDb(txn, dbName);
+ for (auto&& collection : collections) {
+ BSONObj options = collection.getObjectField("options");
+ const NamespaceString nss(dbName, collection["name"].valuestr());
+
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ txn->checkForInterrupt();
+ WriteUnitOfWork wunit(txn);
+
+ // we defer building id index for performance - building it in batch is much faster
+ Status createStatus = userCreateNS(txn, db, nss.ns(), options, false);
+ if (!createStatus.isOK()) {
+ return createStatus;
+ }
+
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nss.ns());
+ }
+ return Status::OK();
+}
+
Status Cloner::copyDb(OperationContext* txn,
const std::string& toDBName,
const string& masterHost,
const CloneOptions& opts,
- set<string>* clonedColls) {
+ set<string>* clonedColls,
+ std::vector<BSONObj> collectionsToClone) {
massert(10289,
"useReplAuth is not written to replication log",
!opts.useReplAuth || !txn->writesAreReplicated());
@@ -483,67 +553,23 @@ Status Cloner::copyDb(OperationContext* txn,
}
// Gather the list of collections to clone
- list<BSONObj> toClone;
+ std::vector<BSONObj> toClone;
if (clonedColls) {
clonedColls->clear();
}
-
- {
+ if (opts.createCollections) {
// getCollectionInfos may make a remote call, which may block indefinitely, so release
// the global lock that we are entering with.
Lock::TempRelease tempRelease(txn->lockState());
-
- list<BSONObj> raw = _conn->getCollectionInfos(opts.fromDB);
- for (list<BSONObj>::iterator it = raw.begin(); it != raw.end(); ++it) {
- BSONObj collection = *it;
-
- LOG(2) << "\t cloner got " << collection << endl;
-
- BSONElement collectionOptions = collection["options"];
- if (collectionOptions.isABSONObj()) {
- Status parseOptionsStatus = CollectionOptions().parse(collectionOptions.Obj());
- if (!parseOptionsStatus.isOK()) {
- return parseOptionsStatus;
- }
- }
-
- BSONElement e = collection.getField("name");
- if (e.eoo()) {
- string s = "bad collection object " + collection.toString();
- massert(10290, s.c_str(), false);
- }
- verify(!e.eoo());
- verify(e.type() == String);
-
- const NamespaceString ns(opts.fromDB, e.valuestr());
-
- if (ns.isSystem()) {
- /* system.users and s.js is cloned -- but nothing else from system.
- * system.indexes is handled specially at the end*/
- if (legalClientSystemNS(ns.ns(), true) == 0) {
- LOG(2) << "\t\t not cloning because system collection" << endl;
- continue;
- }
- }
- if (!ns.isNormal()) {
- LOG(2) << "\t\t not cloning because has $ ";
- continue;
- }
-
- if (opts.collsToIgnore.find(ns.ns()) != opts.collsToIgnore.end()) {
- LOG(2) << "\t\t ignoring collection " << ns;
- continue;
- } else {
- LOG(2) << "\t\t not ignoring collection " << ns;
- }
-
- if (clonedColls) {
- clonedColls->insert(ns.ns());
- }
-
- toClone.push_back(collection.getOwned());
+ std::list<BSONObj> initialCollections = _conn->getCollectionInfos(opts.fromDB);
+ auto status = filterCollectionsForClone(opts, initialCollections);
+ if (!status.isOK()) {
+ return status.getStatus();
}
+ toClone = status.getValue();
+ } else {
+ toClone = collectionsToClone;
}
uassert(ErrorCodes::NotMaster,
@@ -553,32 +579,23 @@ Status Cloner::copyDb(OperationContext* txn,
repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(toDBName));
if (opts.syncData) {
- for (list<BSONObj>::iterator i = toClone.begin(); i != toClone.end(); i++) {
- BSONObj collection = *i;
+ if (opts.createCollections) {
+ Status status = createCollectionsForDb(txn, toClone, toDBName);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+ for (auto&& collection : toClone) {
LOG(2) << " really will clone: " << collection << endl;
+
const char* collectionName = collection["name"].valuestr();
BSONObj options = collection.getObjectField("options");
const NamespaceString from_name(opts.fromDB, collectionName);
const NamespaceString to_name(toDBName, collectionName);
- Database* db = dbHolder().openDb(txn, toDBName);
-
- {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- txn->checkForInterrupt();
- WriteUnitOfWork wunit(txn);
-
- // we defer building id index for performance - building it in batch is much
- // faster
- Status createStatus = userCreateNS(txn, db, to_name.ns(), options, false);
- if (!createStatus.isOK()) {
- return createStatus;
- }
-
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_name.ns());
+ if (clonedColls) {
+ clonedColls->insert(from_name.ns());
}
LOG(1) << "\t\t cloning " << from_name << " -> " << to_name << endl;
@@ -591,7 +608,7 @@ Status Cloner::copyDb(OperationContext* txn,
// Copy releases the lock, so we need to re-load the database. This should
// probably throw if the database has changed in between, but for now preserve
// the existing behaviour.
- db = dbHolder().get(txn, toDBName);
+ Database* db = dbHolder().get(txn, toDBName);
uassert(18645, str::stream() << "database " << toDBName << " dropped during clone", db);
Collection* c = db->getCollection(to_name);
@@ -635,8 +652,7 @@ Status Cloner::copyDb(OperationContext* txn,
// now build the secondary indexes
if (opts.syncIndexes) {
- for (list<BSONObj>::iterator i = toClone.begin(); i != toClone.end(); i++) {
- BSONObj collection = *i;
+ for (auto&& collection : toClone) {
log() << "copying indexes for: " << collection;
const char* collectionName = collection["name"].valuestr();
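
The filtering that the new Cloner::filterCollectionsForClone() performs can be summarized outside MongoDB's BSON types roughly as below. This is an illustrative sketch, not part of the patch: the real function also validates the collection's "options" subdocument with CollectionOptions::parse(), keeps the few system namespaces allowed by legalClientSystemNS(), and its ignore set holds full namespaces (db.coll) rather than bare collection names.

    #include <set>
    #include <string>
    #include <vector>

    struct CollectionInfo {
        std::string name;  // stand-in for the "name" field of a listCollections entry
    };

    // Roughly what filterCollectionsForClone() keeps: drop system collections
    // (simplified here; the real code whitelists a few via legalClientSystemNS),
    // drop $-namespaces, and drop anything the caller asked to ignore.
    std::vector<CollectionInfo> filterForClone(const std::vector<CollectionInfo>& raw,
                                               const std::set<std::string>& collsToIgnore) {
        std::vector<CollectionInfo> keep;
        for (const auto& c : raw) {
            if (c.name.rfind("system.", 0) == 0)
                continue;  // simplified: the real filter keeps legal system collections
            if (c.name.find('$') != std::string::npos)
                continue;  // not a "normal" namespace
            if (collsToIgnore.count(c.name))
                continue;  // explicitly excluded, as with CloneOptions::collsToIgnore
            keep.push_back(c);
        }
        return keep;
    }

    int main() {
        auto kept = filterForClone({{"foo"}, {"system.indexes"}, {"tmp$col"}},
                                   /*collsToIgnore=*/{"foo"});
        return kept.empty() ? 0 : 1;  // only "foo" survives the first two checks, but it is ignored
    }
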
diff --git a/src/mongo/db/cloner.h b/src/mongo/db/cloner.h
index 8a5cb691375..7b091e4f29b 100644
--- a/src/mongo/db/cloner.h
+++ b/src/mongo/db/cloner.h
@@ -30,6 +30,9 @@
#pragma once
+#include <vector>
+#include <string>
+
#include "mongo/base/disallow_copying.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/s/catalog/catalog_manager.h"
@@ -54,12 +57,18 @@ public:
/**
* Copies an entire database from the specified host.
+ * clonedColls: when not-null, the function will return with this populated with a list of
+ * the collections that were cloned. This is for the user-facing clone command.
+ * collectionsToClone: When opts.createCollections is false, this list reflects the collections
+ * that are cloned. When opts.createCollections is true, this parameter is
+ * ignored and the collection list is fetched from the remote via _conn.
*/
Status copyDb(OperationContext* txn,
const std::string& toDBName,
const std::string& masterHost,
const CloneOptions& opts,
- std::set<std::string>* clonedColls);
+ std::set<std::string>* clonedColls,
+ std::vector<BSONObj> collectionsToClone = std::vector<BSONObj>());
bool copyCollection(OperationContext* txn,
const std::string& ns,
@@ -67,6 +76,17 @@ public:
std::string& errmsg,
bool copyIndexes);
+ // Filters a database's collection list and removes collections that should not be cloned.
+ // CloneOptions should be populated with a fromDB and a list of collections to ignore, which
+ // will be filtered out.
+ StatusWith<std::vector<BSONObj>> filterCollectionsForClone(
+ const CloneOptions& opts, const std::list<BSONObj>& initialCollections);
+
+ // Executes 'createCollection' for each collection specified in 'collections', in 'dbName'.
+ Status createCollectionsForDb(OperationContext* txn,
+ const std::vector<BSONObj>& collections,
+ const std::string& dbName);
+
private:
void copy(OperationContext* txn,
const std::string& toDBName,
@@ -99,6 +119,8 @@ private:
* holding a distributed lock (such as movePrimary). Indicates that we need to
* be periodically checking to see if the catalog manager has swapped and fail
* if it has so that we don't block the mongos that initiated the command.
+ * createCollections - When 'true', will fetch a list of collections from the remote and create
+ * them. When 'false', assumes collections have already been created ahead of time.
*/
struct CloneOptions {
std::string fromDB;
@@ -111,6 +133,7 @@ struct CloneOptions {
bool syncData = true;
bool syncIndexes = true;
bool checkForCatalogChange = false;
+ bool createCollections = true;
};
} // namespace mongo
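
To make the two call patterns of the revised copyDb() concrete, here is an illustrative sketch with simplified stand-in signatures, not the real ones: the clone/copydb command path leaves createCollections at its default of true and passes no list, while the new initial-sync path pre-creates collections and hands copyDb the list it already fetched.

    #include <iostream>
    #include <string>
    #include <vector>

    struct CollectionInfo { std::string name; };

    // Stand-in for mongo::CloneOptions; only the fields relevant here.
    struct CloneOptions {
        std::string fromDB;
        bool createCollections = true;  // new flag added by this commit
    };

    // Stand-in for Cloner::copyDb(); the real one also takes an OperationContext,
    // the target database name, the master host, and an optional clonedColls set.
    void copyDb(const CloneOptions& opts, std::vector<CollectionInfo> collectionsToClone = {}) {
        if (opts.createCollections) {
            std::cout << "fetching collection list for " << opts.fromDB
                      << " from the remote and creating collections\n";
        } else {
            std::cout << "cloning into " << collectionsToClone.size()
                      << " pre-created collections in " << opts.fromDB << "\n";
        }
    }

    int main() {
        CloneOptions cloneCmd;  // e.g. the user-facing clone command
        cloneCmd.fromDB = "test";
        copyDb(cloneCmd);  // createCollections == true: copyDb does the listing itself

        CloneOptions initialSync;  // the initial-sync path after this commit
        initialSync.fromDB = "test";
        initialSync.createCollections = false;
        copyDb(initialSync, {{"foo"}, {"bar"}});  // list was fetched and created earlier
        return 0;
    }
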
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index cb4cd2826d5..15a5657eed5 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -166,42 +166,40 @@ void checkAdminDatabasePostClone(OperationContext* txn, Database* adminDb) {
bool _initialSyncClone(OperationContext* txn,
Cloner& cloner,
const std::string& host,
- const list<string>& dbs,
+ const std::string& db,
+ std::vector<BSONObj>& collections,
bool dataPass) {
- for (list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++) {
- const string db = *i;
- if (db == "local")
- continue;
-
- if (dataPass)
- log() << "initial sync cloning db: " << db;
- else
- log() << "initial sync cloning indexes for : " << db;
-
- CloneOptions options;
- options.fromDB = db;
- options.slaveOk = true;
- options.useReplAuth = true;
- options.snapshot = false;
- options.syncData = dataPass;
- options.syncIndexes = !dataPass;
-
- // Make database stable
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWrite(txn->lockState(), db, MODE_X);
-
- Status status = cloner.copyDb(txn, db, host, options, NULL);
- if (!status.isOK()) {
- log() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db
- << ". " << status.toString();
- return false;
- }
+ if (db == "local")
+ return true;
- if (db == "admin") {
- checkAdminDatabasePostClone(txn, dbHolder().get(txn, db));
- }
+ if (dataPass)
+ log() << "initial sync cloning db: " << db;
+ else
+ log() << "initial sync cloning indexes for : " << db;
+
+ CloneOptions options;
+ options.fromDB = db;
+ options.slaveOk = true;
+ options.useReplAuth = true;
+ options.snapshot = false;
+ options.syncData = dataPass;
+ options.syncIndexes = !dataPass;
+ options.createCollections = false;
+
+ // Make database stable
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbWrite(txn->lockState(), db, MODE_X);
+
+ Status status = cloner.copyDb(txn, db, host, options, nullptr, collections);
+ if (!status.isOK()) {
+ log() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db
+ << ". " << status.toString();
+ return false;
}
+ if (dataPass && (db == "admin")) {
+ checkAdminDatabasePostClone(txn, dbHolder().get(txn, db));
+ }
return true;
}
@@ -366,11 +364,42 @@ Status _initialSync() {
if (admin != dbs.end()) {
dbs.splice(dbs.begin(), dbs, admin);
}
+ // Ignore local db
+ dbs.erase(std::remove(dbs.begin(), dbs.end(), "local"), dbs.end());
}
Cloner cloner;
- if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) {
- return Status(ErrorCodes::InitialSyncFailure, "initial sync failed data cloning");
+ std::map<std::string, std::vector<BSONObj>> collectionsPerDb;
+ for (auto&& db : dbs) {
+ CloneOptions options;
+ options.fromDB = db;
+ log() << "fetching and creating collections for " << db;
+ std::list<BSONObj> initialCollections =
+ r.conn()->getCollectionInfos(options.fromDB); // may uassert
+ auto fetchStatus = cloner.filterCollectionsForClone(options, initialCollections);
+ if (!fetchStatus.isOK()) {
+ return fetchStatus.getStatus();
+ }
+ auto collections = fetchStatus.getValue();
+
+ ScopedTransaction transaction(&txn, MODE_IX);
+ Lock::DBLock dbWrite(txn.lockState(), db, MODE_X);
+
+ auto createStatus = cloner.createCollectionsForDb(&txn, collections, db);
+ if (!createStatus.isOK()) {
+ return createStatus;
+ }
+ collectionsPerDb.emplace(db, std::move(collections));
+ }
+ for (auto&& dbCollsPair : collectionsPerDb) {
+ if (!_initialSyncClone(&txn,
+ cloner,
+ r.conn()->getServerAddress(),
+ dbCollsPair.first,
+ dbCollsPair.second,
+ true)) {
+ return Status(ErrorCodes::InitialSyncFailure, "initial sync failed data cloning");
+ }
}
log() << "initial sync data copy, starting syncup";
@@ -388,9 +417,9 @@ Status _initialSync() {
str::stream() << "initial sync failed: " << msg);
}
- // Now we sync to the latest op on the sync target _again_, as we may have recloned ops
- // that were "from the future" from the data clone. During this second application,
- // nothing should need to be recloned.
+ // Now we sync to the latest op on the sync target _again_, as we may have recloned ops that
+ // were "from the future" from the data clone. During this second application, nothing should
+ // need to be recloned.
// TODO: replace with "tail" instance below, since we don't need to retry/reclone missing docs.
msg = "oplog sync 2 of 3";
log() << msg;
@@ -402,15 +431,21 @@ Status _initialSync() {
msg = "initial sync building indexes";
log() << msg;
- if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) {
- return Status(ErrorCodes::InitialSyncFailure,
- str::stream() << "initial sync failed: " << msg);
+ for (auto&& dbCollsPair : collectionsPerDb) {
+ if (!_initialSyncClone(&txn,
+ cloner,
+ r.conn()->getServerAddress(),
+ dbCollsPair.first,
+ dbCollsPair.second,
+ false)) {
+ return Status(ErrorCodes::InitialSyncFailure,
+ str::stream() << "initial sync failed: " << msg);
+ }
}
- // WARNING: If the 3rd oplog sync step is removed we must reset minValid
- // to the last entry on the source server so that we don't come
- // out of recovering until we get there (since the previous steps
- // could have fetched newer document than the oplog entry we were applying from).
+ // WARNING: If the 3rd oplog sync step is removed we must reset minValid to the last entry on
+ // the source server so that we don't come out of recovering until we get there (since the
+ // previous steps could have fetched newer document than the oplog entry we were applying from).
msg = "oplog sync 3 of 3";
log() << msg;
@@ -436,8 +471,7 @@ Status _initialSync() {
OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastAppliedOpTime());
log() << "set minValid=" << lastOpTimeWritten;
- // Initial sync is now complete. Flag this by setting minValid to the last thing
- // we synced.
+ // Initial sync is now complete. Flag this by setting minValid to the last thing we synced.
StorageInterface::get(&txn)->setMinValid(&txn, lastOpTimeWritten, DurableRequirement::None);
BackgroundSync::get()->setInitialSyncRequestedFlag(false);
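
Finally, an illustrative sketch (again with stand-in types, not the patch itself) of how the rewritten _initialSync() drives the two passes: both the data pass and the index pass walk the same collectionsPerDb snapshot that was gathered, filtered, and created before any cloning began, which is the behavior SERVER-23919 introduces.

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct CollectionInfo { std::string name; };
    using CollectionsPerDb = std::map<std::string, std::vector<CollectionInfo>>;

    // Stand-in for _initialSyncClone(): after this commit it handles one database
    // and receives the pre-fetched collection list instead of listing databases itself.
    bool initialSyncClone(const std::string& db,
                          const std::vector<CollectionInfo>& collections,
                          bool dataPass) {
        std::cout << (dataPass ? "cloning data for " : "building indexes for ") << db << " ("
                  << collections.size() << " collections)\n";
        return true;
    }

    int main() {
        // Gathered once, up front (fetch, filter, createCollectionsForDb per database).
        CollectionsPerDb collectionsPerDb = {{"admin", {{"users"}}},
                                             {"test", {{"foo"}, {"bar"}}}};

        // Pass 1: clone data. Pass 2 (after the intervening oplog sync steps): build indexes.
        for (bool dataPass : {true, false})
            for (const auto& dbColls : collectionsPerDb)
                if (!initialSyncClone(dbColls.first, dbColls.second, dataPass))
                    return 1;  // in the real code this becomes an InitialSyncFailure status
        return 0;
    }
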
}