summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/mr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r--src/mongo/db/commands/mr.cpp278
1 files changed, 144 insertions, 134 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 781c0d1d5af..25e9590a6d9 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -372,40 +372,40 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) {
void State::dropTempCollections() {
if (!_config.tempNamespace.isEmpty()) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(_txn, MODE_IX);
- AutoGetDb autoDb(_txn, _config.tempNamespace.db(), MODE_X);
+ ScopedTransaction scopedXact(_opCtx, MODE_IX);
+ AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X);
if (auto db = autoDb.getDb()) {
- WriteUnitOfWork wunit(_txn);
+ WriteUnitOfWork wunit(_opCtx);
uassert(ErrorCodes::PrimarySteppedDown,
"no longer primary",
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(
- _txn, _config.tempNamespace));
- db->dropCollection(_txn, _config.tempNamespace.ns());
+ _opCtx, _config.tempNamespace));
+ db->dropCollection(_opCtx, _config.tempNamespace.ns());
wunit.commit();
}
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _txn, "M/R dropTempCollections", _config.tempNamespace.ns())
+ _opCtx, "M/R dropTempCollections", _config.tempNamespace.ns())
// Always forget about temporary namespaces, so we don't cache lots of them
ShardConnection::forgetNS(_config.tempNamespace.ns());
}
if (_useIncremental && !_config.incLong.isEmpty()) {
// We don't want to log the deletion of incLong as it isn't replicated. While
// harmless, this would lead to a scary looking warning on the secondaries.
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+ bool shouldReplicateWrites = _opCtx->writesAreReplicated();
+ _opCtx->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _opCtx, shouldReplicateWrites);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(_txn, MODE_IX);
- Lock::DBLock lk(_txn->lockState(), _config.incLong.db(), MODE_X);
- if (Database* db = dbHolder().get(_txn, _config.incLong.ns())) {
- WriteUnitOfWork wunit(_txn);
- db->dropCollection(_txn, _config.incLong.ns());
+ ScopedTransaction scopedXact(_opCtx, MODE_IX);
+ Lock::DBLock lk(_opCtx->lockState(), _config.incLong.db(), MODE_X);
+ if (Database* db = dbHolder().get(_opCtx, _config.incLong.ns())) {
+ WriteUnitOfWork wunit(_opCtx);
+ db->dropCollection(_opCtx, _config.incLong.ns());
wunit.commit();
}
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R dropTempCollections", _config.incLong.ns())
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R dropTempCollections", _config.incLong.ns())
ShardConnection::forgetNS(_config.incLong.ns());
}
@@ -422,20 +422,20 @@ void State::prepTempCollection() {
if (_useIncremental) {
// Create the inc collection and make sure we have index on "0" key.
// Intentionally not replicating the inc collection to secondaries.
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+ bool shouldReplicateWrites = _opCtx->writesAreReplicated();
+ _opCtx->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _opCtx, shouldReplicateWrites);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- OldClientWriteContext incCtx(_txn, _config.incLong.ns());
- WriteUnitOfWork wuow(_txn);
+ OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
+ WriteUnitOfWork wuow(_opCtx);
Collection* incColl = incCtx.getCollection();
invariant(!incColl);
CollectionOptions options;
options.setNoIdIndex();
options.temp = true;
- incColl = incCtx.db()->createCollection(_txn, _config.incLong.ns(), options);
+ incColl = incCtx.db()->createCollection(_opCtx, _config.incLong.ns(), options);
invariant(incColl);
// We explicitly create a v=2 index on the "0" field so that it is always possible for a
@@ -448,7 +448,7 @@ void State::prepTempCollection() {
<< "v"
<< static_cast<int>(IndexVersion::kV2));
Status status = incColl->getIndexCatalog()
- ->createIndexOnEmptyCollection(_txn, indexSpec)
+ ->createIndexOnEmptyCollection(_opCtx, indexSpec)
.getStatus();
if (!status.isOK()) {
uasserted(17305,
@@ -459,7 +459,7 @@ void State::prepTempCollection() {
}
wuow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.incLong.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R prepTempCollection", _config.incLong.ns());
}
CollectionOptions finalOptions;
@@ -467,13 +467,13 @@ void State::prepTempCollection() {
{
// copy indexes and collection options into temporary storage
- OldClientWriteContext finalCtx(_txn, _config.outputOptions.finalNamespace.ns());
+ OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns());
Collection* const finalColl = finalCtx.getCollection();
if (finalColl) {
- finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_txn);
+ finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx);
IndexCatalog::IndexIterator ii =
- finalColl->getIndexCatalog()->getIndexIterator(_txn, true);
+ finalColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
// Iterate over finalColl's indexes.
while (ii.more()) {
IndexDescriptor* currIndex = ii.next();
@@ -495,23 +495,23 @@ void State::prepTempCollection() {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
// create temp collection and insert the indexes from temporary storage
- OldClientWriteContext tempCtx(_txn, _config.tempNamespace.ns());
- WriteUnitOfWork wuow(_txn);
+ OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns());
+ WriteUnitOfWork wuow(_opCtx);
uassert(ErrorCodes::PrimarySteppedDown,
"no longer primary",
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_txn,
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx,
_config.tempNamespace));
Collection* tempColl = tempCtx.getCollection();
invariant(!tempColl);
CollectionOptions options = finalOptions;
options.temp = true;
- tempColl = tempCtx.db()->createCollection(_txn, _config.tempNamespace.ns(), options);
+ tempColl = tempCtx.db()->createCollection(_opCtx, _config.tempNamespace.ns(), options);
for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end();
++it) {
Status status =
- tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, *it).getStatus();
+ tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus();
if (!status.isOK()) {
if (status.code() == ErrorCodes::IndexAlreadyExists) {
continue;
@@ -520,11 +520,12 @@ void State::prepTempCollection() {
}
// Log the createIndex operation.
string logNs = _config.tempNamespace.db() + ".system.indexes";
- getGlobalServiceContext()->getOpObserver()->onCreateIndex(_txn, logNs, *it, false);
+ getGlobalServiceContext()->getOpObserver()->onCreateIndex(_opCtx, logNs, *it, false);
}
wuow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.tempNamespace.ns())
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _opCtx, "M/R prepTempCollection", _config.tempNamespace.ns())
}
/**
@@ -605,7 +606,7 @@ void State::appendResults(BSONObjBuilder& final) {
* Does post processing on output collection.
* This may involve replacing, merging or reducing.
*/
-long long State::postProcessCollection(OperationContext* txn,
+long long State::postProcessCollection(OperationContext* opCtx,
CurOp* curOp,
ProgressMeterHolder& pm) {
if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY)
@@ -613,22 +614,22 @@ long long State::postProcessCollection(OperationContext* txn,
bool holdingGlobalLock = false;
if (_config.outputOptions.outNonAtomic)
- return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock);
+ return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock);
- invariant(!txn->lockState()->isLocked());
+ invariant(!opCtx->lockState()->isLocked());
- ScopedTransaction transaction(txn, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_X);
// This must be global because we may write across different databases.
- Lock::GlobalWrite lock(txn->lockState());
+ Lock::GlobalWrite lock(opCtx->lockState());
holdingGlobalLock = true;
- return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock);
+ return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock);
}
namespace {
// Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock,
// then this function does not acquire any additional locks.
-unsigned long long _collectionCount(OperationContext* txn,
+unsigned long long _collectionCount(OperationContext* opCtx,
const NamespaceString& nss,
bool callerHoldsGlobalLock) {
Collection* coll = nullptr;
@@ -637,32 +638,32 @@ unsigned long long _collectionCount(OperationContext* txn,
// If the global write lock is held, we must avoid using AutoGetCollectionForRead as it may lead
// to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596.
if (callerHoldsGlobalLock) {
- Database* db = dbHolder().get(txn, nss.ns());
+ Database* db = dbHolder().get(opCtx, nss.ns());
if (db) {
coll = db->getCollection(nss);
}
} else {
- ctx.emplace(txn, nss);
+ ctx.emplace(opCtx, nss);
coll = ctx->getCollection();
}
- return coll ? coll->numRecords(txn) : 0;
+ return coll ? coll->numRecords(opCtx) : 0;
}
} // namespace
-long long State::postProcessCollectionNonAtomic(OperationContext* txn,
+long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
CurOp* curOp,
ProgressMeterHolder& pm,
bool callerHoldsGlobalLock) {
if (_config.outputOptions.finalNamespace == _config.tempNamespace)
- return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
+ return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
if (_config.outputOptions.outType == Config::REPLACE ||
- _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) {
- ScopedTransaction transaction(txn, MODE_X);
+ _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) {
+ ScopedTransaction transaction(opCtx, MODE_X);
// This must be global because we may write across different databases.
- Lock::GlobalWrite lock(txn->lockState());
+ Lock::GlobalWrite lock(opCtx->lockState());
// replace: just rename from temp to final collection name, dropping previous collection
_db.dropCollection(_config.outputOptions.finalNamespace.ns());
BSONObj info;
@@ -680,17 +681,19 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
} else if (_config.outputOptions.outType == Config::MERGE) {
// merge: upsert new docs into old collection
{
- const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock);
- stdx::lock_guard<Client> lk(*txn->getClient());
+ const auto count =
+ _collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock);
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setMessage_inlock(
"m/r: merge post processing", "M/R Merge Post Processing Progress", count);
}
unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj());
while (cursor->more()) {
- ScopedTransaction scopedXact(txn, MODE_X);
- Lock::DBLock lock(txn->lockState(), _config.outputOptions.finalNamespace.db(), MODE_X);
+ ScopedTransaction scopedXact(opCtx, MODE_X);
+ Lock::DBLock lock(
+ opCtx->lockState(), _config.outputOptions.finalNamespace.db(), MODE_X);
BSONObj o = cursor->nextSafe();
- Helpers::upsert(txn, _config.outputOptions.finalNamespace.ns(), o);
+ Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), o);
pm.hit();
}
_db.dropCollection(_config.tempNamespace.ns());
@@ -700,25 +703,26 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
BSONList values;
{
- const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock);
- stdx::lock_guard<Client> lk(*txn->getClient());
+ const auto count =
+ _collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock);
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setMessage_inlock(
"m/r: reduce post processing", "M/R Reduce Post Processing Progress", count);
}
unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj());
while (cursor->more()) {
- ScopedTransaction transaction(txn, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_X);
// This must be global because we may write across different databases.
- Lock::GlobalWrite lock(txn->lockState());
+ Lock::GlobalWrite lock(opCtx->lockState());
BSONObj temp = cursor->nextSafe();
BSONObj old;
bool found;
{
- OldClientContext tx(txn, _config.outputOptions.finalNamespace.ns());
+ OldClientContext tx(opCtx, _config.outputOptions.finalNamespace.ns());
Collection* coll =
getCollectionOrUassert(tx.db(), _config.outputOptions.finalNamespace);
- found = Helpers::findOne(txn, coll, temp["_id"].wrap(), old, true);
+ found = Helpers::findOne(opCtx, coll, temp["_id"].wrap(), old, true);
}
if (found) {
@@ -726,18 +730,18 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
values.clear();
values.push_back(temp);
values.push_back(old);
- Helpers::upsert(txn,
+ Helpers::upsert(opCtx,
_config.outputOptions.finalNamespace.ns(),
_config.reducer->finalReduce(values, _config.finalizer.get()));
} else {
- Helpers::upsert(txn, _config.outputOptions.finalNamespace.ns(), temp);
+ Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), temp);
}
pm.hit();
}
pm.finished();
}
- return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
+ return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
}
/**
@@ -747,11 +751,11 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) {
verify(_onDisk);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- OldClientWriteContext ctx(_txn, nss.ns());
- WriteUnitOfWork wuow(_txn);
+ OldClientWriteContext ctx(_opCtx, nss.ns());
+ WriteUnitOfWork wuow(_opCtx);
uassert(ErrorCodes::PrimarySteppedDown,
"no longer primary",
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_txn, nss));
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, nss));
Collection* coll = getCollectionOrUassert(ctx.db(), nss);
BSONObjBuilder b;
@@ -761,7 +765,7 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) {
b.appendElements(o);
BSONObj bo = b.obj();
- StatusWith<BSONObj> res = fixDocumentForInsert(_txn->getServiceContext(), bo);
+ StatusWith<BSONObj> res = fixDocumentForInsert(_opCtx->getServiceContext(), bo);
uassertStatusOK(res.getStatus());
if (!res.getValue().isEmpty()) {
bo = res.getValue();
@@ -769,10 +773,10 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) {
// TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
OpDebug* const nullOpDebug = nullptr;
- uassertStatusOK(coll->insertDocument(_txn, bo, nullOpDebug, true));
+ uassertStatusOK(coll->insertDocument(_opCtx, bo, nullOpDebug, true));
wuow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insert", nss.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R insert", nss.ns());
}
/**
@@ -782,12 +786,12 @@ void State::_insertToInc(BSONObj& o) {
verify(_onDisk);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- OldClientWriteContext ctx(_txn, _config.incLong.ns());
- WriteUnitOfWork wuow(_txn);
+ OldClientWriteContext ctx(_opCtx, _config.incLong.ns());
+ WriteUnitOfWork wuow(_opCtx);
Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+ bool shouldReplicateWrites = _opCtx->writesAreReplicated();
+ _opCtx->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _opCtx, shouldReplicateWrites);
// The documents inserted into the incremental collection are of the form
// {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the
@@ -804,14 +808,20 @@ void State::_insertToInc(BSONObj& o) {
// TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
OpDebug* const nullOpDebug = nullptr;
- uassertStatusOK(coll->insertDocument(_txn, o, nullOpDebug, true, false));
+ uassertStatusOK(coll->insertDocument(_opCtx, o, nullOpDebug, true, false));
wuow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insertToInc", _config.incLong.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R insertToInc", _config.incLong.ns());
}
-State::State(OperationContext* txn, const Config& c)
- : _config(c), _db(txn), _useIncremental(true), _txn(txn), _size(0), _dupCount(0), _numEmits(0) {
+State::State(OperationContext* opCtx, const Config& c)
+ : _config(c),
+ _db(opCtx),
+ _useIncremental(true),
+ _opCtx(opCtx),
+ _size(0),
+ _dupCount(0),
+ _numEmits(0) {
_temp.reset(new InMemory());
_onDisk = _config.outputOptions.outType != Config::INMEMORY;
}
@@ -849,9 +859,9 @@ void State::init() {
const string userToken =
AuthorizationSession::get(Client::getCurrent())->getAuthenticatedUserNamesToken();
_scope.reset(getGlobalScriptEngine()->newScopeForCurrentThread());
- _scope->registerOperation(_txn);
+ _scope->registerOperation(_opCtx);
_scope->setLocalDB(_config.dbname);
- _scope->loadStored(_txn, true);
+ _scope->loadStored(_opCtx, true);
if (!_config.scopeSetup.isEmpty())
_scope->init(&_config.scopeSetup);
@@ -1027,7 +1037,7 @@ BSONObj _nativeToTemp(const BSONObj& args, void* data) {
* After calling this method, the temp collection will be completed.
* If inline, the results will be in the in memory map
*/
-void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder& pm) {
+void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) {
if (_jsMode) {
// apply the reduce within JS
if (_onDisk) {
@@ -1066,12 +1076,12 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
BSONObj sortKey = BSON("0" << 1);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- OldClientWriteContext incCtx(_txn, _config.incLong.ns());
- WriteUnitOfWork wuow(_txn);
+ OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
+ WriteUnitOfWork wuow(_opCtx);
Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong);
bool foundIndex = false;
- IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_txn, true);
+ IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
// Iterate over incColl's indexes.
while (ii.more()) {
IndexDescriptor* currIndex = ii.next();
@@ -1085,28 +1095,28 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
verify(foundIndex);
wuow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "finalReduce", _config.incLong.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "finalReduce", _config.incLong.ns());
- unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong));
+ unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_opCtx, _config.incLong));
BSONObj prev;
BSONList all;
{
const auto count = _db.count(_config.incLong.ns(), BSONObj(), QueryOption_SlaveOk);
- stdx::lock_guard<Client> lk(*_txn->getClient());
+ stdx::lock_guard<Client> lk(*_opCtx->getClient());
verify(pm ==
curOp->setMessage_inlock("m/r: (3/3) final reduce to collection",
"M/R: (3/3) Final Reduce Progress",
count));
}
- const ExtensionsCallbackReal extensionsCallback(_txn, &_config.incLong);
+ const ExtensionsCallbackReal extensionsCallback(_opCtx, &_config.incLong);
auto qr = stdx::make_unique<QueryRequest>(_config.incLong);
qr->setSort(sortKey);
- auto statusWithCQ = CanonicalQuery::canonicalize(txn, std::move(qr), extensionsCallback);
+ auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback);
verify(statusWithCQ.isOK());
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
@@ -1114,7 +1124,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
invariant(coll);
auto statusWithPlanExecutor = getExecutor(
- _txn, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN);
+ _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN);
verify(statusWithPlanExecutor.isOK());
unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue());
@@ -1130,7 +1140,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
// object is same as previous, add to array
all.push_back(o);
if (pm->hits() % 100 == 0) {
- _txn->checkForInterrupt();
+ _opCtx->checkForInterrupt();
}
continue;
}
@@ -1142,7 +1152,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
// reduce a finalize array
finalReduce(all);
- ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong));
+ ctx.reset(new AutoGetCollectionForRead(_opCtx, _config.incLong));
all.clear();
prev = o;
@@ -1152,7 +1162,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
uasserted(34375, "Plan executor killed during mapReduce final reduce");
}
- _txn->checkForInterrupt();
+ _opCtx->checkForInterrupt();
}
uassert(34428,
@@ -1162,7 +1172,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder
ctx.reset();
// reduce and finalize last array
finalReduce(all);
- ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong));
+ ctx.reset(new AutoGetCollectionForRead(_opCtx, _config.incLong));
pm.finished();
}
@@ -1247,7 +1257,7 @@ int State::_add(InMemory* im, const BSONObj& a) {
void State::reduceAndSpillInMemoryStateIfNeeded() {
// Make sure no DB locks are held, because this method manages its own locking and
// write units of work.
- invariant(!_txn->lockState()->isLocked());
+ invariant(!_opCtx->lockState()->isLocked());
if (_jsMode) {
// try to reduce if it is beneficial
@@ -1362,7 +1372,7 @@ public:
addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
}
- bool run(OperationContext* txn,
+ bool run(OperationContext* opCtx,
const string& dbname,
BSONObj& cmd,
int,
@@ -1372,9 +1382,9 @@ public:
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmd))
- maybeDisableValidation.emplace(txn);
+ maybeDisableValidation.emplace(opCtx);
- auto client = txn->getClient();
+ auto client = opCtx->getClient();
if (client->isInDirectClient()) {
return appendCommandStatus(
@@ -1382,7 +1392,7 @@ public:
Status(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()"));
}
- auto curOp = CurOp::get(txn);
+ auto curOp = CurOp::get(opCtx);
const Config config(dbname, cmd);
@@ -1404,7 +1414,7 @@ public:
unique_ptr<RangePreserver> rangePreserver;
ScopedCollectionMetadata collMetadata;
{
- AutoGetCollectionForRead ctx(txn, config.nss);
+ AutoGetCollectionForRead ctx(opCtx, config.nss);
Collection* collection = ctx.getCollection();
if (collection) {
@@ -1413,19 +1423,19 @@ public:
// Get metadata before we check our version, to make sure it doesn't increment
// in the meantime. Need to do this in the same lock scope as the block.
- if (ShardingState::get(txn)->needCollectionMetadata(txn, config.nss.ns())) {
- collMetadata = CollectionShardingState::get(txn, config.nss)->getMetadata();
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, config.nss.ns())) {
+ collMetadata = CollectionShardingState::get(opCtx, config.nss)->getMetadata();
}
}
// Ensure that the RangePreserver is freed under the lock. This is necessary since the
// RangePreserver's destructor unpins a ClientCursor, and access to the CursorManager must
// be done under the lock.
- ON_BLOCK_EXIT([txn, &config, &rangePreserver] {
+ ON_BLOCK_EXIT([opCtx, &config, &rangePreserver] {
if (rangePreserver) {
// Be sure not to use AutoGetCollectionForRead here, since that has side-effects
// other than lock acquisition.
- AutoGetCollection ctx(txn, config.nss, MODE_IS);
+ AutoGetCollection ctx(opCtx, config.nss, MODE_IS);
rangePreserver.reset();
}
});
@@ -1434,7 +1444,7 @@ public:
BSONObjBuilder countsBuilder;
BSONObjBuilder timingBuilder;
- State state(txn, config);
+ State state(opCtx, config);
if (!state.sourceExists()) {
return appendCommandStatus(
result,
@@ -1444,7 +1454,7 @@ public:
if (state.isOnDisk()) {
// this means that it will be doing a write operation, make sure we are on Master
// ideally this check should be in slaveOk(), but at that point config is not known
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor_UNSAFE(txn,
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor_UNSAFE(opCtx,
config.nss)) {
errmsg = "not master";
return false;
@@ -1460,7 +1470,7 @@ public:
bool showTotal = true;
if (state.config().filter.isEmpty()) {
const bool holdingGlobalLock = false;
- const auto count = _collectionCount(txn, config.nss, holdingGlobalLock);
+ const auto count = _collectionCount(opCtx, config.nss, holdingGlobalLock);
progressTotal =
(config.limit && (unsigned)config.limit < count) ? config.limit : count;
} else {
@@ -1469,7 +1479,7 @@ public:
progressTotal = 1;
}
- stdx::unique_lock<Client> lk(*txn->getClient());
+ stdx::unique_lock<Client> lk(*opCtx->getClient());
ProgressMeter& progress(curOp->setMessage_inlock(
"m/r: (1/3) emit phase", "M/R: (1/3) Emit Progress", progressTotal));
lk.unlock();
@@ -1488,18 +1498,18 @@ public:
// useful cursor.
// Need lock and context to use it
- unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_IS));
- unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(txn, config.nss.db(), MODE_S));
+ unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(opCtx, MODE_IS));
+ unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
auto qr = stdx::make_unique<QueryRequest>(config.nss);
qr->setFilter(config.filter);
qr->setSort(config.sort);
qr->setCollation(config.collation);
- const ExtensionsCallbackReal extensionsCallback(txn, &config.nss);
+ const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss);
auto statusWithCQ =
- CanonicalQuery::canonicalize(txn, std::move(qr), extensionsCallback);
+ CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback);
if (!statusWithCQ.isOK()) {
uasserted(17238, "Can't canonicalize query " + config.filter.toString());
return 0;
@@ -1513,7 +1523,7 @@ public:
invariant(coll);
auto statusWithPlanExecutor =
- getExecutor(txn, coll, std::move(cq), PlanExecutor::YIELD_AUTO);
+ getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO);
if (!statusWithPlanExecutor.isOK()) {
uasserted(17239,
"Can't get executor for query " + config.filter.toString());
@@ -1524,8 +1534,8 @@ public:
}
{
- stdx::lock_guard<Client> lk(*txn->getClient());
- CurOp::get(txn)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
}
Timer mt;
@@ -1568,8 +1578,8 @@ public:
state.reduceAndSpillInMemoryStateIfNeeded();
- scopedXact.reset(new ScopedTransaction(txn, MODE_IS));
- scopedAutoDb.reset(new AutoGetDb(txn, config.nss.db(), MODE_S));
+ scopedXact.reset(new ScopedTransaction(opCtx, MODE_IS));
+ scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
if (!exec->restoreState()) {
return appendCommandStatus(
@@ -1581,7 +1591,7 @@ public:
reduceTime += t.micros();
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
}
pm.hit();
@@ -1608,7 +1618,7 @@ public:
Collection* coll = scopedAutoDb->getDb()->getCollection(config.nss);
invariant(coll); // 'exec' hasn't been killed, so collection must be alive.
- coll->infoCache()->notifyOfQuery(txn, stats.indexesUsed);
+ coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed);
if (curOp->shouldDBProfile()) {
BSONObjBuilder execStatsBob;
@@ -1618,7 +1628,7 @@ public:
}
pm.finished();
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
// update counters
countsBuilder.appendNumber("input", numInputs);
@@ -1630,7 +1640,7 @@ public:
timingBuilder.append("emitLoop", t.millis());
{
- stdx::lock_guard<Client> lk(*txn->getClient());
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setMessage_inlock("m/r: (2/3) final reduce in memory",
"M/R: (2/3) Final In-Memory Reduce Progress");
}
@@ -1641,13 +1651,13 @@ public:
// if not inline: dump the in memory map to inc collection, all data is on disk
state.dumpToInc();
// final reduce
- state.finalReduce(txn, curOp, pm);
+ state.finalReduce(opCtx, curOp, pm);
reduceTime += rt.micros();
// Ensure the profile shows the source namespace. If the output was not inline, the
// active namespace will be the temporary collection we inserted into.
{
- stdx::lock_guard<Client> lk(*txn->getClient());
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setNS_inlock(config.nss.ns());
}
@@ -1655,7 +1665,7 @@ public:
timingBuilder.appendNumber("reduceTime", reduceTime / 1000);
timingBuilder.append("mode", state.jsMode() ? "js" : "mixed");
- long long finalCount = state.postProcessCollection(txn, curOp, pm);
+ long long finalCount = state.postProcessCollection(opCtx, curOp, pm);
state.appendResults(result);
timingBuilder.appendNumber("total", t.millis());
@@ -1718,7 +1728,7 @@ public:
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- bool run(OperationContext* txn,
+ bool run(OperationContext* opCtx,
const string& dbname,
BSONObj& cmdObj,
int,
@@ -1734,7 +1744,7 @@ public:
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmdObj))
- maybeDisableValidation.emplace(txn);
+ maybeDisableValidation.emplace(opCtx);
ShardedConnectionInfo::addHook();
@@ -1754,10 +1764,10 @@ public:
inputNS = NamespaceString(dbname, shardedOutputCollection).ns();
}
- CurOp* curOp = CurOp::get(txn);
+ CurOp* curOp = CurOp::get(opCtx);
Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck());
- State state(txn, config);
+ State state(opCtx, config);
state.init();
// no need for incremental collection because records are already sorted
@@ -1767,7 +1777,7 @@ public:
BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
- stdx::unique_lock<Client> lk(*txn->getClient());
+ stdx::unique_lock<Client> lk(*opCtx->getClient());
ProgressMeterHolder pm(curOp->setMessage_inlock("m/r: merge sort and reduce",
"M/R Merge Sort and Reduce Progress"));
lk.unlock();
@@ -1781,7 +1791,7 @@ public:
std::string server = e.fieldName();
servers.insert(server);
- uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, server));
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, server));
}
}
@@ -1801,7 +1811,7 @@ public:
result.append("result", config.outputOptions.collectionName);
}
- auto scopedDbStatus = ScopedShardDatabase::getExisting(txn, dbname);
+ auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname);
if (!scopedDbStatus.isOK()) {
return appendCommandStatus(result, scopedDbStatus.getStatus());
}
@@ -1812,11 +1822,11 @@ public:
if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) {
shared_ptr<ChunkManager> cm =
- confOut->getChunkManager(txn, config.outputOptions.finalNamespace.ns());
+ confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns());
// Fetch result from other shards 1 chunk at a time. It would be better to do just one
// big $or query, but then the sorting would not be efficient.
- const string shardName = ShardingState::get(txn)->getShardName();
+ const string shardName = ShardingState::get(opCtx)->getShardName();
const ChunkMap& chunkMap = cm->getChunkMap();
for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
@@ -1846,7 +1856,7 @@ public:
BSONObj sortKey = BSON("_id" << 1);
ParallelSortClusteredCursor cursor(
servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
- cursor.init(txn);
+ cursor.init(opCtx);
int chunkSize = 0;
while (cursor.more() || !values.empty()) {
@@ -1890,7 +1900,7 @@ public:
result.append("chunkSizes", chunkSizes.arr());
- long long outputCount = state.postProcessCollection(txn, curOp, pm);
+ long long outputCount = state.postProcessCollection(opCtx, curOp, pm);
state.appendResults(result);
BSONObjBuilder countsB(32);