summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-06-08 10:21:37 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-06-14 17:05:13 -0400
commitf81eb8e9f5166fa6581927945700e5ace4321063 (patch)
treea5906ddc1f4e60f23a2a2040048309b33d3b8373 /src
parentd7db8c9d6f433a462abef8781fdb507681f6b694 (diff)
downloadmongo-f81eb8e9f5166fa6581927945700e5ace4321063.tar.gz
SERVER-35516 Remove usages of OldClient(Write)Context from map/reduce
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/mr.cpp213
1 files changed, 97 insertions, 116 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index fbb4c8ef9d2..2cf896c516a 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -156,13 +156,9 @@ BSONObj _bailFromJS(const BSONObj& args, void* data) {
return BSONObj();
}
-Collection* getCollectionOrUassert(OperationContext* opCtx,
- Database* db,
- const NamespaceString& nss) {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- Collection* out = db ? db->getCollection(opCtx, nss) : NULL;
- uassert(18697, "Collection unexpectedly disappeared: " + nss.ns(), out);
- return out;
+template <class AutoT>
+void assertCollectionNotNull(const NamespaceString& nss, AutoT& autoT) {
+ uassert(18698, "Collection unexpectedly disappeared: " + nss.ns(), autoT.getCollection());
}
} // namespace
@@ -490,23 +486,23 @@ void State::prepTempCollection() {
return;
dropTempCollections();
+
if (_useIncremental) {
// Create the inc collection and make sure we have index on "0" key. The inc collection is
// in the "local" database, so it does not get replicated to secondaries.
writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] {
- OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
- WriteUnitOfWork wuow(_opCtx);
- Collection* incColl = incCtx.getCollection();
- invariant(!incColl);
+ AutoGetOrCreateDb autoGetIncCollDb(_opCtx, _config.incLong.db(), MODE_X);
+ auto const db = autoGetIncCollDb.getDb();
+ invariant(!db->getCollection(_opCtx, _config.incLong));
CollectionOptions options;
options.setNoIdIndex();
options.temp = true;
options.uuid.emplace(UUID::gen());
- incColl = incCtx.db()->createCollection(
+ WriteUnitOfWork wuow(_opCtx);
+ auto incColl = db->createCollection(
_opCtx, _config.incLong.ns(), options, false /* force no _id index */);
- invariant(incColl);
auto rawIndexSpec =
BSON("key" << BSON("0" << 1) << "ns" << _config.incLong.ns() << "name"
@@ -514,16 +510,9 @@ void State::prepTempCollection() {
auto indexSpec = uassertStatusOK(index_key_validate::validateIndexSpec(
_opCtx, rawIndexSpec, _config.incLong, serverGlobalParams.featureCompatibility));
- Status status = incColl->getIndexCatalog()
- ->createIndexOnEmptyCollection(_opCtx, indexSpec)
- .getStatus();
- if (!status.isOK()) {
- uasserted(17305,
- str::stream() << "createIndex failed for mr incLong ns: "
- << _config.incLong.ns()
- << " err: "
- << status.code());
- }
+ uassertStatusOKWithContext(
+ incColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, indexSpec),
+ str::stream() << "createIndex failed for mr incLong ns " << _config.incLong.ns());
wuow.commit();
});
}
@@ -532,9 +521,10 @@ void State::prepTempCollection() {
vector<BSONObj> indexesToInsert;
{
- // copy indexes and collection options into temporary storage
- OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns());
- Collection* const finalColl = finalCtx.getCollection();
+ // Copy indexes and collection options into temporary storage
+ AutoGetCollection autoGetFinalColl(_opCtx, _config.outputOptions.finalNamespace, MODE_IS);
+
+ auto const finalColl = autoGetFinalColl.getCollection();
if (finalColl) {
finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx);
@@ -560,17 +550,17 @@ void State::prepTempCollection() {
}
writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.tempNamespace.ns(), [&] {
- // create temp collection and insert the indexes from temporary storage
- OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns());
- WriteUnitOfWork wuow(_opCtx);
+ // Create temp collection and insert the indexes from temporary storage
+ AutoGetOrCreateDb autoGetFinalDb(_opCtx, _config.tempNamespace.db(), MODE_X);
+ auto const db = autoGetFinalDb.getDb();
+ invariant(!db->getCollection(_opCtx, _config.tempNamespace));
+
uassert(
ErrorCodes::PrimarySteppedDown,
str::stream() << "no longer primary while creating temporary collection for mapReduce: "
<< _config.tempNamespace.ns(),
repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx,
_config.tempNamespace));
- Collection* tempColl = tempCtx.getCollection();
- invariant(!tempColl);
CollectionOptions options = finalOptions;
options.temp = true;
@@ -583,26 +573,24 @@ void State::prepTempCollection() {
// Override createCollection's prohibition on creating new replicated collections without an
// _id index.
- bool buildIdIndex = (options.autoIndexId == CollectionOptions::YES ||
- options.autoIndexId == CollectionOptions::DEFAULT);
-
- tempColl = tempCtx.db()->createCollection(
- _opCtx, _config.tempNamespace.ns(), options, buildIdIndex);
-
- for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end();
- ++it) {
- Status status =
- tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus();
- if (!status.isOK()) {
- if (status.code() == ErrorCodes::IndexAlreadyExists) {
- continue;
- }
- uassertStatusOK(status);
+ const bool buildIdIndex = (options.autoIndexId == CollectionOptions::YES ||
+ options.autoIndexId == CollectionOptions::DEFAULT);
+
+ WriteUnitOfWork wuow(_opCtx);
+ auto const tempColl =
+ db->createCollection(_opCtx, _config.tempNamespace.ns(), options, buildIdIndex);
+
+ for (const auto& indexToInsert : indexesToInsert) {
+ try {
+ uassertStatusOK(tempColl->getIndexCatalog()->createIndexOnEmptyCollection(
+ _opCtx, indexToInsert));
+ } catch (const ExceptionFor<ErrorCodes::IndexAlreadyExists>&) {
+ continue;
}
+
// Log the createIndex operation.
- auto uuid = tempColl->uuid();
- getGlobalServiceContext()->getOpObserver()->onCreateIndex(
- _opCtx, _config.tempNamespace, uuid, *it, false);
+ _opCtx->getServiceContext()->getOpObserver()->onCreateIndex(
+ _opCtx, _config.tempNamespace, tempColl->uuid(), indexToInsert, false);
}
wuow.commit();
});
@@ -763,13 +751,12 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
BSONObj temp = cursor->nextSafe();
BSONObj old;
- bool found;
- {
- OldClientContext tx(opCtx, _config.outputOptions.finalNamespace.ns());
- Collection* coll =
- getCollectionOrUassert(opCtx, tx.db(), _config.outputOptions.finalNamespace);
- found = Helpers::findOne(opCtx, coll, temp["_id"].wrap(), old, true);
- }
+ const bool found = [&] {
+ AutoGetCollection autoColl(opCtx, _config.outputOptions.finalNamespace, MODE_IS);
+ assertCollectionNotNull(_config.outputOptions.finalNamespace, autoColl);
+ return Helpers::findOne(
+ opCtx, autoColl.getCollection(), temp["_id"].wrap(), old, true);
+ }();
if (found) {
// need to reduce
@@ -794,11 +781,10 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
* Insert doc in collection. This should be replicated.
*/
void State::insert(const NamespaceString& nss, const BSONObj& o) {
- verify(_onDisk);
+ invariant(_onDisk);
writeConflictRetry(_opCtx, "M/R insert", nss.ns(), [this, &nss, &o] {
- OldClientWriteContext ctx(_opCtx, nss.ns());
- WriteUnitOfWork wuow(_opCtx);
+ AutoGetCollection autoColl(_opCtx, nss, MODE_IX);
uassert(
ErrorCodes::PrimarySteppedDown,
str::stream() << "no longer primary while inserting mapReduce result into collection: "
@@ -806,8 +792,9 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) {
<< ": "
<< redact(o),
repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, nss));
- Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), nss);
+ assertCollectionNotNull(nss, autoColl);
+ WriteUnitOfWork wuow(_opCtx);
BSONObjBuilder b;
if (!o.hasField("_id")) {
b.appendOID("_id", NULL, true);
@@ -815,15 +802,15 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) {
b.appendElements(o);
BSONObj bo = b.obj();
- StatusWith<BSONObj> res = fixDocumentForInsert(_opCtx->getServiceContext(), bo);
- uassertStatusOK(res.getStatus());
- if (!res.getValue().isEmpty()) {
- bo = res.getValue();
+ auto fixedDoc = uassertStatusOK(fixDocumentForInsert(_opCtx->getServiceContext(), bo));
+ if (!fixedDoc.isEmpty()) {
+ bo = fixedDoc;
}
// TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
OpDebug* const nullOpDebug = nullptr;
- uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(bo), nullOpDebug, true));
+ uassertStatusOK(autoColl.getCollection()->insertDocument(
+ _opCtx, InsertStatement(bo), nullOpDebug, true));
wuow.commit();
});
}
@@ -836,10 +823,10 @@ void State::_insertToInc(BSONObj& o) {
verify(_onDisk);
writeConflictRetry(_opCtx, "M/R insertToInc", _config.incLong.ns(), [this, &o] {
- OldClientWriteContext ctx(_opCtx, _config.incLong.ns());
- WriteUnitOfWork wuow(_opCtx);
- Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), _config.incLong);
+ AutoGetCollection autoColl(_opCtx, _config.incLong, MODE_IX);
+ assertCollectionNotNull(_config.incLong, autoColl);
+ WriteUnitOfWork wuow(_opCtx);
// The documents inserted into the incremental collection are of the form
// {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the
// check that the document has an "_id" field would fail. Instead, we directly verify that
@@ -855,7 +842,8 @@ void State::_insertToInc(BSONObj& o) {
// TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261.
OpDebug* const nullOpDebug = nullptr;
- uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(o), nullOpDebug, true, false));
+ uassertStatusOK(autoColl.getCollection()->insertDocument(
+ _opCtx, InsertStatement(o), nullOpDebug, true, false));
wuow.commit();
});
}
@@ -1119,13 +1107,13 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold
verify(_temp->size() == 0);
BSONObj sortKey = BSON("0" << 1);
- writeConflictRetry(_opCtx, "finalReduce", _config.incLong.ns(), [&] {
- OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
- WriteUnitOfWork wuow(_opCtx);
- Collection* incColl = getCollectionOrUassert(_opCtx, incCtx.db(), _config.incLong);
+ {
+ AutoGetCollection autoIncColl(_opCtx, _config.incLong, MODE_IS);
+ assertCollectionNotNull(_config.incLong, autoIncColl);
bool foundIndex = false;
- IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true);
+ IndexCatalog::IndexIterator ii =
+ autoIncColl.getCollection()->getIndexCatalog()->getIndexIterator(_opCtx, true);
// Iterate over incColl's indexes.
while (ii.more()) {
IndexDescriptor* currIndex = ii.next();
@@ -1136,12 +1124,12 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold
}
}
- verify(foundIndex);
- wuow.commit();
- });
+ invariant(foundIndex);
+ }
- unique_ptr<AutoGetCollectionForReadCommand> ctx(
- new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));
+ boost::optional<AutoGetCollectionForReadCommand> ctx;
+ ctx.emplace(_opCtx, _config.incLong);
+ assertCollectionNotNull(_config.incLong, *ctx);
BSONObj prev;
BSONList all;
@@ -1170,11 +1158,11 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold
verify(statusWithCQ.isOK());
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong);
- invariant(coll);
-
- auto exec = uassertStatusOK(getExecutor(
- _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN));
+ auto exec = uassertStatusOK(getExecutor(_opCtx,
+ ctx->getCollection(),
+ std::move(cq),
+ PlanExecutor::YIELD_AUTO,
+ QueryPlannerParams::NO_TABLE_SCAN));
// Make sure the PlanExecutor is destroyed while holding a collection lock.
ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
@@ -1206,8 +1194,7 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold
// reduce a finalize array
finalReduce(all);
-
- ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));
+ ctx.emplace(_opCtx, _config.incLong);
all.clear();
prev = o;
@@ -1222,9 +1209,10 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold
PlanExecutor::IS_EOF == state);
ctx.reset();
+
// reduce and finalize last array
finalReduce(all);
- ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));
+ ctx.emplace(_opCtx, _config.incLong);
pm.finished();
}
@@ -1462,20 +1450,20 @@ public:
long long numInputs = 0;
{
- // We've got a cursor preventing migrations off, now re-establish our
- // useful cursor.
+ // We've got a cursor preventing migrations off, now re-establish our useful cursor
// Need lock and context to use it
- unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
+ boost::optional<AutoGetCollection> scopedAutoColl;
+ scopedAutoColl.emplace(opCtx, config.nss, MODE_S);
+ assertCollectionNotNull(config.nss, *scopedAutoColl);
if (state.isOnDisk()) {
- // this means that it will be doing a write operation, make sure it is safe to
- // do so.
- if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx,
- config.nss)) {
- uasserted(ErrorCodes::NotMaster, "not master");
- return false;
- }
+ // This means that it will be doing a write operation, make sure it is safe to
+ // do so
+ uassert(ErrorCodes::NotMaster,
+ "not master",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(
+ opCtx, config.nss));
}
auto qr = stdx::make_unique<QueryRequest>(config.nss);
@@ -1498,20 +1486,16 @@ public:
}
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
- {
- Database* db = scopedAutoDb->getDb();
- Collection* coll = getCollectionOrUassert(opCtx, db, config.nss);
- invariant(coll);
-
- exec = uassertStatusOK(
- getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, 0));
- }
+ auto exec = uassertStatusOK(getExecutor(opCtx,
+ scopedAutoColl->getCollection(),
+ std::move(cq),
+ PlanExecutor::YIELD_AUTO,
+ 0));
// Make sure the PlanExecutor is destroyed while holding the necessary locks.
- ON_BLOCK_EXIT([&exec, &scopedAutoDb, opCtx, &config] {
- if (!scopedAutoDb) {
- scopedAutoDb = stdx::make_unique<AutoGetDb>(opCtx, config.nss.db(), MODE_S);
+ ON_BLOCK_EXIT([&exec, &scopedAutoColl, opCtx, &config] {
+ if (!scopedAutoColl) {
+ AutoGetDb autoDb(opCtx, config.nss.db(), MODE_S);
exec.reset();
}
});
@@ -1557,11 +1541,10 @@ public:
// it only happens if necessary.
exec->saveState();
- scopedAutoDb.reset();
+ scopedAutoColl.reset();
state.reduceAndSpillInMemoryStateIfNeeded();
-
- scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S));
+ scopedAutoColl.emplace(opCtx, config.nss, MODE_S);
auto restoreStatus = exec->restoreState();
uassertStatusOK(restoreStatus);
@@ -1590,10 +1573,8 @@ public:
// TODO SERVER-23261: Confirm whether this is the correct place to gather all
// metrics. There is no harm adding here for the time being.
curOp->debug().setPlanSummaryMetrics(stats);
-
- Collection* coll = scopedAutoDb->getDb()->getCollection(opCtx, config.nss);
- invariant(coll); // 'exec' hasn't been killed, so collection must be alive.
- coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed);
+ scopedAutoColl->getCollection()->infoCache()->notifyOfQuery(opCtx,
+ stats.indexesUsed);
if (curOp->shouldDBProfile()) {
BSONObjBuilder execStatsBob;