diff options
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 278 |
1 files changed, 144 insertions, 134 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 781c0d1d5af..25e9590a6d9 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -372,40 +372,40 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) { void State::dropTempCollections() { if (!_config.tempNamespace.isEmpty()) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(_txn, MODE_IX); - AutoGetDb autoDb(_txn, _config.tempNamespace.db(), MODE_X); + ScopedTransaction scopedXact(_opCtx, MODE_IX); + AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X); if (auto db = autoDb.getDb()) { - WriteUnitOfWork wunit(_txn); + WriteUnitOfWork wunit(_opCtx); uassert(ErrorCodes::PrimarySteppedDown, "no longer primary", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor( - _txn, _config.tempNamespace)); - db->dropCollection(_txn, _config.tempNamespace.ns()); + _opCtx, _config.tempNamespace)); + db->dropCollection(_opCtx, _config.tempNamespace.ns()); wunit.commit(); } } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _txn, "M/R dropTempCollections", _config.tempNamespace.ns()) + _opCtx, "M/R dropTempCollections", _config.tempNamespace.ns()) // Always forget about temporary namespaces, so we don't cache lots of them ShardConnection::forgetNS(_config.tempNamespace.ns()); } if (_useIncremental && !_config.incLong.isEmpty()) { // We don't want to log the deletion of incLong as it isn't replicated. While // harmless, this would lead to a scary looking warning on the secondaries. - bool shouldReplicateWrites = _txn->writesAreReplicated(); - _txn->setReplicatedWrites(false); - ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); + bool shouldReplicateWrites = _opCtx->writesAreReplicated(); + _opCtx->setReplicatedWrites(false); + ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _opCtx, shouldReplicateWrites); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(_txn, MODE_IX); - Lock::DBLock lk(_txn->lockState(), _config.incLong.db(), MODE_X); - if (Database* db = dbHolder().get(_txn, _config.incLong.ns())) { - WriteUnitOfWork wunit(_txn); - db->dropCollection(_txn, _config.incLong.ns()); + ScopedTransaction scopedXact(_opCtx, MODE_IX); + Lock::DBLock lk(_opCtx->lockState(), _config.incLong.db(), MODE_X); + if (Database* db = dbHolder().get(_opCtx, _config.incLong.ns())) { + WriteUnitOfWork wunit(_opCtx); + db->dropCollection(_opCtx, _config.incLong.ns()); wunit.commit(); } } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R dropTempCollections", _config.incLong.ns()) + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R dropTempCollections", _config.incLong.ns()) ShardConnection::forgetNS(_config.incLong.ns()); } @@ -422,20 +422,20 @@ void State::prepTempCollection() { if (_useIncremental) { // Create the inc collection and make sure we have index on "0" key. // Intentionally not replicating the inc collection to secondaries. - bool shouldReplicateWrites = _txn->writesAreReplicated(); - _txn->setReplicatedWrites(false); - ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); + bool shouldReplicateWrites = _opCtx->writesAreReplicated(); + _opCtx->setReplicatedWrites(false); + ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _opCtx, shouldReplicateWrites); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - OldClientWriteContext incCtx(_txn, _config.incLong.ns()); - WriteUnitOfWork wuow(_txn); + OldClientWriteContext incCtx(_opCtx, _config.incLong.ns()); + WriteUnitOfWork wuow(_opCtx); Collection* incColl = incCtx.getCollection(); invariant(!incColl); CollectionOptions options; options.setNoIdIndex(); options.temp = true; - incColl = incCtx.db()->createCollection(_txn, _config.incLong.ns(), options); + incColl = incCtx.db()->createCollection(_opCtx, _config.incLong.ns(), options); invariant(incColl); // We explicitly create a v=2 index on the "0" field so that it is always possible for a @@ -448,7 +448,7 @@ void State::prepTempCollection() { << "v" << static_cast<int>(IndexVersion::kV2)); Status status = incColl->getIndexCatalog() - ->createIndexOnEmptyCollection(_txn, indexSpec) + ->createIndexOnEmptyCollection(_opCtx, indexSpec) .getStatus(); if (!status.isOK()) { uasserted(17305, @@ -459,7 +459,7 @@ void State::prepTempCollection() { } wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.incLong.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R prepTempCollection", _config.incLong.ns()); } CollectionOptions finalOptions; @@ -467,13 +467,13 @@ void State::prepTempCollection() { { // copy indexes and collection options into temporary storage - OldClientWriteContext finalCtx(_txn, _config.outputOptions.finalNamespace.ns()); + OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns()); Collection* const finalColl = finalCtx.getCollection(); if (finalColl) { - finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_txn); + finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx); IndexCatalog::IndexIterator ii = - finalColl->getIndexCatalog()->getIndexIterator(_txn, true); + finalColl->getIndexCatalog()->getIndexIterator(_opCtx, true); // Iterate over finalColl's indexes. while (ii.more()) { IndexDescriptor* currIndex = ii.next(); @@ -495,23 +495,23 @@ void State::prepTempCollection() { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // create temp collection and insert the indexes from temporary storage - OldClientWriteContext tempCtx(_txn, _config.tempNamespace.ns()); - WriteUnitOfWork wuow(_txn); + OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns()); + WriteUnitOfWork wuow(_opCtx); uassert(ErrorCodes::PrimarySteppedDown, "no longer primary", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_txn, + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, _config.tempNamespace)); Collection* tempColl = tempCtx.getCollection(); invariant(!tempColl); CollectionOptions options = finalOptions; options.temp = true; - tempColl = tempCtx.db()->createCollection(_txn, _config.tempNamespace.ns(), options); + tempColl = tempCtx.db()->createCollection(_opCtx, _config.tempNamespace.ns(), options); for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end(); ++it) { Status status = - tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, *it).getStatus(); + tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus(); if (!status.isOK()) { if (status.code() == ErrorCodes::IndexAlreadyExists) { continue; @@ -520,11 +520,12 @@ void State::prepTempCollection() { } // Log the createIndex operation. string logNs = _config.tempNamespace.db() + ".system.indexes"; - getGlobalServiceContext()->getOpObserver()->onCreateIndex(_txn, logNs, *it, false); + getGlobalServiceContext()->getOpObserver()->onCreateIndex(_opCtx, logNs, *it, false); } wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.tempNamespace.ns()) + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _opCtx, "M/R prepTempCollection", _config.tempNamespace.ns()) } /** @@ -605,7 +606,7 @@ void State::appendResults(BSONObjBuilder& final) { * Does post processing on output collection. * This may involve replacing, merging or reducing. */ -long long State::postProcessCollection(OperationContext* txn, +long long State::postProcessCollection(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) { if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY) @@ -613,22 +614,22 @@ long long State::postProcessCollection(OperationContext* txn, bool holdingGlobalLock = false; if (_config.outputOptions.outNonAtomic) - return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock); + return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); - ScopedTransaction transaction(txn, MODE_X); + ScopedTransaction transaction(opCtx, MODE_X); // This must be global because we may write across different databases. - Lock::GlobalWrite lock(txn->lockState()); + Lock::GlobalWrite lock(opCtx->lockState()); holdingGlobalLock = true; - return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock); + return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock); } namespace { // Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock, // then this function does not acquire any additional locks. -unsigned long long _collectionCount(OperationContext* txn, +unsigned long long _collectionCount(OperationContext* opCtx, const NamespaceString& nss, bool callerHoldsGlobalLock) { Collection* coll = nullptr; @@ -637,32 +638,32 @@ unsigned long long _collectionCount(OperationContext* txn, // If the global write lock is held, we must avoid using AutoGetCollectionForRead as it may lead // to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596. if (callerHoldsGlobalLock) { - Database* db = dbHolder().get(txn, nss.ns()); + Database* db = dbHolder().get(opCtx, nss.ns()); if (db) { coll = db->getCollection(nss); } } else { - ctx.emplace(txn, nss); + ctx.emplace(opCtx, nss); coll = ctx->getCollection(); } - return coll ? coll->numRecords(txn) : 0; + return coll ? coll->numRecords(opCtx) : 0; } } // namespace -long long State::postProcessCollectionNonAtomic(OperationContext* txn, +long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm, bool callerHoldsGlobalLock) { if (_config.outputOptions.finalNamespace == _config.tempNamespace) - return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); + return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); if (_config.outputOptions.outType == Config::REPLACE || - _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) { - ScopedTransaction transaction(txn, MODE_X); + _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) { + ScopedTransaction transaction(opCtx, MODE_X); // This must be global because we may write across different databases. - Lock::GlobalWrite lock(txn->lockState()); + Lock::GlobalWrite lock(opCtx->lockState()); // replace: just rename from temp to final collection name, dropping previous collection _db.dropCollection(_config.outputOptions.finalNamespace.ns()); BSONObj info; @@ -680,17 +681,19 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, } else if (_config.outputOptions.outType == Config::MERGE) { // merge: upsert new docs into old collection { - const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock); - stdx::lock_guard<Client> lk(*txn->getClient()); + const auto count = + _collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setMessage_inlock( "m/r: merge post processing", "M/R Merge Post Processing Progress", count); } unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj()); while (cursor->more()) { - ScopedTransaction scopedXact(txn, MODE_X); - Lock::DBLock lock(txn->lockState(), _config.outputOptions.finalNamespace.db(), MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_X); + Lock::DBLock lock( + opCtx->lockState(), _config.outputOptions.finalNamespace.db(), MODE_X); BSONObj o = cursor->nextSafe(); - Helpers::upsert(txn, _config.outputOptions.finalNamespace.ns(), o); + Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), o); pm.hit(); } _db.dropCollection(_config.tempNamespace.ns()); @@ -700,25 +703,26 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, BSONList values; { - const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock); - stdx::lock_guard<Client> lk(*txn->getClient()); + const auto count = + _collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setMessage_inlock( "m/r: reduce post processing", "M/R Reduce Post Processing Progress", count); } unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace.ns(), BSONObj()); while (cursor->more()) { - ScopedTransaction transaction(txn, MODE_X); + ScopedTransaction transaction(opCtx, MODE_X); // This must be global because we may write across different databases. - Lock::GlobalWrite lock(txn->lockState()); + Lock::GlobalWrite lock(opCtx->lockState()); BSONObj temp = cursor->nextSafe(); BSONObj old; bool found; { - OldClientContext tx(txn, _config.outputOptions.finalNamespace.ns()); + OldClientContext tx(opCtx, _config.outputOptions.finalNamespace.ns()); Collection* coll = getCollectionOrUassert(tx.db(), _config.outputOptions.finalNamespace); - found = Helpers::findOne(txn, coll, temp["_id"].wrap(), old, true); + found = Helpers::findOne(opCtx, coll, temp["_id"].wrap(), old, true); } if (found) { @@ -726,18 +730,18 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, values.clear(); values.push_back(temp); values.push_back(old); - Helpers::upsert(txn, + Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), _config.reducer->finalReduce(values, _config.finalizer.get())); } else { - Helpers::upsert(txn, _config.outputOptions.finalNamespace.ns(), temp); + Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), temp); } pm.hit(); } pm.finished(); } - return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); + return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); } /** @@ -747,11 +751,11 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { verify(_onDisk); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - OldClientWriteContext ctx(_txn, nss.ns()); - WriteUnitOfWork wuow(_txn); + OldClientWriteContext ctx(_opCtx, nss.ns()); + WriteUnitOfWork wuow(_opCtx); uassert(ErrorCodes::PrimarySteppedDown, "no longer primary", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_txn, nss)); + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, nss)); Collection* coll = getCollectionOrUassert(ctx.db(), nss); BSONObjBuilder b; @@ -761,7 +765,7 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { b.appendElements(o); BSONObj bo = b.obj(); - StatusWith<BSONObj> res = fixDocumentForInsert(_txn->getServiceContext(), bo); + StatusWith<BSONObj> res = fixDocumentForInsert(_opCtx->getServiceContext(), bo); uassertStatusOK(res.getStatus()); if (!res.getValue().isEmpty()) { bo = res.getValue(); @@ -769,10 +773,10 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. OpDebug* const nullOpDebug = nullptr; - uassertStatusOK(coll->insertDocument(_txn, bo, nullOpDebug, true)); + uassertStatusOK(coll->insertDocument(_opCtx, bo, nullOpDebug, true)); wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insert", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R insert", nss.ns()); } /** @@ -782,12 +786,12 @@ void State::_insertToInc(BSONObj& o) { verify(_onDisk); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - OldClientWriteContext ctx(_txn, _config.incLong.ns()); - WriteUnitOfWork wuow(_txn); + OldClientWriteContext ctx(_opCtx, _config.incLong.ns()); + WriteUnitOfWork wuow(_opCtx); Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong); - bool shouldReplicateWrites = _txn->writesAreReplicated(); - _txn->setReplicatedWrites(false); - ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); + bool shouldReplicateWrites = _opCtx->writesAreReplicated(); + _opCtx->setReplicatedWrites(false); + ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _opCtx, shouldReplicateWrites); // The documents inserted into the incremental collection are of the form // {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the @@ -804,14 +808,20 @@ void State::_insertToInc(BSONObj& o) { // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. OpDebug* const nullOpDebug = nullptr; - uassertStatusOK(coll->insertDocument(_txn, o, nullOpDebug, true, false)); + uassertStatusOK(coll->insertDocument(_opCtx, o, nullOpDebug, true, false)); wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insertToInc", _config.incLong.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "M/R insertToInc", _config.incLong.ns()); } -State::State(OperationContext* txn, const Config& c) - : _config(c), _db(txn), _useIncremental(true), _txn(txn), _size(0), _dupCount(0), _numEmits(0) { +State::State(OperationContext* opCtx, const Config& c) + : _config(c), + _db(opCtx), + _useIncremental(true), + _opCtx(opCtx), + _size(0), + _dupCount(0), + _numEmits(0) { _temp.reset(new InMemory()); _onDisk = _config.outputOptions.outType != Config::INMEMORY; } @@ -849,9 +859,9 @@ void State::init() { const string userToken = AuthorizationSession::get(Client::getCurrent())->getAuthenticatedUserNamesToken(); _scope.reset(getGlobalScriptEngine()->newScopeForCurrentThread()); - _scope->registerOperation(_txn); + _scope->registerOperation(_opCtx); _scope->setLocalDB(_config.dbname); - _scope->loadStored(_txn, true); + _scope->loadStored(_opCtx, true); if (!_config.scopeSetup.isEmpty()) _scope->init(&_config.scopeSetup); @@ -1027,7 +1037,7 @@ BSONObj _nativeToTemp(const BSONObj& args, void* data) { * After calling this method, the temp collection will be completed. * If inline, the results will be in the in memory map */ -void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder& pm) { +void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) { if (_jsMode) { // apply the reduce within JS if (_onDisk) { @@ -1066,12 +1076,12 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder BSONObj sortKey = BSON("0" << 1); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - OldClientWriteContext incCtx(_txn, _config.incLong.ns()); - WriteUnitOfWork wuow(_txn); + OldClientWriteContext incCtx(_opCtx, _config.incLong.ns()); + WriteUnitOfWork wuow(_opCtx); Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong); bool foundIndex = false; - IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_txn, true); + IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true); // Iterate over incColl's indexes. while (ii.more()) { IndexDescriptor* currIndex = ii.next(); @@ -1085,28 +1095,28 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder verify(foundIndex); wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "finalReduce", _config.incLong.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_opCtx, "finalReduce", _config.incLong.ns()); - unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong)); + unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_opCtx, _config.incLong)); BSONObj prev; BSONList all; { const auto count = _db.count(_config.incLong.ns(), BSONObj(), QueryOption_SlaveOk); - stdx::lock_guard<Client> lk(*_txn->getClient()); + stdx::lock_guard<Client> lk(*_opCtx->getClient()); verify(pm == curOp->setMessage_inlock("m/r: (3/3) final reduce to collection", "M/R: (3/3) Final Reduce Progress", count)); } - const ExtensionsCallbackReal extensionsCallback(_txn, &_config.incLong); + const ExtensionsCallbackReal extensionsCallback(_opCtx, &_config.incLong); auto qr = stdx::make_unique<QueryRequest>(_config.incLong); qr->setSort(sortKey); - auto statusWithCQ = CanonicalQuery::canonicalize(txn, std::move(qr), extensionsCallback); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback); verify(statusWithCQ.isOK()); std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); @@ -1114,7 +1124,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder invariant(coll); auto statusWithPlanExecutor = getExecutor( - _txn, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN); + _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN); verify(statusWithPlanExecutor.isOK()); unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); @@ -1130,7 +1140,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder // object is same as previous, add to array all.push_back(o); if (pm->hits() % 100 == 0) { - _txn->checkForInterrupt(); + _opCtx->checkForInterrupt(); } continue; } @@ -1142,7 +1152,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder // reduce a finalize array finalReduce(all); - ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong)); + ctx.reset(new AutoGetCollectionForRead(_opCtx, _config.incLong)); all.clear(); prev = o; @@ -1152,7 +1162,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder uasserted(34375, "Plan executor killed during mapReduce final reduce"); } - _txn->checkForInterrupt(); + _opCtx->checkForInterrupt(); } uassert(34428, @@ -1162,7 +1172,7 @@ void State::finalReduce(OperationContext* txn, CurOp* curOp, ProgressMeterHolder ctx.reset(); // reduce and finalize last array finalReduce(all); - ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong)); + ctx.reset(new AutoGetCollectionForRead(_opCtx, _config.incLong)); pm.finished(); } @@ -1247,7 +1257,7 @@ int State::_add(InMemory* im, const BSONObj& a) { void State::reduceAndSpillInMemoryStateIfNeeded() { // Make sure no DB locks are held, because this method manages its own locking and // write units of work. - invariant(!_txn->lockState()->isLocked()); + invariant(!_opCtx->lockState()->isLocked()); if (_jsMode) { // try to reduce if it is beneficial @@ -1362,7 +1372,7 @@ public: addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string& dbname, BSONObj& cmd, int, @@ -1372,9 +1382,9 @@ public: boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmd)) - maybeDisableValidation.emplace(txn); + maybeDisableValidation.emplace(opCtx); - auto client = txn->getClient(); + auto client = opCtx->getClient(); if (client->isInDirectClient()) { return appendCommandStatus( @@ -1382,7 +1392,7 @@ public: Status(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()")); } - auto curOp = CurOp::get(txn); + auto curOp = CurOp::get(opCtx); const Config config(dbname, cmd); @@ -1404,7 +1414,7 @@ public: unique_ptr<RangePreserver> rangePreserver; ScopedCollectionMetadata collMetadata; { - AutoGetCollectionForRead ctx(txn, config.nss); + AutoGetCollectionForRead ctx(opCtx, config.nss); Collection* collection = ctx.getCollection(); if (collection) { @@ -1413,19 +1423,19 @@ public: // Get metadata before we check our version, to make sure it doesn't increment // in the meantime. Need to do this in the same lock scope as the block. - if (ShardingState::get(txn)->needCollectionMetadata(txn, config.nss.ns())) { - collMetadata = CollectionShardingState::get(txn, config.nss)->getMetadata(); + if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, config.nss.ns())) { + collMetadata = CollectionShardingState::get(opCtx, config.nss)->getMetadata(); } } // Ensure that the RangePreserver is freed under the lock. This is necessary since the // RangePreserver's destructor unpins a ClientCursor, and access to the CursorManager must // be done under the lock. - ON_BLOCK_EXIT([txn, &config, &rangePreserver] { + ON_BLOCK_EXIT([opCtx, &config, &rangePreserver] { if (rangePreserver) { // Be sure not to use AutoGetCollectionForRead here, since that has side-effects // other than lock acquisition. - AutoGetCollection ctx(txn, config.nss, MODE_IS); + AutoGetCollection ctx(opCtx, config.nss, MODE_IS); rangePreserver.reset(); } }); @@ -1434,7 +1444,7 @@ public: BSONObjBuilder countsBuilder; BSONObjBuilder timingBuilder; - State state(txn, config); + State state(opCtx, config); if (!state.sourceExists()) { return appendCommandStatus( result, @@ -1444,7 +1454,7 @@ public: if (state.isOnDisk()) { // this means that it will be doing a write operation, make sure we are on Master // ideally this check should be in slaveOk(), but at that point config is not known - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor_UNSAFE(txn, + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor_UNSAFE(opCtx, config.nss)) { errmsg = "not master"; return false; @@ -1460,7 +1470,7 @@ public: bool showTotal = true; if (state.config().filter.isEmpty()) { const bool holdingGlobalLock = false; - const auto count = _collectionCount(txn, config.nss, holdingGlobalLock); + const auto count = _collectionCount(opCtx, config.nss, holdingGlobalLock); progressTotal = (config.limit && (unsigned)config.limit < count) ? config.limit : count; } else { @@ -1469,7 +1479,7 @@ public: progressTotal = 1; } - stdx::unique_lock<Client> lk(*txn->getClient()); + stdx::unique_lock<Client> lk(*opCtx->getClient()); ProgressMeter& progress(curOp->setMessage_inlock( "m/r: (1/3) emit phase", "M/R: (1/3) Emit Progress", progressTotal)); lk.unlock(); @@ -1488,18 +1498,18 @@ public: // useful cursor. // Need lock and context to use it - unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_IS)); - unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(txn, config.nss.db(), MODE_S)); + unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(opCtx, MODE_IS)); + unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); auto qr = stdx::make_unique<QueryRequest>(config.nss); qr->setFilter(config.filter); qr->setSort(config.sort); qr->setCollation(config.collation); - const ExtensionsCallbackReal extensionsCallback(txn, &config.nss); + const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss); auto statusWithCQ = - CanonicalQuery::canonicalize(txn, std::move(qr), extensionsCallback); + CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback); if (!statusWithCQ.isOK()) { uasserted(17238, "Can't canonicalize query " + config.filter.toString()); return 0; @@ -1513,7 +1523,7 @@ public: invariant(coll); auto statusWithPlanExecutor = - getExecutor(txn, coll, std::move(cq), PlanExecutor::YIELD_AUTO); + getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO); if (!statusWithPlanExecutor.isOK()) { uasserted(17239, "Can't get executor for query " + config.filter.toString()); @@ -1524,8 +1534,8 @@ public: } { - stdx::lock_guard<Client> lk(*txn->getClient()); - CurOp::get(txn)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); } Timer mt; @@ -1568,8 +1578,8 @@ public: state.reduceAndSpillInMemoryStateIfNeeded(); - scopedXact.reset(new ScopedTransaction(txn, MODE_IS)); - scopedAutoDb.reset(new AutoGetDb(txn, config.nss.db(), MODE_S)); + scopedXact.reset(new ScopedTransaction(opCtx, MODE_IS)); + scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); if (!exec->restoreState()) { return appendCommandStatus( @@ -1581,7 +1591,7 @@ public: reduceTime += t.micros(); - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); } pm.hit(); @@ -1608,7 +1618,7 @@ public: Collection* coll = scopedAutoDb->getDb()->getCollection(config.nss); invariant(coll); // 'exec' hasn't been killed, so collection must be alive. - coll->infoCache()->notifyOfQuery(txn, stats.indexesUsed); + coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed); if (curOp->shouldDBProfile()) { BSONObjBuilder execStatsBob; @@ -1618,7 +1628,7 @@ public: } pm.finished(); - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); // update counters countsBuilder.appendNumber("input", numInputs); @@ -1630,7 +1640,7 @@ public: timingBuilder.append("emitLoop", t.millis()); { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setMessage_inlock("m/r: (2/3) final reduce in memory", "M/R: (2/3) Final In-Memory Reduce Progress"); } @@ -1641,13 +1651,13 @@ public: // if not inline: dump the in memory map to inc collection, all data is on disk state.dumpToInc(); // final reduce - state.finalReduce(txn, curOp, pm); + state.finalReduce(opCtx, curOp, pm); reduceTime += rt.micros(); // Ensure the profile shows the source namespace. If the output was not inline, the // active namespace will be the temporary collection we inserted into. { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setNS_inlock(config.nss.ns()); } @@ -1655,7 +1665,7 @@ public: timingBuilder.appendNumber("reduceTime", reduceTime / 1000); timingBuilder.append("mode", state.jsMode() ? "js" : "mixed"); - long long finalCount = state.postProcessCollection(txn, curOp, pm); + long long finalCount = state.postProcessCollection(opCtx, curOp, pm); state.appendResults(result); timingBuilder.appendNumber("total", t.millis()); @@ -1718,7 +1728,7 @@ public: actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string& dbname, BSONObj& cmdObj, int, @@ -1734,7 +1744,7 @@ public: boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmdObj)) - maybeDisableValidation.emplace(txn); + maybeDisableValidation.emplace(opCtx); ShardedConnectionInfo::addHook(); @@ -1754,10 +1764,10 @@ public: inputNS = NamespaceString(dbname, shardedOutputCollection).ns(); } - CurOp* curOp = CurOp::get(txn); + CurOp* curOp = CurOp::get(opCtx); Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck()); - State state(txn, config); + State state(opCtx, config); state.init(); // no need for incremental collection because records are already sorted @@ -1767,7 +1777,7 @@ public: BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck(); BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck(); - stdx::unique_lock<Client> lk(*txn->getClient()); + stdx::unique_lock<Client> lk(*opCtx->getClient()); ProgressMeterHolder pm(curOp->setMessage_inlock("m/r: merge sort and reduce", "M/R Merge Sort and Reduce Progress")); lk.unlock(); @@ -1781,7 +1791,7 @@ public: std::string server = e.fieldName(); servers.insert(server); - uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, server)); + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, server)); } } @@ -1801,7 +1811,7 @@ public: result.append("result", config.outputOptions.collectionName); } - auto scopedDbStatus = ScopedShardDatabase::getExisting(txn, dbname); + auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); if (!scopedDbStatus.isOK()) { return appendCommandStatus(result, scopedDbStatus.getStatus()); } @@ -1812,11 +1822,11 @@ public: if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) { shared_ptr<ChunkManager> cm = - confOut->getChunkManager(txn, config.outputOptions.finalNamespace.ns()); + confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns()); // Fetch result from other shards 1 chunk at a time. It would be better to do just one // big $or query, but then the sorting would not be efficient. - const string shardName = ShardingState::get(txn)->getShardName(); + const string shardName = ShardingState::get(opCtx)->getShardName(); const ChunkMap& chunkMap = cm->getChunkMap(); for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) { @@ -1846,7 +1856,7 @@ public: BSONObj sortKey = BSON("_id" << 1); ParallelSortClusteredCursor cursor( servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout); - cursor.init(txn); + cursor.init(opCtx); int chunkSize = 0; while (cursor.more() || !values.empty()) { @@ -1890,7 +1900,7 @@ public: result.append("chunkSizes", chunkSizes.arr()); - long long outputCount = state.postProcessCollection(txn, curOp, pm); + long long outputCount = state.postProcessCollection(opCtx, curOp, pm); state.appendResults(result); BSONObjBuilder countsB(32); |