summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/mr.cpp
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2019-01-31 15:55:02 -0500
committerJustin Seyster <justin.seyster@mongodb.com>2019-01-31 15:55:02 -0500
commit57b22a11d206272a78124ee03c5a6cf26b3e1105 (patch)
tree78363850f4be0ba14cf2272abbfb51b9f57393f5 /src/mongo/db/commands/mr.cpp
parenta0aef148ed113bf66ca4c8ab37864455524be2a2 (diff)
downloadmongo-57b22a11d206272a78124ee03c5a6cf26b3e1105.tar.gz
SERVER-38480 Make Map-Reduce fully interruptible
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r--src/mongo/db/commands/mr.cpp130
1 files changed, 63 insertions, 67 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 23c7261d18d..716ffce1d7b 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -163,6 +163,50 @@ void assertCollectionNotNull(const NamespaceString& nss, AutoT& autoT) {
uassert(18698, "Collection unexpectedly disappeared: " + nss.ns(), autoT.getCollection());
}
+/**
+ * Clean up the temporary and incremental collections
+ */
+void dropTempCollections(OperationContext* cleanupOpCtx,
+ const NamespaceString& tempNamespace,
+ const NamespaceString& incLong) {
+ if (!tempNamespace.isEmpty()) {
+ writeConflictRetry(
+ cleanupOpCtx,
+ "M/R dropTempCollections",
+ tempNamespace.ns(),
+ [cleanupOpCtx, &tempNamespace] {
+ AutoGetDb autoDb(cleanupOpCtx, tempNamespace.db(), MODE_X);
+ if (auto db = autoDb.getDb()) {
+ WriteUnitOfWork wunit(cleanupOpCtx);
+ uassert(ErrorCodes::PrimarySteppedDown,
+ str::stream() << "no longer primary while dropping temporary "
+ "collection for mapReduce: "
+ << tempNamespace.ns(),
+ repl::ReplicationCoordinator::get(cleanupOpCtx)
+ ->canAcceptWritesFor(cleanupOpCtx, tempNamespace));
+ uassertStatusOK(db->dropCollection(cleanupOpCtx, tempNamespace.ns()));
+ wunit.commit();
+ }
+ });
+ // Always forget about temporary namespaces, so we don't cache lots of them
+ ShardConnection::forgetNS(tempNamespace.ns());
+ }
+ if (!incLong.isEmpty()) {
+ writeConflictRetry(
+ cleanupOpCtx, "M/R dropTempCollections", incLong.ns(), [cleanupOpCtx, &incLong] {
+ Lock::DBLock lk(cleanupOpCtx, incLong.db(), MODE_X);
+ auto databaseHolder = DatabaseHolder::get(cleanupOpCtx);
+ if (auto db = databaseHolder->getDb(cleanupOpCtx, incLong.ns())) {
+ WriteUnitOfWork wunit(cleanupOpCtx);
+ uassertStatusOK(db->dropCollection(cleanupOpCtx, incLong.ns()));
+ wunit.commit();
+ }
+ });
+
+ ShardConnection::forgetNS(incLong.ns());
+ }
+}
+
} // namespace
AtomicWord<unsigned> Config::JOB_NUMBER;
@@ -440,54 +484,14 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) {
}
/**
- * Clean up the temporary and incremental collections
- */
-void State::dropTempCollections() {
- // The cleanup handler should not be interruptible.
- UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
-
- if (!_config.tempNamespace.isEmpty()) {
- writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] {
- AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X);
- if (auto db = autoDb.getDb()) {
- WriteUnitOfWork wunit(_opCtx);
- uassert(
- ErrorCodes::PrimarySteppedDown,
- str::stream()
- << "no longer primary while dropping temporary collection for mapReduce: "
- << _config.tempNamespace.ns(),
- repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(
- _opCtx, _config.tempNamespace));
- uassertStatusOK(db->dropCollection(_opCtx, _config.tempNamespace.ns()));
- wunit.commit();
- }
- });
- // Always forget about temporary namespaces, so we don't cache lots of them
- ShardConnection::forgetNS(_config.tempNamespace.ns());
- }
- if (_useIncremental && !_config.incLong.isEmpty()) {
- writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] {
- Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X);
- auto databaseHolder = DatabaseHolder::get(_opCtx);
- if (auto db = databaseHolder->getDb(_opCtx, _config.incLong.ns())) {
- WriteUnitOfWork wunit(_opCtx);
- uassertStatusOK(db->dropCollection(_opCtx, _config.incLong.ns()));
- wunit.commit();
- }
- });
-
- ShardConnection::forgetNS(_config.incLong.ns());
- }
-}
-
-/**
* Create temporary collection, set up indexes
*/
void State::prepTempCollection() {
if (!_onDisk)
return;
- dropTempCollections();
+ dropTempCollections(
+ _opCtx, _config.tempNamespace, _useIncremental ? _config.incLong : NamespaceString());
if (_useIncremental) {
// Create the inc collection and make sure we have index on "0" key. The inc collection is
@@ -877,7 +881,23 @@ bool State::sourceExists() {
State::~State() {
if (_onDisk) {
try {
- dropTempCollections();
+ // If we're here because the map-reduce got interrupted, any attempt to drop temporary
+ // collections within the same operation context is guaranteed to fail as soon as we try
+ // to take the X-lock for the database. (An UninterruptibleLockGuard would allow
+ // dropTempCollections() to take the locks it needs, but taking an X-lock in
+ // UninterruptibleLockGuard context is not allowed.)
+ //
+ // We don't want every single interrupted map-reduce to leak temporary collections that
+ // will stick around until a server restart, so we execute the cleanup as though it's a
+ // new operation, by constructing a new Client and OperationContext. It's possible that
+ // the new operation will also get interrupted, but dropTempCollections() is short
+ // lived, so the odds are acceptably low.
+ auto cleanupClient = _opCtx->getServiceContext()->makeClient("M/R cleanup");
+ AlternativeClientRegion acr(cleanupClient);
+ auto cleanupOpCtx = cc().makeOperationContext();
+ dropTempCollections(cleanupOpCtx.get(),
+ _config.tempNamespace,
+ _useIncremental ? _config.incLong : NamespaceString());
} catch (...) {
error() << "Unable to drop temporary collection created by mapReduce: "
<< _config.tempNamespace << ". This collection will be removed automatically "
@@ -1166,9 +1186,6 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp) {
verify(statusWithCQ.isOK());
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- // The following anonymous block makes sure to destroy the executor prior to the
- // finalReduce(all) call. This is important to clear the cursors being held by the
- // storage engine.
{
auto exec = uassertStatusOK(getExecutor(_opCtx,
ctx->getCollection(),
@@ -1176,14 +1193,6 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp) {
PlanExecutor::YIELD_AUTO,
QueryPlannerParams::NO_TABLE_SCAN));
- // Make sure the PlanExecutor is destroyed while holding a collection lock.
- ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
- if (!ctx) {
- AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS);
- exec.reset();
- }
- });
-
// iterate over all sorted objects
BSONObj o;
PlanExecutor::ExecState state;
@@ -1393,8 +1402,6 @@ public:
string& errmsg,
BSONObjBuilder& result) {
Timer t;
- // Don't let a lock acquisition in map-reduce get interrupted.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmd))
@@ -1498,14 +1505,6 @@ public:
PlanExecutor::YIELD_AUTO,
0));
- // Make sure the PlanExecutor is destroyed while holding the necessary locks.
- ON_BLOCK_EXIT([&exec, &scopedAutoColl, opCtx, &config] {
- if (!scopedAutoColl) {
- AutoGetDb autoDb(opCtx, config.nss.db(), MODE_S);
- exec.reset();
- }
- });
-
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
@@ -1699,9 +1698,6 @@ public:
<< " which lives on config servers");
}
- // Don't let any lock acquisitions get interrupted.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmdObj))
maybeDisableValidation.emplace(opCtx);