| author | Justin Seyster <justin.seyster@mongodb.com> | 2019-01-31 15:55:02 -0500 |
|---|---|---|
| committer | Justin Seyster <justin.seyster@mongodb.com> | 2019-01-31 15:55:02 -0500 |
| commit | 57b22a11d206272a78124ee03c5a6cf26b3e1105 | |
| tree | 78363850f4be0ba14cf2272abbfb51b9f57393f5 | |
| parent | a0aef148ed113bf66ca4c8ab37864455524be2a2 | |
SERVER-38480 Make Map-Reduce fully interruptible
14 files changed, 165 insertions, 76 deletions
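SERVER-38480 removes the UninterruptibleLockGuard instances that previously shielded map-reduce from kill requests and makes its cleanup robust to interruption (see the mr.cpp hunks below). In practice, either of the standard interruption channels should now work safely against mapReduce. A hedged shell sketch (collection names are illustrative, not from this commit):

```javascript
// (1) Server-side time limit: the command gets interrupted once it exceeds
//     the budget and should fail cleanly, without leaking temp collections.
db.runCommand({
    mapReduce: 'coll',
    map: function() { emit(this.k, 1); },
    reduce: function(key, values) { return Array.sum(values); },
    out: {replace: 'coll_out'},
    maxTimeMS: 10  // deliberately tiny budget, to force an interrupt
});

// (2) killOp from another connection, as the new FSM workload below does:
// db.killOp(opid);
```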
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency.yml
index 856c9df45d5..435b14765e3 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency.yml
@@ -29,6 +29,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml
index a1511d0dfb6..04da8dc3844 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml
@@ -32,6 +32,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml
index 713ff6286d4..8653ff344ea 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml
@@ -26,6 +26,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn.yml
index 003d4d34cfb..65f6f3cc978 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn.yml
@@ -30,6 +30,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn_with_balancer.yml
index 70c7541c276..82aba9da465 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn_with_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_local_read_write_multi_stmt_txn_with_balancer.yml
@@ -30,6 +30,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn.yml
index 17b035eac5f..4b3debaa952 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn.yml
@@ -30,6 +30,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn_with_balancer.yml
index f0f7ac2b179..21b95389f91 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn_with_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_multi_stmt_txn_with_balancer.yml
@@ -30,6 +30,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml
index 2ddd8e0ee05..039bd6cbba8 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml
@@ -29,6 +29,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml
index 1e1cd5f5318..3676e0e1d31 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml
@@ -26,6 +26,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml
index ac96886c3c9..74304822cd7 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml
@@ -29,6 +29,7 @@ selector:
   # namespaces collide across mongos processes'
   - jstests/concurrency/fsm_workloads/map_reduce_drop.js
   - jstests/concurrency/fsm_workloads/map_reduce_inline.js
+  - jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge.js
   - jstests/concurrency/fsm_workloads/map_reduce_merge_nonatomic.js
   - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
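All ten sharded concurrency suites exclude the new workload for the same reason they already exclude the other map-reduce workloads (the comment preserved in each hunk): each FSM worker thread derives its output collection name from a shared prefix, and those namespaces can collide when multiple mongos processes run the suite against the same cluster. A quick sketch of the naming scheme, mirroring the uniqueCollectionName() helper that appears later in this commit:

```javascript
// Illustration of the per-thread output naming used by the map-reduce FSM
// workloads (mirrors uniqueCollectionName() in map_reduce_replace_nonexistent.js).
function uniqueCollectionName(prefix, tid) {
    return prefix + tid;
}

// Thread 3 of the new workload writes to 'map_reduce_interrupt3'. Two mongos
// processes both running a thread 3 would collide on that namespace.
uniqueCollectionName('map_reduce_interrupt', 3);  // 'map_reduce_interrupt3'
```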
diff --git a/jstests/concurrency/fsm_workloads/map_reduce_interrupt.js b/jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
new file mode 100644
index 00000000000..bbabdd954ac
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/map_reduce_interrupt.js
@@ -0,0 +1,90 @@
+'use strict';
+
+/**
+ * map_reduce_interrupt.js
+ *
+ * Extends the map_reduce_inline.js workload with a state that randomly kills a running map-reduce
+ * operation. This workload is intended to test that there are no deadlocks or unhandled exceptions
+ * when tearing down a map-reduce command following an interrupt.
+ *
+ * @tags: [uses_curop_agg_stage]
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');                      // for extendWorkload
+load('jstests/concurrency/fsm_workloads/map_reduce_replace_nonexistent.js');  // for $config
+
+var $config = extendWorkload($config, function($config, $super) {
+    $config.data.prefix = 'map_reduce_interrupt';
+
+    $config.states.killOp = function killOp(db, collName) {
+        const mrOps = db.getSiblingDB('admin')
+                          .aggregate([
+                              {$currentOp: {}},
+                              {$match: {'command.mapreduce': collName}},
+                              {$project: {opid: '$opid'}}
+                          ])
+                          .toArray();
+
+        if (mrOps.length > 0) {
+            const randomOpIndex = Math.floor(mrOps.length * Math.random());
+            const randomOpId = mrOps[randomOpIndex].opid;
+            jsTestLog('Randomly chose to kill Map-Reduce with opid: ' + tojson(randomOpId));
+
+            // Note: Even if the killOp reaches the server after the map-reduce command is already
+            // done, the server still returns an "ok" response, so this assertion is safe.
+            assertAlways.commandWorked(
+                db.getSiblingDB('admin').runCommand({killOp: 1, op: randomOpId}));
+        } else {
+            // No map-reduce operations to kill at the moment.
+        }
+    };
+
+    $config.states.mapReduce = function mapReduce(db, collName) {
+        try {
+            $super.states.mapReduce.apply(this, arguments);
+        } catch (err) {
+            // The nature of this test means that we expect the map-reduce command to sometimes
+            // fail due to interruption. No other failures are expected, though. Note that
+            // interruptions can cause some unrelated error codes, including InternalError (during
+            // JavaScript execution) and some non-specific errors (SERVER-39281, SERVER-39282).
+            // Checking for "interrupted" in the error message is a reasonable way to spot all the
+            // miscellaneous errors that can occur because of an interruption.
+            if (err.code != ErrorCodes.Interrupted && err.code != ErrorCodes.InternalError &&
+                !/interrupted/i.test(err.message)) {
+                throw err;
+            }
+        }
+    };
+
+    $config.teardown = function teardown(db, collname, cluster) {
+        // Interrupted map-reduce operations should still be able to clean up the temp collections
+        // that they create within the database of the output collection and within the "local"
+        // database.
+        //
+        // Cleanup occurs as part of its own operations, which can also be interrupted, but the
+        // 'killOp' state of this test only targets map-reduce operations.
+
+        const dbTempCollectionsResult =
+            db.runCommand({listCollections: 1, filter: {'options.temp': true}});
+        assertAlways.commandWorked(dbTempCollectionsResult);
+        assertAlways.eq(
+            dbTempCollectionsResult.cursor.firstBatch.length, 0, dbTempCollectionsResult);
+
+        if (!cluster.isSharded()) {
+            // Note that we can't do this check on sharded clusters, which do not have a "local"
+            // database.
+            const localTempCollectionsResult = db.getSiblingDB('local').runCommand(
+                {listCollections: 1, filter: {'options.temp': true}});
+            assertAlways.commandWorked(localTempCollectionsResult);
+            assertAlways.eq(localTempCollectionsResult.cursor.firstBatch.length,
+                            0,
+                            localTempCollectionsResult);
+        }
+    };
+
+    $config.transitions = {
+        init: {mapReduce: 0.8, killOp: 0.2},
+        mapReduce: {mapReduce: 0.8, killOp: 0.2},
+        killOp: {mapReduce: 0.8, killOp: 0.2}
+    };
+
+    return $config;
+});
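The transitions table above keeps every thread spending roughly 80% of its state changes running map-reduce and 20% attempting a kill. As a rough model of how those weights are interpreted (a simplification for illustration, not the actual fsm_libs runner), the framework picks each thread's next state by weighted random selection:

```javascript
// Simplified model of FSM transition-weight selection; the real runner lives
// in jstests/concurrency/fsm_libs and is more involved.
function chooseNextState(transitions) {
    let r = Math.random();
    for (const state of Object.keys(transitions)) {
        r -= transitions[state];
        if (r < 0) {
            return state;
        }
    }
    return Object.keys(transitions)[0];  // guard against floating-point rounding
}

chooseNextState({mapReduce: 0.8, killOp: 0.2});  // 'killOp' about 20% of the time
```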
diff --git a/jstests/concurrency/fsm_workloads/map_reduce_replace_nonexistent.js b/jstests/concurrency/fsm_workloads/map_reduce_replace_nonexistent.js
index f6556e2744d..a87f07a8c73 100644
--- a/jstests/concurrency/fsm_workloads/map_reduce_replace_nonexistent.js
+++ b/jstests/concurrency/fsm_workloads/map_reduce_replace_nonexistent.js
@@ -17,14 +17,14 @@ var $config = extendWorkload($config, function($config, $super) {
     // Use the workload name as a prefix for the collection name,
     // since the workload name is assumed to be unique.
-    var prefix = 'map_reduce_replace_nonexistent';
+    $config.data.prefix = 'map_reduce_replace_nonexistent';
 
     function uniqueCollectionName(prefix, tid) {
         return prefix + tid;
     }
 
     $config.states.mapReduce = function mapReduce(db, collName) {
-        var outCollName = uniqueCollectionName(prefix, this.tid);
+        var outCollName = uniqueCollectionName(this.prefix, this.tid);
         var fullName = db[outCollName].getFullName();
 
         assertAlways.isnull(db[outCollName].exists(),
                             "output collection '" + fullName + "' should not exist");
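This small refactor is what lets map_reduce_interrupt.js reuse the parent workload: a closure-local `var prefix` cannot be overridden by an extending workload, but a field on `$config.data` is copied onto each worker thread's `this`, so a subclass can simply reassign `$config.data.prefix`, as the new workload does above. A minimal sketch of the difference, with illustrative names:

```javascript
// Hypothetical mini-example of why the prefix moved onto $config.data.
var baseConfig = {
    data: {prefix: 'map_reduce_replace_nonexistent'},
    states: {
        mapReduce: function(db, collName) {
            // The FSM runner invokes states with 'this' bound to a copy of
            // $config.data (plus the thread id), so overrides take effect here.
            var outCollName = this.prefix + this.tid;
            jsTestLog('map-reduce output: ' + outCollName);
        }
    }
};

// An extending workload needs only one line to get its own output namespaces:
baseConfig.data.prefix = 'map_reduce_interrupt';
```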
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 23c7261d18d..716ffce1d7b 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -163,6 +163,50 @@ void assertCollectionNotNull(const NamespaceString& nss, AutoT& autoT) {
     uassert(18698, "Collection unexpectedly disappeared: " + nss.ns(), autoT.getCollection());
 }
 
+/**
+ * Clean up the temporary and incremental collections
+ */
+void dropTempCollections(OperationContext* cleanupOpCtx,
+                         const NamespaceString& tempNamespace,
+                         const NamespaceString& incLong) {
+    if (!tempNamespace.isEmpty()) {
+        writeConflictRetry(
+            cleanupOpCtx,
+            "M/R dropTempCollections",
+            tempNamespace.ns(),
+            [cleanupOpCtx, &tempNamespace] {
+                AutoGetDb autoDb(cleanupOpCtx, tempNamespace.db(), MODE_X);
+                if (auto db = autoDb.getDb()) {
+                    WriteUnitOfWork wunit(cleanupOpCtx);
+                    uassert(ErrorCodes::PrimarySteppedDown,
+                            str::stream() << "no longer primary while dropping temporary "
+                                             "collection for mapReduce: "
+                                          << tempNamespace.ns(),
+                            repl::ReplicationCoordinator::get(cleanupOpCtx)
+                                ->canAcceptWritesFor(cleanupOpCtx, tempNamespace));
+                    uassertStatusOK(db->dropCollection(cleanupOpCtx, tempNamespace.ns()));
+                    wunit.commit();
+                }
+            });
+        // Always forget about temporary namespaces, so we don't cache lots of them
+        ShardConnection::forgetNS(tempNamespace.ns());
+    }
+    if (!incLong.isEmpty()) {
+        writeConflictRetry(
+            cleanupOpCtx, "M/R dropTempCollections", incLong.ns(), [cleanupOpCtx, &incLong] {
+                Lock::DBLock lk(cleanupOpCtx, incLong.db(), MODE_X);
+                auto databaseHolder = DatabaseHolder::get(cleanupOpCtx);
+                if (auto db = databaseHolder->getDb(cleanupOpCtx, incLong.ns())) {
+                    WriteUnitOfWork wunit(cleanupOpCtx);
+                    uassertStatusOK(db->dropCollection(cleanupOpCtx, incLong.ns()));
+                    wunit.commit();
+                }
+            });
+
+        ShardConnection::forgetNS(incLong.ns());
+    }
+}
+
 }  // namespace
 
 AtomicWord<unsigned> Config::JOB_NUMBER;
@@ -440,54 +484,14 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) {
 }
 
 /**
- * Clean up the temporary and incremental collections
- */
-void State::dropTempCollections() {
-    // The cleanup handler should not be interruptible.
-    UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
-
-    if (!_config.tempNamespace.isEmpty()) {
-        writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] {
-            AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X);
-            if (auto db = autoDb.getDb()) {
-                WriteUnitOfWork wunit(_opCtx);
-                uassert(
-                    ErrorCodes::PrimarySteppedDown,
-                    str::stream()
-                        << "no longer primary while dropping temporary collection for mapReduce: "
-                        << _config.tempNamespace.ns(),
-                    repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(
-                        _opCtx, _config.tempNamespace));
-                uassertStatusOK(db->dropCollection(_opCtx, _config.tempNamespace.ns()));
-                wunit.commit();
-            }
-        });
-        // Always forget about temporary namespaces, so we don't cache lots of them
-        ShardConnection::forgetNS(_config.tempNamespace.ns());
-    }
-    if (_useIncremental && !_config.incLong.isEmpty()) {
-        writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] {
-            Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X);
-            auto databaseHolder = DatabaseHolder::get(_opCtx);
-            if (auto db = databaseHolder->getDb(_opCtx, _config.incLong.ns())) {
-                WriteUnitOfWork wunit(_opCtx);
-                uassertStatusOK(db->dropCollection(_opCtx, _config.incLong.ns()));
-                wunit.commit();
-            }
-        });
-
-        ShardConnection::forgetNS(_config.incLong.ns());
-    }
-}
-
-/**
  * Create temporary collection, set up indexes
  */
 void State::prepTempCollection() {
     if (!_onDisk)
         return;
 
-    dropTempCollections();
+    dropTempCollections(
+        _opCtx, _config.tempNamespace, _useIncremental ? _config.incLong : NamespaceString());
 
     if (_useIncremental) {
         // Create the inc collection and make sure we have index on "0" key. The inc collection is
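dropTempCollections() is now a free function parameterized on an explicit OperationContext and the two namespaces to drop, instead of a State method hard-wired to the operation being torn down. The observable contract is exactly what the new workload's teardown asserts: no collection with `options.temp` left behind in the output database or in `local`. A shell-side check using the same listCollections filter:

```javascript
// Returns any leaked map-reduce temp collections in 'db'; this is the same
// filter map_reduce_interrupt.js uses in its teardown.
function tempCollections(db) {
    const res = db.runCommand({listCollections: 1, filter: {'options.temp': true}});
    assert.commandWorked(res);
    return res.cursor.firstBatch;
}

assert.eq(0, tempCollections(db).length);  // output database
// Incremental collections live in 'local'; skip this check on sharded
// clusters, which have no "local" database.
assert.eq(0, tempCollections(db.getSiblingDB('local')).length);
```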
@@ -877,7 +881,23 @@ bool State::sourceExists() {
 State::~State() {
     if (_onDisk) {
         try {
-            dropTempCollections();
+            // If we're here because the map-reduce got interrupted, any attempt to drop temporary
+            // collections within the same operation context is guaranteed to fail as soon as we
+            // try to take the X-lock for the database. (An UninterruptibleLockGuard would allow
+            // dropTempCollections() to take the locks it needs, but taking an X-lock in
+            // UninterruptibleLockGuard context is not allowed.)
+            //
+            // We don't want every single interrupted map-reduce to leak temporary collections
+            // that will stick around until a server restart, so we execute the cleanup as though
+            // it's a new operation, by constructing a new Client and OperationContext. It's
+            // possible that the new operation will also get interrupted, but
+            // dropTempCollections() is short lived, so the odds are acceptably low.
+            auto cleanupClient = _opCtx->getServiceContext()->makeClient("M/R cleanup");
+            AlternativeClientRegion acr(cleanupClient);
+            auto cleanupOpCtx = cc().makeOperationContext();
+            dropTempCollections(cleanupOpCtx.get(),
+                                _config.tempNamespace,
+                                _useIncremental ? _config.incLong : NamespaceString());
         } catch (...) {
             error() << "Unable to drop temporary collection created by mapReduce: "
                     << _config.tempNamespace << ". This collection will be removed automatically "
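This destructor change is the heart of the fix: after an interrupt, the original OperationContext refuses new lock acquisitions, so cleanup now runs on a freshly constructed Client and OperationContext via AlternativeClientRegion. The code path is exercised whenever a live mapReduce is killed, e.g. from the shell (names illustrative, mirroring the workload's killOp state):

```javascript
// Locate and kill an in-progress mapReduce on 'coll' using the same
// $currentOp match the new workload uses.
const admin = db.getSiblingDB('admin');
const ops = admin
                .aggregate([{$currentOp: {}}, {$match: {'command.mapreduce': 'coll'}}])
                .toArray();
if (ops.length > 0) {
    // killOp returns ok even if the operation finished in the meantime.
    assert.commandWorked(admin.runCommand({killOp: 1, op: ops[0].opid}));
}
```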
@@ -1166,9 +1186,6 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp) {
     verify(statusWithCQ.isOK());
     std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
 
-    // The following anonymous block makes sure to destroy the executor prior to the
-    // finalReduce(all) call. This is important to clear the cursors being held by the
-    // storage engine.
     {
         auto exec = uassertStatusOK(getExecutor(_opCtx,
                                                 ctx->getCollection(),
                                                 std::move(cq),
                                                 PlanExecutor::YIELD_AUTO,
                                                 QueryPlannerParams::NO_TABLE_SCAN));
 
-        // Make sure the PlanExecutor is destroyed while holding a collection lock.
-        ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
-            if (!ctx) {
-                AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS);
-                exec.reset();
-            }
-        });
-
         // iterate over all sorted objects
         BSONObj o;
         PlanExecutor::ExecState state;
@@ -1393,8 +1402,6 @@ public:
                      string& errmsg,
                      BSONObjBuilder& result) {
         Timer t;
-        // Don't let a lock acquisition in map-reduce get interrupted.
-        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
 
         boost::optional<DisableDocumentValidation> maybeDisableValidation;
         if (shouldBypassDocumentValidationForCommand(cmd))
@@ -1498,14 +1505,6 @@ public:
                                                     PlanExecutor::YIELD_AUTO,
                                                     0));
 
-            // Make sure the PlanExecutor is destroyed while holding the necessary locks.
-            ON_BLOCK_EXIT([&exec, &scopedAutoColl, opCtx, &config] {
-                if (!scopedAutoColl) {
-                    AutoGetDb autoDb(opCtx, config.nss.db(), MODE_S);
-                    exec.reset();
-                }
-            });
-
             {
                 stdx::lock_guard<Client> lk(*opCtx->getClient());
                 CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
@@ -1699,9 +1698,6 @@ public:
                                   << " which lives on config servers");
         }
 
-        // Don't let any lock acquisitions get interrupted.
-        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
         boost::optional<DisableDocumentValidation> maybeDisableValidation;
         if (shouldBypassDocumentValidationForCommand(cmdObj))
             maybeDisableValidation.emplace(opCtx);
diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h
index 13fd4a43cd6..09b66e3dd20 100644
--- a/src/mongo/db/commands/mr.h
+++ b/src/mongo/db/commands/mr.h
@@ -307,13 +307,6 @@ public:
 
     void finalReduce(OperationContext* opCtx, CurOp* op);
 
-    // ------- cleanup/data positioning ----------
-
-    /**
-     * Clean up the temporary and incremental collections
-     */
-    void dropTempCollections();
-
     /** @return number objects in collection */