diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-06-08 10:21:37 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-06-14 17:05:13 -0400 |
commit | f81eb8e9f5166fa6581927945700e5ace4321063 (patch) | |
tree | a5906ddc1f4e60f23a2a2040048309b33d3b8373 /src | |
parent | d7db8c9d6f433a462abef8781fdb507681f6b694 (diff) | |
download | mongo-f81eb8e9f5166fa6581927945700e5ace4321063.tar.gz |
SERVER-35516 Remove usages of OldClient(Write)Context from map/reduce
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 213 |
1 files changed, 97 insertions, 116 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index fbb4c8ef9d2..2cf896c516a 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -156,13 +156,9 @@ BSONObj _bailFromJS(const BSONObj& args, void* data) { return BSONObj(); } -Collection* getCollectionOrUassert(OperationContext* opCtx, - Database* db, - const NamespaceString& nss) { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - Collection* out = db ? db->getCollection(opCtx, nss) : NULL; - uassert(18697, "Collection unexpectedly disappeared: " + nss.ns(), out); - return out; +template <class AutoT> +void assertCollectionNotNull(const NamespaceString& nss, AutoT& autoT) { + uassert(18698, "Collection unexpectedly disappeared: " + nss.ns(), autoT.getCollection()); } } // namespace @@ -490,23 +486,23 @@ void State::prepTempCollection() { return; dropTempCollections(); + if (_useIncremental) { // Create the inc collection and make sure we have index on "0" key. The inc collection is // in the "local" database, so it does not get replicated to secondaries. writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] { - OldClientWriteContext incCtx(_opCtx, _config.incLong.ns()); - WriteUnitOfWork wuow(_opCtx); - Collection* incColl = incCtx.getCollection(); - invariant(!incColl); + AutoGetOrCreateDb autoGetIncCollDb(_opCtx, _config.incLong.db(), MODE_X); + auto const db = autoGetIncCollDb.getDb(); + invariant(!db->getCollection(_opCtx, _config.incLong)); CollectionOptions options; options.setNoIdIndex(); options.temp = true; options.uuid.emplace(UUID::gen()); - incColl = incCtx.db()->createCollection( + WriteUnitOfWork wuow(_opCtx); + auto incColl = db->createCollection( _opCtx, _config.incLong.ns(), options, false /* force no _id index */); - invariant(incColl); auto rawIndexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong.ns() << "name" @@ -514,16 +510,9 @@ void State::prepTempCollection() { auto indexSpec = uassertStatusOK(index_key_validate::validateIndexSpec( _opCtx, rawIndexSpec, _config.incLong, serverGlobalParams.featureCompatibility)); - Status status = incColl->getIndexCatalog() - ->createIndexOnEmptyCollection(_opCtx, indexSpec) - .getStatus(); - if (!status.isOK()) { - uasserted(17305, - str::stream() << "createIndex failed for mr incLong ns: " - << _config.incLong.ns() - << " err: " - << status.code()); - } + uassertStatusOKWithContext( + incColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, indexSpec), + str::stream() << "createIndex failed for mr incLong ns " << _config.incLong.ns()); wuow.commit(); }); } @@ -532,9 +521,10 @@ void State::prepTempCollection() { vector<BSONObj> indexesToInsert; { - // copy indexes and collection options into temporary storage - OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns()); - Collection* const finalColl = finalCtx.getCollection(); + // Copy indexes and collection options into temporary storage + AutoGetCollection autoGetFinalColl(_opCtx, _config.outputOptions.finalNamespace, MODE_IS); + + auto const finalColl = autoGetFinalColl.getCollection(); if (finalColl) { finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx); @@ -560,17 +550,17 @@ void State::prepTempCollection() { } writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.tempNamespace.ns(), [&] { - // create temp collection and insert the indexes from temporary storage - OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns()); - WriteUnitOfWork wuow(_opCtx); + // Create temp collection and insert the indexes from temporary storage + AutoGetOrCreateDb autoGetFinalDb(_opCtx, _config.tempNamespace.db(), MODE_X); + auto const db = autoGetFinalDb.getDb(); + invariant(!db->getCollection(_opCtx, _config.tempNamespace)); + uassert( ErrorCodes::PrimarySteppedDown, str::stream() << "no longer primary while creating temporary collection for mapReduce: " << _config.tempNamespace.ns(), repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, _config.tempNamespace)); - Collection* tempColl = tempCtx.getCollection(); - invariant(!tempColl); CollectionOptions options = finalOptions; options.temp = true; @@ -583,26 +573,24 @@ void State::prepTempCollection() { // Override createCollection's prohibition on creating new replicated collections without an // _id index. - bool buildIdIndex = (options.autoIndexId == CollectionOptions::YES || - options.autoIndexId == CollectionOptions::DEFAULT); - - tempColl = tempCtx.db()->createCollection( - _opCtx, _config.tempNamespace.ns(), options, buildIdIndex); - - for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end(); - ++it) { - Status status = - tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus(); - if (!status.isOK()) { - if (status.code() == ErrorCodes::IndexAlreadyExists) { - continue; - } - uassertStatusOK(status); + const bool buildIdIndex = (options.autoIndexId == CollectionOptions::YES || + options.autoIndexId == CollectionOptions::DEFAULT); + + WriteUnitOfWork wuow(_opCtx); + auto const tempColl = + db->createCollection(_opCtx, _config.tempNamespace.ns(), options, buildIdIndex); + + for (const auto& indexToInsert : indexesToInsert) { + try { + uassertStatusOK(tempColl->getIndexCatalog()->createIndexOnEmptyCollection( + _opCtx, indexToInsert)); + } catch (const ExceptionFor<ErrorCodes::IndexAlreadyExists>&) { + continue; } + // Log the createIndex operation. - auto uuid = tempColl->uuid(); - getGlobalServiceContext()->getOpObserver()->onCreateIndex( - _opCtx, _config.tempNamespace, uuid, *it, false); + _opCtx->getServiceContext()->getOpObserver()->onCreateIndex( + _opCtx, _config.tempNamespace, tempColl->uuid(), indexToInsert, false); } wuow.commit(); }); @@ -763,13 +751,12 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, BSONObj temp = cursor->nextSafe(); BSONObj old; - bool found; - { - OldClientContext tx(opCtx, _config.outputOptions.finalNamespace.ns()); - Collection* coll = - getCollectionOrUassert(opCtx, tx.db(), _config.outputOptions.finalNamespace); - found = Helpers::findOne(opCtx, coll, temp["_id"].wrap(), old, true); - } + const bool found = [&] { + AutoGetCollection autoColl(opCtx, _config.outputOptions.finalNamespace, MODE_IS); + assertCollectionNotNull(_config.outputOptions.finalNamespace, autoColl); + return Helpers::findOne( + opCtx, autoColl.getCollection(), temp["_id"].wrap(), old, true); + }(); if (found) { // need to reduce @@ -794,11 +781,10 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, * Insert doc in collection. This should be replicated. */ void State::insert(const NamespaceString& nss, const BSONObj& o) { - verify(_onDisk); + invariant(_onDisk); writeConflictRetry(_opCtx, "M/R insert", nss.ns(), [this, &nss, &o] { - OldClientWriteContext ctx(_opCtx, nss.ns()); - WriteUnitOfWork wuow(_opCtx); + AutoGetCollection autoColl(_opCtx, nss, MODE_IX); uassert( ErrorCodes::PrimarySteppedDown, str::stream() << "no longer primary while inserting mapReduce result into collection: " @@ -806,8 +792,9 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { << ": " << redact(o), repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, nss)); - Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), nss); + assertCollectionNotNull(nss, autoColl); + WriteUnitOfWork wuow(_opCtx); BSONObjBuilder b; if (!o.hasField("_id")) { b.appendOID("_id", NULL, true); @@ -815,15 +802,15 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { b.appendElements(o); BSONObj bo = b.obj(); - StatusWith<BSONObj> res = fixDocumentForInsert(_opCtx->getServiceContext(), bo); - uassertStatusOK(res.getStatus()); - if (!res.getValue().isEmpty()) { - bo = res.getValue(); + auto fixedDoc = uassertStatusOK(fixDocumentForInsert(_opCtx->getServiceContext(), bo)); + if (!fixedDoc.isEmpty()) { + bo = fixedDoc; } // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. OpDebug* const nullOpDebug = nullptr; - uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(bo), nullOpDebug, true)); + uassertStatusOK(autoColl.getCollection()->insertDocument( + _opCtx, InsertStatement(bo), nullOpDebug, true)); wuow.commit(); }); } @@ -836,10 +823,10 @@ void State::_insertToInc(BSONObj& o) { verify(_onDisk); writeConflictRetry(_opCtx, "M/R insertToInc", _config.incLong.ns(), [this, &o] { - OldClientWriteContext ctx(_opCtx, _config.incLong.ns()); - WriteUnitOfWork wuow(_opCtx); - Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), _config.incLong); + AutoGetCollection autoColl(_opCtx, _config.incLong, MODE_IX); + assertCollectionNotNull(_config.incLong, autoColl); + WriteUnitOfWork wuow(_opCtx); // The documents inserted into the incremental collection are of the form // {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the // check that the document has an "_id" field would fail. Instead, we directly verify that @@ -855,7 +842,8 @@ void State::_insertToInc(BSONObj& o) { // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. OpDebug* const nullOpDebug = nullptr; - uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(o), nullOpDebug, true, false)); + uassertStatusOK(autoColl.getCollection()->insertDocument( + _opCtx, InsertStatement(o), nullOpDebug, true, false)); wuow.commit(); }); } @@ -1119,13 +1107,13 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold verify(_temp->size() == 0); BSONObj sortKey = BSON("0" << 1); - writeConflictRetry(_opCtx, "finalReduce", _config.incLong.ns(), [&] { - OldClientWriteContext incCtx(_opCtx, _config.incLong.ns()); - WriteUnitOfWork wuow(_opCtx); - Collection* incColl = getCollectionOrUassert(_opCtx, incCtx.db(), _config.incLong); + { + AutoGetCollection autoIncColl(_opCtx, _config.incLong, MODE_IS); + assertCollectionNotNull(_config.incLong, autoIncColl); bool foundIndex = false; - IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true); + IndexCatalog::IndexIterator ii = + autoIncColl.getCollection()->getIndexCatalog()->getIndexIterator(_opCtx, true); // Iterate over incColl's indexes. while (ii.more()) { IndexDescriptor* currIndex = ii.next(); @@ -1136,12 +1124,12 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold } } - verify(foundIndex); - wuow.commit(); - }); + invariant(foundIndex); + } - unique_ptr<AutoGetCollectionForReadCommand> ctx( - new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); + boost::optional<AutoGetCollectionForReadCommand> ctx; + ctx.emplace(_opCtx, _config.incLong); + assertCollectionNotNull(_config.incLong, *ctx); BSONObj prev; BSONList all; @@ -1170,11 +1158,11 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold verify(statusWithCQ.isOK()); std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong); - invariant(coll); - - auto exec = uassertStatusOK(getExecutor( - _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN)); + auto exec = uassertStatusOK(getExecutor(_opCtx, + ctx->getCollection(), + std::move(cq), + PlanExecutor::YIELD_AUTO, + QueryPlannerParams::NO_TABLE_SCAN)); // Make sure the PlanExecutor is destroyed while holding a collection lock. ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] { @@ -1206,8 +1194,7 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold // reduce a finalize array finalReduce(all); - - ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); + ctx.emplace(_opCtx, _config.incLong); all.clear(); prev = o; @@ -1222,9 +1209,10 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold PlanExecutor::IS_EOF == state); ctx.reset(); + // reduce and finalize last array finalReduce(all); - ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); + ctx.emplace(_opCtx, _config.incLong); pm.finished(); } @@ -1462,20 +1450,20 @@ public: long long numInputs = 0; { - // We've got a cursor preventing migrations off, now re-establish our - // useful cursor. + // We've got a cursor preventing migrations off, now re-establish our useful cursor // Need lock and context to use it - unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); + boost::optional<AutoGetCollection> scopedAutoColl; + scopedAutoColl.emplace(opCtx, config.nss, MODE_S); + assertCollectionNotNull(config.nss, *scopedAutoColl); if (state.isOnDisk()) { - // this means that it will be doing a write operation, make sure it is safe to - // do so. - if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, - config.nss)) { - uasserted(ErrorCodes::NotMaster, "not master"); - return false; - } + // This means that it will be doing a write operation, make sure it is safe to + // do so + uassert(ErrorCodes::NotMaster, + "not master", + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor( + opCtx, config.nss)); } auto qr = stdx::make_unique<QueryRequest>(config.nss); @@ -1498,20 +1486,16 @@ public: } std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; - { - Database* db = scopedAutoDb->getDb(); - Collection* coll = getCollectionOrUassert(opCtx, db, config.nss); - invariant(coll); - - exec = uassertStatusOK( - getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, 0)); - } + auto exec = uassertStatusOK(getExecutor(opCtx, + scopedAutoColl->getCollection(), + std::move(cq), + PlanExecutor::YIELD_AUTO, + 0)); // Make sure the PlanExecutor is destroyed while holding the necessary locks. - ON_BLOCK_EXIT([&exec, &scopedAutoDb, opCtx, &config] { - if (!scopedAutoDb) { - scopedAutoDb = stdx::make_unique<AutoGetDb>(opCtx, config.nss.db(), MODE_S); + ON_BLOCK_EXIT([&exec, &scopedAutoColl, opCtx, &config] { + if (!scopedAutoColl) { + AutoGetDb autoDb(opCtx, config.nss.db(), MODE_S); exec.reset(); } }); @@ -1557,11 +1541,10 @@ public: // it only happens if necessary. exec->saveState(); - scopedAutoDb.reset(); + scopedAutoColl.reset(); state.reduceAndSpillInMemoryStateIfNeeded(); - - scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); + scopedAutoColl.emplace(opCtx, config.nss, MODE_S); auto restoreStatus = exec->restoreState(); uassertStatusOK(restoreStatus); @@ -1590,10 +1573,8 @@ public: // TODO SERVER-23261: Confirm whether this is the correct place to gather all // metrics. There is no harm adding here for the time being. curOp->debug().setPlanSummaryMetrics(stats); - - Collection* coll = scopedAutoDb->getDb()->getCollection(opCtx, config.nss); - invariant(coll); // 'exec' hasn't been killed, so collection must be alive. - coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed); + scopedAutoColl->getCollection()->infoCache()->notifyOfQuery(opCtx, + stats.indexesUsed); if (curOp->shouldDBProfile()) { BSONObjBuilder execStatsBob; |