summary | refs | log | tree | commit | diff
path: root/src/mongo/db/cloner.cpp
diff options
context:
space:
mode:
author: Benety Goh <benety@mongodb.com>  2020-04-16 06:10:04 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-04-16 10:39:18 +0000
commit: fc99a1a80f0d4cc063d47db3cacbba462bf69154 (patch)
tree: 4d3b75e8a89ed41d8bc87ea5c25f918d7ed0f79c /src/mongo/db/cloner.cpp
parent: abd513345c01e236a95641f205ac0e1ff4444141 (diff)
download: mongo-fc99a1a80f0d4cc063d47db3cacbba462bf69154.tar.gz
SERVER-47438 Cloner::copyDb() always copies the same database from a remote host
Diffstat (limited to 'src/mongo/db/cloner.cpp')
-rw-r--r--  src/mongo/db/cloner.cpp | 273
1 file changed, 99 insertions(+), 174 deletions(-)
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index 69626a3e4b7..646685701bf 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -74,9 +74,6 @@
namespace mongo {
-using std::string;
-using std::unique_ptr;
-
using IndexVersion = IndexDescriptor::IndexVersion;
MONGO_FAIL_POINT_DEFINE(movePrimaryFailPoint);
@@ -98,68 +95,47 @@ BSONObj Cloner::_getIdIndexSpec(const std::list<BSONObj>& indexSpecs) {
Cloner::Cloner() {}
struct Cloner::Fun {
- Fun(OperationContext* opCtx, const string& dbName)
+ Fun(OperationContext* opCtx, const std::string& dbName)
: lastLog(0), opCtx(opCtx), _dbName(dbName) {}
void operator()(DBClientCursorBatchIterator& i) {
boost::optional<Lock::DBLock> dbLock;
dbLock.emplace(opCtx, _dbName, MODE_X);
- uassert(
- ErrorCodes::NotMaster,
- str::stream() << "Not primary while cloning collection " << from_collection.ns()
- << " to " << to_collection.ns(),
- !opCtx->writesAreReplicated() ||
- repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection));
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Not primary while cloning collection " << nss,
+ !opCtx->writesAreReplicated() ||
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
// Make sure database still exists after we resume from the temp release
auto databaseHolder = DatabaseHolder::get(opCtx);
auto db = databaseHolder->openDb(opCtx, _dbName);
-
- bool createdCollection = false;
- Collection* collection = nullptr;
-
- collection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection);
+ auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
if (!collection) {
- massert(17321,
- str::stream() << "collection dropped during clone [" << to_collection.ns()
- << "]",
- !createdCollection);
- writeConflictRetry(opCtx, "createCollection", to_collection.ns(), [&] {
+ writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] {
opCtx->checkForInterrupt();
WriteUnitOfWork wunit(opCtx);
const bool createDefaultIndexes = true;
CollectionOptions collectionOptions = uassertStatusOK(CollectionOptions::parse(
from_options, CollectionOptions::ParseKind::parseForCommand));
- invariant(db->userCreateNS(opCtx,
- to_collection,
- collectionOptions,
- createDefaultIndexes,
- from_id_index),
- str::stream() << "collection creation failed during clone ["
- << to_collection.ns() << "]");
+ invariant(db->userCreateNS(
+ opCtx, nss, collectionOptions, createDefaultIndexes, from_id_index),
+ str::stream()
+ << "collection creation failed during clone [" << nss << "]");
wunit.commit();
- collection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection);
+ collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
invariant(collection,
- str::stream()
- << "Missing collection during clone [" << to_collection.ns() << "]");
+ str::stream() << "Missing collection during clone [" << nss << "]");
});
}
- const bool isSystemViewsClone = to_collection.isSystemDotViews();
-
while (i.moreInCurrentBatch()) {
if (numSeen % 128 == 127) {
time_t now = time(nullptr);
if (now - lastLog >= 60) {
// report progress
if (lastLog)
- LOGV2(20412,
- "clone {to_collection} {numSeen}",
- "to_collection"_attr = to_collection,
- "numSeen"_attr = numSeen);
+ LOGV2(20412, "clone", "ns"_attr = nss, "numSeen"_attr = numSeen);
lastLog = now;
}
opCtx->checkForInterrupt();
@@ -172,11 +148,10 @@ struct Cloner::Fun {
// Check if everything is still all right.
if (opCtx->writesAreReplicated()) {
- uassert(ErrorCodes::PrimarySteppedDown,
- str::stream() << "Cannot write to ns: " << to_collection.ns()
- << " after yielding",
- repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(
- opCtx, to_collection));
+ uassert(
+ ErrorCodes::PrimarySteppedDown,
+ str::stream() << "Cannot write to ns: " << nss << " after yielding",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
}
db = databaseHolder->getDb(opCtx, _dbName);
@@ -184,41 +159,21 @@ struct Cloner::Fun {
str::stream() << "Database " << _dbName << " dropped while cloning",
db != nullptr);
- collection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection);
+ collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
uassert(28594,
- str::stream()
- << "Collection " << to_collection.ns() << " dropped while cloning",
+ str::stream() << "Collection " << nss << " dropped while cloning",
collection != nullptr);
}
BSONObj tmp = i.nextSafe();
- // If copying the system.views collection to a database with a different name, then any
- // view definitions must be modified to refer to the 'to' database.
- if (isSystemViewsClone && from_collection.db() != to_collection.db()) {
- BSONObjBuilder bob;
- for (auto&& item : tmp) {
- if (item.fieldNameStringData() == "_id") {
- auto viewNss = NamespaceString(item.checkAndGetStringData());
-
- bob.append("_id",
- NamespaceString(to_collection.db(), viewNss.coll()).toString());
- } else {
- bob.append(item);
- }
- }
- tmp = bob.obj();
- }
-
/* assure object is valid. note this will slow us down a little. */
// Use the latest BSON validation version. We allow cloning of collections containing
// decimal data even if decimal is disabled.
const Status status = validateBSON(tmp.objdata(), tmp.objsize(), BSONVersion::kLatest);
if (!status.isOK()) {
str::stream ss;
- ss << "Cloner: found corrupt document in " << from_collection.toString() << ": "
- << redact(status);
+ ss << "Cloner: found corrupt document in " << nss << ": " << redact(status);
if (gSkipCorruptDocumentsWhenCloning.load()) {
LOGV2_WARNING(20423, "{ss_ss_str}; skipping", "ss_ss_str"_attr = ss.ss.str());
continue;
@@ -229,7 +184,7 @@ struct Cloner::Fun {
verify(collection);
++numSeen;
- writeConflictRetry(opCtx, "cloner insert", to_collection.ns(), [&] {
+ writeConflictRetry(opCtx, "cloner insert", nss.ns(), [&] {
opCtx->checkForInterrupt();
WriteUnitOfWork wunit(opCtx);
@@ -239,12 +194,11 @@ struct Cloner::Fun {
Status status =
collection->insertDocument(opCtx, InsertStatement(doc), nullOpDebug, true);
if (!status.isOK() && status.code() != ErrorCodes::DuplicateKey) {
- LOGV2_ERROR(
- 20424,
- "error: exception cloning object in {from_collection} {status} obj:{doc}",
- "from_collection"_attr = from_collection,
- "status"_attr = redact(status),
- "doc"_attr = redact(doc));
+ LOGV2_ERROR(20424,
+ "error: exception cloning object",
+ "ns"_attr = nss,
+ "status"_attr = redact(status),
+ "doc"_attr = redact(doc));
uassertStatusOK(status);
}
if (status.isOK()) {
@@ -255,9 +209,9 @@ struct Cloner::Fun {
static Rarely sampler;
if (sampler.tick() && (time(nullptr) - saveLast > 60)) {
LOGV2(20413,
- "{numSeen} objects cloned so far from collection {from_collection}",
+ "objects cloned so far from collection",
"numSeen"_attr = numSeen,
- "from_collection"_attr = from_collection);
+ "ns"_attr = nss);
saveLast = time(nullptr);
}
}
@@ -265,79 +219,73 @@ struct Cloner::Fun {
time_t lastLog;
OperationContext* opCtx;
- const string _dbName;
+ const std::string _dbName;
int64_t numSeen;
- NamespaceString from_collection;
+ NamespaceString nss;
BSONObj from_options;
BSONObj from_id_index;
- NamespaceString to_collection;
time_t saveLast;
};
/* copy the specified collection
*/
void Cloner::_copy(OperationContext* opCtx,
- const string& toDBName,
- const NamespaceString& from_collection,
+ const std::string& toDBName,
+ const NamespaceString& nss,
const BSONObj& from_opts,
const BSONObj& from_id_index,
- const NamespaceString& to_collection,
- Query query) {
+ Query query,
+ DBClientBase* conn) {
LOGV2_DEBUG(20414,
2,
- "\t\tcloning collection {from_collection} to {to_collection} on "
- "{conn_getServerAddress} with filter {query}",
- "from_collection"_attr = from_collection,
- "to_collection"_attr = to_collection,
- "conn_getServerAddress"_attr = _conn->getServerAddress(),
+ "\t\tcloning collection with filter",
+ "ns"_attr = nss,
+ "conn_getServerAddress"_attr = conn->getServerAddress(),
"query"_attr = redact(query.toString()));
Fun f(opCtx, toDBName);
f.numSeen = 0;
- f.from_collection = from_collection;
+ f.nss = nss;
f.from_options = from_opts;
f.from_id_index = from_id_index;
- f.to_collection = to_collection;
f.saveLast = time(nullptr);
int options = QueryOption_NoCursorTimeout | QueryOption_Exhaust;
{
Lock::TempRelease tempRelease(opCtx->lockState());
- _conn->query(std::function<void(DBClientCursorBatchIterator&)>(f),
- from_collection,
- query,
- nullptr,
- options,
- 0 /* batchSize */,
- repl::ReadConcernArgs::kImplicitDefault);
+ conn->query(std::function<void(DBClientCursorBatchIterator&)>(f),
+ nss,
+ query,
+ nullptr,
+ options,
+ 0 /* batchSize */,
+ repl::ReadConcernArgs::kImplicitDefault);
}
uassert(ErrorCodes::PrimarySteppedDown,
- str::stream() << "Not primary while cloning collection " << from_collection.ns()
- << " to " << to_collection.ns() << " with filter " << query.toString(),
+ str::stream() << "Not primary while cloning collection " << nss.ns() << " with filter "
+ << query.toString(),
!opCtx->writesAreReplicated() ||
- repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection));
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
}
void Cloner::_copyIndexes(OperationContext* opCtx,
- const string& toDBName,
- const NamespaceString& from_collection,
+ const std::string& toDBName,
+ const NamespaceString& nss,
const BSONObj& from_opts,
const std::list<BSONObj>& from_indexes,
- const NamespaceString& to_collection) {
+ DBClientBase* conn) {
LOGV2_DEBUG(20415,
2,
- "\t\t copyIndexes {from_collection} to {to_collection} on {conn_getServerAddress}",
- "from_collection"_attr = from_collection,
- "to_collection"_attr = to_collection,
- "conn_getServerAddress"_attr = _conn->getServerAddress());
+ "\t\t copyIndexes",
+ "ns"_attr = nss,
+ "conn_getServerAddress"_attr = conn->getServerAddress());
uassert(ErrorCodes::PrimarySteppedDown,
- str::stream() << "Not primary while copying indexes from " << from_collection.ns()
- << " to " << to_collection.ns() << " (Cloner)",
+ str::stream() << "Not primary while copying indexes from " << nss << " (Cloner)",
!opCtx->writesAreReplicated() ||
- repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection));
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
if (from_indexes.empty())
return;
@@ -347,10 +295,9 @@ void Cloner::_copyIndexes(OperationContext* opCtx,
auto databaseHolder = DatabaseHolder::get(opCtx);
auto db = databaseHolder->openDb(opCtx, toDBName);
- Collection* collection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection);
+ Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
if (!collection) {
- writeConflictRetry(opCtx, "createCollection", to_collection.ns(), [&] {
+ writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] {
opCtx->checkForInterrupt();
WriteUnitOfWork wunit(opCtx);
@@ -358,18 +305,15 @@ void Cloner::_copyIndexes(OperationContext* opCtx,
CollectionOptions::parse(from_opts, CollectionOptions::ParseKind::parseForCommand));
const bool createDefaultIndexes = true;
invariant(db->userCreateNS(opCtx,
- to_collection,
+ nss,
collectionOptions,
createDefaultIndexes,
_getIdIndexSpec(from_indexes)),
- str::stream()
- << "Collection creation failed while copying indexes from "
- << from_collection.ns() << " to " << to_collection.ns() << " (Cloner)");
+ str::stream() << "Collection creation failed while copying indexes from "
+ << nss << " (Cloner)");
wunit.commit();
- collection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, to_collection);
- invariant(collection,
- str::stream() << "Missing collection " << to_collection.ns() << " (Cloner)");
+ collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
+ invariant(collection, str::stream() << "Missing collection " << nss << " (Cloner)");
});
}
@@ -413,7 +357,7 @@ void Cloner::_copyIndexes(OperationContext* opCtx,
uassertStatusOK(indexbuildentryhelpers::addIndexBuildEntry(opCtx, indexbuildEntry));
opObserver->onStartIndexBuild(
- opCtx, to_collection, collection->uuid(), *buildUUID, specs, fromMigrate);
+ opCtx, nss, collection->uuid(), *buildUUID, specs, fromMigrate);
return Status::OK();
};
} else {
@@ -581,12 +525,11 @@ Status Cloner::_createCollectionsForDb(
}
Status Cloner::copyDb(OperationContext* opCtx,
- const std::string& toDBName,
+ const std::string& dBName,
const std::string& masterHost,
- const std::string& fromDBName,
const std::vector<NamespaceString>& shardedColls,
std::set<std::string>* clonedColls) {
- invariant(clonedColls, str::stream() << masterHost << ":" << toDBName);
+ invariant(clonedColls, str::stream() << masterHost << ":" << dBName);
auto statusWithMasterHost = ConnectionString::parse(masterHost);
if (!statusWithMasterHost.isOK()) {
@@ -607,33 +550,21 @@ Status Cloner::copyDb(OperationContext* opCtx,
}
if (masterSameProcess) {
- if (fromDBName == toDBName) {
- // Guard against re-entrance
- return Status(ErrorCodes::IllegalOperation, "can't clone from self (localhost)");
- }
+ // Guard against re-entrance
+ return Status(ErrorCodes::IllegalOperation, "can't clone from self (localhost)");
}
- {
- // setup connection
- if (_conn.get()) {
- // nothing to do
- } else if (!masterSameProcess) {
- std::string errmsg;
- unique_ptr<DBClientBase> con(cs.connect(StringData(), errmsg));
- if (!con.get()) {
- return Status(ErrorCodes::HostUnreachable, errmsg);
- }
-
- if (auth::isInternalAuthSet()) {
- auto authStatus = con->authenticateInternalUser();
- if (!authStatus.isOK()) {
- return authStatus;
- }
- }
+ // Set up connection.
+ std::string errmsg;
+ std::unique_ptr<DBClientBase> conn(cs.connect(StringData(), errmsg));
+ if (!conn.get()) {
+ return Status(ErrorCodes::HostUnreachable, errmsg);
+ }
- _conn = std::move(con);
- } else {
- _conn.reset(new DBDirectClient(opCtx));
+ if (auth::isInternalAuthSet()) {
+ auto authStatus = conn->authenticateInternalUser();
+ if (!authStatus.isOK()) {
+ return authStatus;
}
}
@@ -646,10 +577,10 @@ Status Cloner::copyDb(OperationContext* opCtx,
// the global lock that we are entering with.
Lock::TempRelease tempRelease(opCtx->lockState());
- std::list<BSONObj> initialCollections = _conn->getCollectionInfos(
- fromDBName, ListCollectionsFilter::makeTypeCollectionFilter());
+ std::list<BSONObj> initialCollections =
+ conn->getCollectionInfos(dBName, ListCollectionsFilter::makeTypeCollectionFilter());
- auto status = _filterCollectionsForClone(fromDBName, initialCollections);
+ auto status = _filterCollectionsForClone(dBName, initialCollections);
if (!status.isOK()) {
return status.getStatus();
}
@@ -665,7 +596,7 @@ Status Cloner::copyDb(OperationContext* opCtx,
params.idIndexSpec = idIndex.Obj();
}
- const NamespaceString ns(fromDBName, params.collectionName);
+ const NamespaceString ns(dBName, params.collectionName);
if (std::find(shardedColls.begin(), shardedColls.end(), ns) != shardedColls.end()) {
params.shardedColl = true;
}
@@ -677,8 +608,8 @@ Status Cloner::copyDb(OperationContext* opCtx,
{
Lock::TempRelease tempRelease(opCtx->lockState());
for (auto&& params : createCollectionParams) {
- const NamespaceString nss(fromDBName, params.collectionName);
- auto indexSpecs = _conn->getIndexSpecs(nss);
+ const NamespaceString nss(dBName, params.collectionName);
+ auto indexSpecs = conn->getIndexSpecs(nss);
collectionIndexSpecs[params.collectionName] = indexSpecs;
@@ -690,12 +621,12 @@ Status Cloner::copyDb(OperationContext* opCtx,
uassert(
ErrorCodes::NotMaster,
- str::stream() << "Not primary while cloning database " << fromDBName
+ str::stream() << "Not primary while cloning database " << dBName
<< " (after getting list of collections to clone)",
!opCtx->writesAreReplicated() ||
- repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, toDBName));
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, dBName));
- auto status = _createCollectionsForDb(opCtx, createCollectionParams, toDBName);
+ auto status = _createCollectionsForDb(opCtx, createCollectionParams, dBName);
if (!status.isOK()) {
return status;
}
@@ -710,24 +641,19 @@ Status Cloner::copyDb(OperationContext* opCtx,
" really will clone: {params_collectionInfo}",
"params_collectionInfo"_attr = params.collectionInfo);
- const NamespaceString from_name(fromDBName, params.collectionName);
- const NamespaceString to_name(toDBName, params.collectionName);
+ const NamespaceString nss(dBName, params.collectionName);
- clonedColls->insert(from_name.ns());
+ clonedColls->insert(nss.ns());
- LOGV2_DEBUG(20421,
- 1,
- "\t\t cloning {from_name} -> {to_name}",
- "from_name"_attr = from_name,
- "to_name"_attr = to_name);
+ LOGV2_DEBUG(20421, 1, "\t\t cloning", "ns"_attr = nss, "host"_attr = masterHost);
_copy(opCtx,
- toDBName,
- from_name,
+ dBName,
+ nss,
params.collectionInfo["options"].Obj(),
params.idIndexSpec,
- to_name,
- Query());
+ Query(),
+ conn.get());
}
// now build the secondary indexes
@@ -736,16 +662,15 @@ Status Cloner::copyDb(OperationContext* opCtx,
"copying indexes for: {params_collectionInfo}",
"params_collectionInfo"_attr = params.collectionInfo);
- const NamespaceString from_name(fromDBName, params.collectionName);
- const NamespaceString to_name(toDBName, params.collectionName);
+ const NamespaceString nss(dBName, params.collectionName);
_copyIndexes(opCtx,
- toDBName,
- from_name,
+ dBName,
+ nss,
params.collectionInfo["options"].Obj(),
collectionIndexSpecs[params.collectionName],
- to_name);
+ conn.get());
}
return Status::OK();