summaryrefslogtreecommitdiff
path: root/src/mongo/db/cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/cloner.cpp')
-rw-r--r--src/mongo/db/cloner.cpp144
1 files changed, 73 insertions, 71 deletions
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index 6aa2f1cdb33..439d3ac574c 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -137,24 +137,25 @@ BSONObj Cloner::getIdIndexSpec(const std::list<BSONObj>& indexSpecs) {
Cloner::Cloner() {}
struct Cloner::Fun {
- Fun(OperationContext* txn, const string& dbName) : lastLog(0), txn(txn), _dbName(dbName) {}
+ Fun(OperationContext* opCtx, const string& dbName)
+ : lastLog(0), opCtx(opCtx), _dbName(dbName) {}
void operator()(DBClientCursorBatchIterator& i) {
invariant(from_collection.coll() != "system.indexes");
// XXX: can probably take dblock instead
- unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_X));
- unique_ptr<Lock::GlobalWrite> globalWriteLock(new Lock::GlobalWrite(txn->lockState()));
+ unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(opCtx, MODE_X));
+ unique_ptr<Lock::GlobalWrite> globalWriteLock(new Lock::GlobalWrite(opCtx->lockState()));
uassert(
ErrorCodes::NotMaster,
str::stream() << "Not primary while cloning collection " << from_collection.ns()
<< " to "
<< to_collection.ns(),
- !txn->writesAreReplicated() ||
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, to_collection));
+ !opCtx->writesAreReplicated() ||
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection));
// Make sure database still exists after we resume from the temp release
- Database* db = dbHolder().openDb(txn, _dbName);
+ Database* db = dbHolder().openDb(opCtx, _dbName);
bool createdCollection = false;
Collection* collection = NULL;
@@ -166,10 +167,10 @@ struct Cloner::Fun {
<< "]",
!createdCollection);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
- WriteUnitOfWork wunit(txn);
- Status s = userCreateNS(txn,
+ WriteUnitOfWork wunit(opCtx);
+ Status s = userCreateNS(opCtx,
db,
to_collection.toString(),
from_options,
@@ -179,7 +180,7 @@ struct Cloner::Fun {
wunit.commit();
collection = db->getCollection(to_collection);
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", to_collection.ns());
}
const bool isSystemViewsClone = to_collection.isSystemDotViews();
@@ -193,27 +194,27 @@ struct Cloner::Fun {
log() << "clone " << to_collection << ' ' << numSeen;
lastLog = now;
}
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
scopedXact.reset();
globalWriteLock.reset();
- CurOp::get(txn)->yielded();
+ CurOp::get(opCtx)->yielded();
- scopedXact.reset(new ScopedTransaction(txn, MODE_X));
- globalWriteLock.reset(new Lock::GlobalWrite(txn->lockState()));
+ scopedXact.reset(new ScopedTransaction(opCtx, MODE_X));
+ globalWriteLock.reset(new Lock::GlobalWrite(opCtx->lockState()));
// Check if everything is still all right.
- if (txn->writesAreReplicated()) {
+ if (opCtx->writesAreReplicated()) {
uassert(28592,
str::stream() << "Cannot write to ns: " << to_collection.ns()
<< " after yielding",
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(
- txn, to_collection));
+ opCtx, to_collection));
}
// TODO: SERVER-16598 abort if original db or collection is gone.
- db = dbHolder().get(txn, _dbName);
+ db = dbHolder().get(opCtx, _dbName);
uassert(28593,
str::stream() << "Database " << _dbName << " dropped while cloning",
db != NULL);
@@ -262,13 +263,13 @@ struct Cloner::Fun {
verify(collection);
++numSeen;
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
- WriteUnitOfWork wunit(txn);
+ WriteUnitOfWork wunit(opCtx);
BSONObj doc = tmp;
OpDebug* const nullOpDebug = nullptr;
- Status status = collection->insertDocument(txn, doc, nullOpDebug, true);
+ Status status = collection->insertDocument(opCtx, doc, nullOpDebug, true);
if (!status.isOK() && status.code() != ErrorCodes::DuplicateKey) {
error() << "error: exception cloning object in " << from_collection << ' '
<< redact(status) << " obj:" << redact(doc);
@@ -278,7 +279,7 @@ struct Cloner::Fun {
wunit.commit();
}
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "cloner insert", to_collection.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "cloner insert", to_collection.ns());
RARELY if (time(0) - saveLast > 60) {
log() << numSeen << " objects cloned so far from collection " << from_collection;
saveLast = time(0);
@@ -299,7 +300,7 @@ struct Cloner::Fun {
}
time_t lastLog;
- OperationContext* txn;
+ OperationContext* opCtx;
const string _dbName;
int64_t numSeen;
@@ -313,7 +314,7 @@ struct Cloner::Fun {
/* copy the specified collection
*/
-void Cloner::copy(OperationContext* txn,
+void Cloner::copy(OperationContext* opCtx,
const string& toDBName,
const NamespaceString& from_collection,
const BSONObj& from_opts,
@@ -324,7 +325,7 @@ void Cloner::copy(OperationContext* txn,
LOG(2) << "\t\tcloning collection " << from_collection << " to " << to_collection << " on "
<< _conn->getServerAddress() << " with filter " << redact(query.toString());
- Fun f(txn, toDBName);
+ Fun f(opCtx, toDBName);
f.numSeen = 0;
f.from_collection = from_collection;
f.from_options = from_opts;
@@ -335,7 +336,7 @@ void Cloner::copy(OperationContext* txn,
int options = QueryOption_NoCursorTimeout | (opts.slaveOk ? QueryOption_SlaveOk : 0);
{
- Lock::TempRelease tempRelease(txn->lockState());
+ Lock::TempRelease tempRelease(opCtx->lockState());
_conn->query(stdx::function<void(DBClientCursorBatchIterator&)>(f),
from_collection.ns(),
query,
@@ -349,11 +350,11 @@ void Cloner::copy(OperationContext* txn,
<< to_collection.ns()
<< " with filter "
<< query.toString(),
- !txn->writesAreReplicated() ||
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, to_collection));
+ !opCtx->writesAreReplicated() ||
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection));
}
-void Cloner::copyIndexes(OperationContext* txn,
+void Cloner::copyIndexes(OperationContext* opCtx,
const string& toDBName,
const NamespaceString& from_collection,
const BSONObj& from_opts,
@@ -372,8 +373,8 @@ void Cloner::copyIndexes(OperationContext* txn,
<< " to "
<< to_collection.ns()
<< " (Cloner)",
- !txn->writesAreReplicated() ||
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, to_collection));
+ !opCtx->writesAreReplicated() ||
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection));
if (indexesToBuild.empty())
@@ -381,16 +382,16 @@ void Cloner::copyIndexes(OperationContext* txn,
// We are under lock here again, so reload the database in case it may have disappeared
// during the temp release
- Database* db = dbHolder().openDb(txn, toDBName);
+ Database* db = dbHolder().openDb(opCtx, toDBName);
Collection* collection = db->getCollection(to_collection);
if (!collection) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
- WriteUnitOfWork wunit(txn);
+ WriteUnitOfWork wunit(opCtx);
Status s = userCreateNS(
- txn,
+ opCtx,
db,
to_collection.toString(),
from_opts,
@@ -401,7 +402,7 @@ void Cloner::copyIndexes(OperationContext* txn,
invariant(collection);
wunit.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", to_collection.ns());
}
// TODO pass the MultiIndexBlock when inserting into the collection rather than building the
@@ -409,7 +410,7 @@ void Cloner::copyIndexes(OperationContext* txn,
// from creation to completion without yielding to ensure the index and the collection
// matches. It also wouldn't work on non-empty collections so we would need both
// implementations anyway as long as that is supported.
- MultiIndexBlock indexer(txn, collection);
+ MultiIndexBlock indexer(opCtx, collection);
indexer.allowInterruption();
indexer.removeExistingIndexes(&indexesToBuild);
@@ -419,20 +420,20 @@ void Cloner::copyIndexes(OperationContext* txn,
auto indexInfoObjs = uassertStatusOK(indexer.init(indexesToBuild));
uassertStatusOK(indexer.insertAllDocumentsInCollection());
- WriteUnitOfWork wunit(txn);
+ WriteUnitOfWork wunit(opCtx);
indexer.commit();
- if (txn->writesAreReplicated()) {
+ if (opCtx->writesAreReplicated()) {
const string targetSystemIndexesCollectionName = to_collection.getSystemIndexesCollection();
const char* createIndexNs = targetSystemIndexesCollectionName.c_str();
for (auto&& infoObj : indexInfoObjs) {
getGlobalServiceContext()->getOpObserver()->onCreateIndex(
- txn, createIndexNs, infoObj, false);
+ opCtx, createIndexNs, infoObj, false);
}
}
wunit.commit();
}
-bool Cloner::copyCollection(OperationContext* txn,
+bool Cloner::copyCollection(OperationContext* opCtx,
const string& ns,
const BSONObj& query,
string& errmsg,
@@ -474,22 +475,22 @@ bool Cloner::copyCollection(OperationContext* txn,
auto sourceIndexes = _conn->getIndexSpecs(nss.ns(), QueryOption_SlaveOk);
auto idIndexSpec = getIdIndexSpec(sourceIndexes);
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWrite(txn->lockState(), dbname, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock dbWrite(opCtx->lockState(), dbname, MODE_X);
uassert(ErrorCodes::PrimarySteppedDown,
str::stream() << "Not primary while copying collection " << ns << " (Cloner)",
- !txn->writesAreReplicated() ||
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, nss));
+ !opCtx->writesAreReplicated() ||
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss));
- Database* db = dbHolder().openDb(txn, dbname);
+ Database* db = dbHolder().openDb(opCtx, dbname);
if (shouldCreateCollection) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
- WriteUnitOfWork wunit(txn);
- Status status = userCreateNS(txn, db, ns, options, true, idIndexSpec);
+ WriteUnitOfWork wunit(opCtx);
+ Status status = userCreateNS(opCtx, db, ns, options, true, idIndexSpec);
if (!status.isOK()) {
errmsg = status.toString();
// abort write unit of work
@@ -497,7 +498,7 @@ bool Cloner::copyCollection(OperationContext* txn,
}
wunit.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", ns);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", ns);
} else {
LOG(1) << "No collection info found for ns:" << nss.toString()
<< ", host:" << _conn->getServerAddress();
@@ -506,7 +507,7 @@ bool Cloner::copyCollection(OperationContext* txn,
// main data
CloneOptions opts;
opts.slaveOk = true;
- copy(txn, dbname, nss, options, idIndexSpec, nss, opts, Query(query).snapshot());
+ copy(opCtx, dbname, nss, options, idIndexSpec, nss, opts, Query(query).snapshot());
/* TODO : copyIndexes bool does not seem to be implemented! */
if (!shouldCopyIndexes) {
@@ -514,7 +515,7 @@ bool Cloner::copyCollection(OperationContext* txn,
}
// indexes
- copyIndexes(txn, dbname, NamespaceString(ns), options, sourceIndexes, NamespaceString(ns));
+ copyIndexes(opCtx, dbname, NamespaceString(ns), options, sourceIndexes, NamespaceString(ns));
return true;
}
@@ -564,21 +565,21 @@ StatusWith<std::vector<BSONObj>> Cloner::filterCollectionsForClone(
}
Status Cloner::createCollectionsForDb(
- OperationContext* txn,
+ OperationContext* opCtx,
const std::vector<CreateCollectionParams>& createCollectionParams,
const std::string& dbName) {
- Database* db = dbHolder().openDb(txn, dbName);
+ Database* db = dbHolder().openDb(opCtx, dbName);
for (auto&& params : createCollectionParams) {
auto options = params.collectionInfo["options"].Obj();
const NamespaceString nss(dbName, params.collectionName);
uassertStatusOK(userAllowedCreateNS(dbName, params.collectionName));
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- txn->checkForInterrupt();
- WriteUnitOfWork wunit(txn);
+ opCtx->checkForInterrupt();
+ WriteUnitOfWork wunit(opCtx);
Status createStatus =
- userCreateNS(txn,
+ userCreateNS(opCtx,
db,
nss.ns(),
options,
@@ -590,12 +591,12 @@ Status Cloner::createCollectionsForDb(
wunit.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nss.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", nss.ns());
}
return Status::OK();
}
-Status Cloner::copyDb(OperationContext* txn,
+Status Cloner::copyDb(OperationContext* opCtx,
const std::string& toDBName,
const string& masterHost,
const CloneOptions& opts,
@@ -603,7 +604,7 @@ Status Cloner::copyDb(OperationContext* txn,
std::vector<BSONObj> collectionsToClone) {
massert(10289,
"useReplAuth is not written to replication log",
- !opts.useReplAuth || !txn->writesAreReplicated());
+ !opts.useReplAuth || !opCtx->writesAreReplicated());
auto statusWithMasterHost = ConnectionString::parse(masterHost);
if (!statusWithMasterHost.isOK()) {
@@ -616,7 +617,7 @@ Status Cloner::copyDb(OperationContext* txn,
std::vector<HostAndPort> csServers = cs.getServers();
for (std::vector<HostAndPort>::const_iterator iter = csServers.begin(); iter != csServers.end();
++iter) {
- if (!repl::isSelf(*iter, txn->getServiceContext()))
+ if (!repl::isSelf(*iter, opCtx->getServiceContext()))
continue;
masterSameProcess = true;
@@ -648,7 +649,7 @@ Status Cloner::copyDb(OperationContext* txn,
_conn = std::move(con);
} else {
- _conn.reset(new DBDirectClient(txn));
+ _conn.reset(new DBDirectClient(opCtx));
}
}
@@ -661,7 +662,7 @@ Status Cloner::copyDb(OperationContext* txn,
if (opts.createCollections) {
// getCollectionInfos may make a remote call, which may block indefinitely, so release
// the global lock that we are entering with.
- Lock::TempRelease tempRelease(txn->lockState());
+ Lock::TempRelease tempRelease(opCtx->lockState());
std::list<BSONObj> initialCollections = _conn->getCollectionInfos(
opts.fromDB, ListCollectionsFilter::makeTypeCollectionFilter());
auto status = filterCollectionsForClone(opts, initialCollections);
@@ -687,7 +688,7 @@ Status Cloner::copyDb(OperationContext* txn,
// Get index specs for each collection.
std::map<StringData, std::list<BSONObj>> collectionIndexSpecs;
{
- Lock::TempRelease tempRelease(txn->lockState());
+ Lock::TempRelease tempRelease(opCtx->lockState());
for (auto&& params : createCollectionParams) {
const NamespaceString nss(opts.fromDB, params.collectionName);
auto indexSpecs =
@@ -701,15 +702,16 @@ Status Cloner::copyDb(OperationContext* txn,
}
}
- uassert(ErrorCodes::NotMaster,
- str::stream() << "Not primary while cloning database " << opts.fromDB
- << " (after getting list of collections to clone)",
- !txn->writesAreReplicated() ||
- repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(txn, toDBName));
+ uassert(
+ ErrorCodes::NotMaster,
+ str::stream() << "Not primary while cloning database " << opts.fromDB
+ << " (after getting list of collections to clone)",
+ !opCtx->writesAreReplicated() ||
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(opCtx, toDBName));
if (opts.syncData) {
if (opts.createCollections) {
- Status status = createCollectionsForDb(txn, createCollectionParams, toDBName);
+ Status status = createCollectionsForDb(opCtx, createCollectionParams, toDBName);
if (!status.isOK()) {
return status;
}
@@ -729,7 +731,7 @@ Status Cloner::copyDb(OperationContext* txn,
if (opts.snapshot)
q.snapshot();
- copy(txn,
+ copy(opCtx,
toDBName,
from_name,
params.collectionInfo["options"].Obj(),
@@ -749,7 +751,7 @@ Status Cloner::copyDb(OperationContext* txn,
const NamespaceString to_name(toDBName, params.collectionName);
- copyIndexes(txn,
+ copyIndexes(opCtx,
toDBName,
from_name,
params.collectionInfo["options"].Obj(),