diff options
author | Justin Seyster <justin.seyster@mongodb.com> | 2019-01-31 15:55:02 -0500 |
---|---|---|
committer | Justin Seyster <justin.seyster@mongodb.com> | 2019-01-31 15:55:02 -0500 |
commit | 57b22a11d206272a78124ee03c5a6cf26b3e1105 (patch) | |
tree | 78363850f4be0ba14cf2272abbfb51b9f57393f5 /src/mongo | |
parent | a0aef148ed113bf66ca4c8ab37864455524be2a2 (diff) | |
download | mongo-57b22a11d206272a78124ee03c5a6cf26b3e1105.tar.gz |
SERVER-38480 Make Map-Reduce fully interruptible
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 130 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.h | 7 |
2 files changed, 63 insertions, 74 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 23c7261d18d..716ffce1d7b 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -163,6 +163,50 @@ void assertCollectionNotNull(const NamespaceString& nss, AutoT& autoT) { uassert(18698, "Collection unexpectedly disappeared: " + nss.ns(), autoT.getCollection()); } +/** + * Clean up the temporary and incremental collections + */ +void dropTempCollections(OperationContext* cleanupOpCtx, + const NamespaceString& tempNamespace, + const NamespaceString& incLong) { + if (!tempNamespace.isEmpty()) { + writeConflictRetry( + cleanupOpCtx, + "M/R dropTempCollections", + tempNamespace.ns(), + [cleanupOpCtx, &tempNamespace] { + AutoGetDb autoDb(cleanupOpCtx, tempNamespace.db(), MODE_X); + if (auto db = autoDb.getDb()) { + WriteUnitOfWork wunit(cleanupOpCtx); + uassert(ErrorCodes::PrimarySteppedDown, + str::stream() << "no longer primary while dropping temporary " + "collection for mapReduce: " + << tempNamespace.ns(), + repl::ReplicationCoordinator::get(cleanupOpCtx) + ->canAcceptWritesFor(cleanupOpCtx, tempNamespace)); + uassertStatusOK(db->dropCollection(cleanupOpCtx, tempNamespace.ns())); + wunit.commit(); + } + }); + // Always forget about temporary namespaces, so we don't cache lots of them + ShardConnection::forgetNS(tempNamespace.ns()); + } + if (!incLong.isEmpty()) { + writeConflictRetry( + cleanupOpCtx, "M/R dropTempCollections", incLong.ns(), [cleanupOpCtx, &incLong] { + Lock::DBLock lk(cleanupOpCtx, incLong.db(), MODE_X); + auto databaseHolder = DatabaseHolder::get(cleanupOpCtx); + if (auto db = databaseHolder->getDb(cleanupOpCtx, incLong.ns())) { + WriteUnitOfWork wunit(cleanupOpCtx); + uassertStatusOK(db->dropCollection(cleanupOpCtx, incLong.ns())); + wunit.commit(); + } + }); + + ShardConnection::forgetNS(incLong.ns()); + } +} + } // namespace AtomicWord<unsigned> Config::JOB_NUMBER; @@ -440,54 +484,14 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) { } /** - * Clean up the temporary and incremental collections - */ -void State::dropTempCollections() { - // The cleanup handler should not be interruptible. - UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - - if (!_config.tempNamespace.isEmpty()) { - writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] { - AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X); - if (auto db = autoDb.getDb()) { - WriteUnitOfWork wunit(_opCtx); - uassert( - ErrorCodes::PrimarySteppedDown, - str::stream() - << "no longer primary while dropping temporary collection for mapReduce: " - << _config.tempNamespace.ns(), - repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor( - _opCtx, _config.tempNamespace)); - uassertStatusOK(db->dropCollection(_opCtx, _config.tempNamespace.ns())); - wunit.commit(); - } - }); - // Always forget about temporary namespaces, so we don't cache lots of them - ShardConnection::forgetNS(_config.tempNamespace.ns()); - } - if (_useIncremental && !_config.incLong.isEmpty()) { - writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] { - Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X); - auto databaseHolder = DatabaseHolder::get(_opCtx); - if (auto db = databaseHolder->getDb(_opCtx, _config.incLong.ns())) { - WriteUnitOfWork wunit(_opCtx); - uassertStatusOK(db->dropCollection(_opCtx, _config.incLong.ns())); - wunit.commit(); - } - }); - - ShardConnection::forgetNS(_config.incLong.ns()); - } -} - -/** * Create temporary collection, set up indexes */ void State::prepTempCollection() { if (!_onDisk) return; - dropTempCollections(); + dropTempCollections( + _opCtx, _config.tempNamespace, _useIncremental ? _config.incLong : NamespaceString()); if (_useIncremental) { // Create the inc collection and make sure we have index on "0" key. The inc collection is @@ -877,7 +881,23 @@ bool State::sourceExists() { State::~State() { if (_onDisk) { try { - dropTempCollections(); + // If we're here because the map-reduce got interrupted, any attempt to drop temporary + // collections within the same operation context is guaranteed to fail as soon as we try + // to take the X-lock for the database. (An UninterruptibleLockGuard would allow + // dropTempCollections() to take the locks it needs, but taking an X-lock in + // UninterruptibleLockGuard context is not allowed.) + // + // We don't want every single interrupted map-reduce to leak temporary collections that + // will stick around until a server restart, so we execute the cleanup as though it's a + // new operation, by constructing a new Client and OperationContext. It's possible that + // the new operation will also get interrupted, but dropTempCollections() is short + // lived, so the odds are acceptably low. + auto cleanupClient = _opCtx->getServiceContext()->makeClient("M/R cleanup"); + AlternativeClientRegion acr(cleanupClient); + auto cleanupOpCtx = cc().makeOperationContext(); + dropTempCollections(cleanupOpCtx.get(), + _config.tempNamespace, + _useIncremental ? _config.incLong : NamespaceString()); } catch (...) { error() << "Unable to drop temporary collection created by mapReduce: " << _config.tempNamespace << ". This collection will be removed automatically " @@ -1166,9 +1186,6 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp) { verify(statusWithCQ.isOK()); std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - // The following anonymous block makes sure to destroy the executor prior to the - // finalReduce(all) call. This is important to clear the cursors being held by the - // storage engine. { auto exec = uassertStatusOK(getExecutor(_opCtx, ctx->getCollection(), @@ -1176,14 +1193,6 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp) { PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN)); - // Make sure the PlanExecutor is destroyed while holding a collection lock. - ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] { - if (!ctx) { - AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS); - exec.reset(); - } - }); - // iterate over all sorted objects BSONObj o; PlanExecutor::ExecState state; @@ -1393,8 +1402,6 @@ public: string& errmsg, BSONObjBuilder& result) { Timer t; - // Don't let a lock acquisition in map-reduce get interrupted. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmd)) @@ -1498,14 +1505,6 @@ public: PlanExecutor::YIELD_AUTO, 0)); - // Make sure the PlanExecutor is destroyed while holding the necessary locks. - ON_BLOCK_EXIT([&exec, &scopedAutoColl, opCtx, &config] { - if (!scopedAutoColl) { - AutoGetDb autoDb(opCtx, config.nss.db(), MODE_S); - exec.reset(); - } - }); - { stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); @@ -1699,9 +1698,6 @@ public: << " which lives on config servers"); } - // Don't let any lock acquisitions get interrupted. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmdObj)) maybeDisableValidation.emplace(opCtx); diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h index 13fd4a43cd6..09b66e3dd20 100644 --- a/src/mongo/db/commands/mr.h +++ b/src/mongo/db/commands/mr.h @@ -307,13 +307,6 @@ public: void finalReduce(OperationContext* opCtx, CurOp* op); - // ------- cleanup/data positioning ---------- - - /** - * Clean up the temporary and incremental collections - */ - void dropTempCollections(); - /** @return number objects in collection */ |