35 files changed, 1561 insertions, 2030 deletions
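This patch removes the standalone RangeDeleter machinery (range_deleter*, Helpers::removeRange, and the _pendingMap of not-yet-committed chunks) and moves orphan cleanup onto CollectionShardingState and MetadataManager: as the cleanup_orphaned_cmd.cpp and collection_metadata.cpp hunks below show, cleanupOrphaned now asks for the next key range covered by neither the owned-chunk map nor the receiving-chunk map, schedules it with css->cleanUpRange(), and blocks on CollectionShardingState::waitForClean(). The sketch below models only the gap-finding step of getNextOrphanRange. It is a simplified, self-contained illustration, assuming plain integer shard keys and std::map in place of the server's BSONObj-keyed RangeMap; its names and signatures are local to the example, not the server's actual API.

    // Simplified model of the gap-finding loop in CollectionMetadata::getNextOrphanRange.
    // Assumption: half-open ranges [min, max) over int keys instead of BSONObj shard keys.
    #include <algorithm>
    #include <iostream>
    #include <iterator>
    #include <map>
    #include <optional>
    #include <utility>

    using RangeMap = std::map<int, int>;   // range min -> range max, half-open [min, max)
    using KeyRange = std::pair<int, int>;

    // Returns the next range at or after `lookupKey` that is covered by neither map,
    // bounded by the closest surrounding range boundaries; nullopt if none remain.
    std::optional<KeyRange> getNextOrphanRange(const RangeMap& chunks,
                                               const RangeMap& receiving,
                                               int lookupKey,
                                               int minKey,
                                               int maxKey) {
        while (lookupKey < maxKey) {
            bool advanced = false;
            int lo = minKey, hi = maxKey;
            for (const RangeMap* m : {&chunks, &receiving}) {
                auto upper = m->upper_bound(lookupKey);
                if (upper != m->begin()) {
                    auto lower = std::prev(upper);
                    if (lower->second > lookupKey) {
                        // lookupKey falls inside a covered range; skip past it and retry.
                        lookupKey = lower->second;
                        advanced = true;
                        break;
                    }
                    lo = std::max(lo, lower->second);  // previous covered range ends here
                }
                if (upper != m->end())
                    hi = std::min(hi, upper->first);   // next covered range starts here
            }
            if (!advanced)
                return KeyRange{lo, hi};
        }
        return std::nullopt;
    }

    int main() {
        RangeMap chunks{{0, 10}, {20, 30}};  // ranges this shard owns
        RangeMap receiving{{40, 50}};        // range currently migrating in
        int key = -100;
        while (auto r = getNextOrphanRange(chunks, receiving, key, -100, 100)) {
            std::cout << "orphan range [" << r->first << ", " << r->second << ")\n";
            key = r->second;  // resume after the range just reported, as the command does
        }
    }

Run standalone, this prints the uncovered ranges [-100, 0), [10, 20), [30, 40), and [50, 100), mirroring how cleanupOrphaned walks the key space one orphan range at a time.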
diff --git a/jstests/sharding/authCommands.js b/jstests/sharding/authCommands.js index a36e745f113..694a09e2f9b 100644 --- a/jstests/sharding/authCommands.js +++ b/jstests/sharding/authCommands.js @@ -117,9 +117,9 @@ print("Checking read operations, should work"); assert.eq(expectedDocs, testDB.foo.find().itcount()); assert.eq(expectedDocs, testDB.foo.count()); + // NOTE: This is an explicit check that GLE can be run with read prefs, not the result - // of - // above. + // of above. assert.eq(null, testDB.runCommand({getlasterror: 1}).err); checkCommandSucceeded(testDB, {dbstats: 1}); checkCommandSucceeded(testDB, {collstats: 'foo'}); diff --git a/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js b/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js index fef023fabf5..bf8471b2e98 100644 --- a/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js +++ b/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js @@ -29,14 +29,16 @@ load('./jstests/libs/cleanup_orphaned_util.js'); {moveChunk: ns, find: {_id: 20}, to: st.shard1.shardName, _waitForDelete: true})); jsTest.log('Inserting 20 docs into shard 0....'); - for (var i = -20; i < 20; i += 2) + for (var i = -20; i < 20; i += 2) { coll.insert({_id: i}); + } assert.eq(null, coll.getDB().getLastError()); assert.eq(20, donorColl.count()); jsTest.log('Inserting 10 docs into shard 1....'); - for (i = 20; i < 40; i += 2) + for (i = 20; i < 40; i += 2) { coll.insert({_id: i}); + } assert.eq(null, coll.getDB().getLastError()); assert.eq(10, recipientColl.count()); diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js index 5469ea48d4b..1510f4f1f67 100644 --- a/jstests/sharding/migration_ignore_interrupts_4.js +++ b/jstests/sharding/migration_ignore_interrupts_4.js @@ -91,9 +91,6 @@ load('./jstests/libs/chunk_manipulation_util.js'); }, "coll1 migration recipient didn't abort migration in catchup phase.", 2 * 60 * 1000); assert.eq( 1, shard0Coll1.find().itcount(), "donor shard0 completed a migration that it aborted."); - assert.eq(1, - shard1Coll1.find().itcount(), - "shard1 accessed the xfermods log despite donor migration abortion."); jsTest.log('Finishing coll2 migration, which should succeed....'); unpauseMigrateAtStep(shard2, migrateStepNames.cloned); diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 39de568e960..abf34365dbb 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -289,7 +289,6 @@ mongodLibDeps = [ 'db/op_observer_d', 'db/mongodwebserver', 'db/repl/repl_set_commands', - 'db/range_deleter_d', 'db/repair_database', 'db/repl/storage_interface_impl', 'db/repl/topology_coordinator_impl', diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 2edd8f35d6a..698fc103a51 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -369,35 +369,6 @@ env.CppUnitTest( ], ) -env.Library( - target='range_deleter', - source=[ - 'range_deleter.cpp', - 'range_deleter_mock_env.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/repl/repl_coordinator_global', - '$BUILD_DIR/mongo/db/service_context', - 'range_arithmetic', - ], -) - -env.CppUnitTest( - target='range_deleter_test', - source=[ - 'range_deleter_test.cpp', - ], - LIBDEPS=[ - 'common', - 'range_deleter', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/repl/replmocks', - '$BUILD_DIR/mongo/db/service_context_noop_init', - ], -) - # This library is linked into mongos and mongod only, 
not into the shell or any tools. env.Library( target="mongodandmongos", @@ -755,24 +726,6 @@ env.Library( ) env.Library( - target="range_deleter_d", - source=[ - "range_deleter_db_env.cpp", - "range_deleter_service.cpp", - ], - LIBDEPS=[ - "dbhelpers", - "db_raii", - "range_deleter", - #"catalog/catalog", # CYCLE - ], - LIBDEPS_TAGS=[ - # TODO(ADAM, 2017-03-10): See `CYCLE` tags above. - 'illegal_cyclic_or_unresolved_dependencies_whitelisted', - ], -) - -env.Library( target="rw_concern_d", source=[ "read_concern.cpp", @@ -926,8 +879,6 @@ env.Library( "pipeline/serveronly", "prefetch", "query/query", - "range_deleter", - "range_deleter_d", "repair_database", "repl/bgsync", "repl/oplog_buffer_collection", diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 68a3b7f0b44..75e81e2d216 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -659,8 +659,6 @@ ExitCode _initAndListen(int listenPort) { startFTDC(); - getDeleter()->startWorkers(); - restartInProgressIndexesFromLastShutdown(startupOpCtx.get()); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 34ecdfbbefa..7de97489e77 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -271,195 +271,6 @@ BSONObj Helpers::inferKeyPattern(const BSONObj& o) { return kpBuilder.obj(); } -long long Helpers::removeRange(OperationContext* opCtx, - const KeyRange& range, - BoundInclusion boundInclusion, - const WriteConcernOptions& writeConcern, - RemoveSaver* callback, - bool fromMigrate, - bool onlyRemoveOrphanedDocs) { - Timer rangeRemoveTimer; - const NamespaceString nss(range.ns); - - // The IndexChunk has a keyPattern that may apply to more than one index - we need to - // select the index and get the full index keyPattern here. - std::string indexName; - BSONObj min; - BSONObj max; - - { - AutoGetCollectionForReadCommand ctx(opCtx, nss); - Collection* collection = ctx.getCollection(); - if (!collection) { - warning(LogComponent::kSharding) - << "collection deleted before cleaning data over range of type " << range.keyPattern - << " in " << nss.ns() << endl; - return -1; - } - - // Allow multiKey based on the invariant that shard keys must be single-valued. - // Therefore, any multi-key index prefixed by shard key cannot be multikey over - // the shard key fields. - const IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, - range.keyPattern, - false); // requireSingleKey - if (!idx) { - warning(LogComponent::kSharding) << "no index found to clean data over range of type " - << range.keyPattern << " in " << nss.ns() << endl; - return -1; - } - - indexName = idx->indexName(); - KeyPattern indexKeyPattern(idx->keyPattern()); - - // Extend bounds to match the index we found - - invariant(IndexBounds::isStartIncludedInBound(boundInclusion)); - // Extend min to get (min, MinKey, MinKey, ....) - min = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey, false)); - // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...) - // If not included, extend max to get (max, MinKey, MinKey, ....) 
- const bool maxInclusive = IndexBounds::isEndIncludedInBound(boundInclusion); - max = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.maxKey, maxInclusive)); - } - - - MONGO_LOG_COMPONENT(1, LogComponent::kSharding) - << "begin removal of " << min << " to " << max << " in " << nss.ns() - << " with write concern: " << writeConcern.toBSON() << endl; - - long long numDeleted = 0; - - Milliseconds millisWaitingForReplication{0}; - - while (1) { - // Scoping for write lock. - { - AutoGetCollection ctx(opCtx, nss, MODE_IX, MODE_IX); - Collection* collection = ctx.getCollection(); - if (!collection) - break; - - IndexDescriptor* desc = - collection->getIndexCatalog()->findIndexByName(opCtx, indexName); - - if (!desc) { - warning(LogComponent::kSharding) << "shard key index '" << indexName << "' on '" - << nss.ns() << "' was dropped"; - return -1; - } - - auto exec = InternalPlanner::indexScan(opCtx, - collection, - desc, - min, - max, - boundInclusion, - PlanExecutor::YIELD_AUTO, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH); - - RecordId rloc; - BSONObj obj; - PlanExecutor::ExecState state; - // This may yield so we cannot touch nsd after this. - state = exec->getNext(&obj, &rloc); - if (PlanExecutor::IS_EOF == state) { - break; - } - - if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { - warning(LogComponent::kSharding) - << PlanExecutor::statestr(state) << " - cursor error while trying to delete " - << min << " to " << max << " in " << nss.ns() << ": " - << WorkingSetCommon::toStatusString(obj) - << ", stats: " << Explain::getWinningPlanStats(exec.get()) << endl; - break; - } - - verify(PlanExecutor::ADVANCED == state); - - WriteUnitOfWork wuow(opCtx); - - if (onlyRemoveOrphanedDocs) { - // Do a final check in the write lock to make absolutely sure that our - // collection hasn't been modified in a way that invalidates our migration - // cleanup. - - // We should never be able to turn off the sharding state once enabled, but - // in the future we might want to. - verify(ShardingState::get(opCtx)->enabled()); - - bool docIsOrphan; - - // In write lock, so will be the most up-to-date version - auto metadataNow = CollectionShardingState::get(opCtx, nss.ns())->getMetadata(); - if (metadataNow) { - ShardKeyPattern kp(metadataNow->getKeyPattern()); - BSONObj key = kp.extractShardKeyFromDoc(obj); - docIsOrphan = - !metadataNow->keyBelongsToMe(key) && !metadataNow->keyIsPending(key); - } else { - docIsOrphan = false; - } - - if (!docIsOrphan) { - warning(LogComponent::kSharding) - << "aborting migration cleanup for chunk " << min << " to " << max - << (metadataNow ? 
(string) " at document " + obj.toString() : "") - << ", collection " << nss.ns() << " has changed " << endl; - break; - } - } - - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss)) { - warning() << "stepped down from primary while deleting chunk; " - << "orphaning data in " << nss.ns() << " in range [" << redact(min) - << ", " << redact(max) << ")"; - return numDeleted; - } - - if (callback) - callback->goingToDelete(obj); - - OpDebug* const nullOpDebug = nullptr; - collection->deleteDocument(opCtx, rloc, nullOpDebug, fromMigrate); - wuow.commit(); - numDeleted++; - } - - // TODO remove once the yielding below that references this timer has been removed - Timer secondaryThrottleTime; - - if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplication( - opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - writeConcern); - if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { - warning(LogComponent::kSharding) << "replication to secondaries for removeRange at " - "least 60 seconds behind"; - } else { - uassertStatusOK(replStatus.status); - } - millisWaitingForReplication += replStatus.duration; - } - } - - if (writeConcern.shouldWaitForOtherNodes()) - log(LogComponent::kSharding) - << "Helpers::removeRangeUnlocked time spent waiting for replication: " - << durationCount<Milliseconds>(millisWaitingForReplication) << "ms" << endl; - - MONGO_LOG_COMPONENT(1, LogComponent::kSharding) << "end removal of " << min << " to " << max - << " in " << nss.ns() << " (took " - << rangeRemoveTimer.millis() << "ms)" << endl; - - return numDeleted; -} - void Helpers::emptyCollection(OperationContext* opCtx, const NamespaceString& nss) { OldClientContext context(opCtx, nss.ns()); repl::UnreplicatedWritesBlock uwb(opCtx); diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h index bcba05a7b04..031feda1215 100644 --- a/src/mongo/db/dbhelpers.h +++ b/src/mongo/db/dbhelpers.h @@ -139,29 +139,6 @@ struct Helpers { static BSONObj inferKeyPattern(const BSONObj& o); /** - * Takes a namespace range, specified by a min and max and qualified by an index pattern, - * and removes all the documents in that range found by iterating - * over the given index. Caller is responsible for insuring that min/max are - * compatible with the given keyPattern (e.g min={a:100} is compatible with - * keyPattern={a:1,b:1} since it can be extended to {a:100,b:minKey}, but - * min={b:100} is not compatible). - * - * Caller must hold a write lock on 'ns' - * - * Returns -1 when no usable index exists - * - * Does oplog the individual document deletions. - * // TODO: Refactor this mechanism, it is growing too large - */ - static long long removeRange(OperationContext* opCtx, - const KeyRange& range, - BoundInclusion boundInclusion, - const WriteConcernOptions& secondaryThrottle, - RemoveSaver* callback = NULL, - bool fromMigrate = false, - bool onlyRemoveOrphanedDocs = false); - - /** * Remove all documents from a collection. * You do not need to set the database before calling. * Does not oplog the operation. diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp deleted file mode 100644 index 08419a39a11..00000000000 --- a/src/mongo/db/range_deleter_db_env.cpp +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/range_deleter_db_env.h" - -#include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/client.h" -#include "mongo/db/clientcursor.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/write_concern_options.h" -#include "mongo/util/log.h" - -namespace mongo { - -using std::string; - -/** - * Outline of the delete process: - * 1. Initialize the client for this thread if there is no client. This is for the worker - * threads that are attached to any of the threads servicing client requests. - * 2. Grant this thread authorization to perform deletes. - * 3. Temporarily enable mode to bypass shard version checks. TODO: Replace this hack. - * 4. Setup callback to save deletes to moveChunk directory (only if moveParanoia is true). - * 5. Delete range. - * 6. Wait until the majority of the secondaries catch up. 
- */ -bool RangeDeleterDBEnv::deleteRange(OperationContext* opCtx, - const RangeDeleteEntry& taskDetails, - long long int* deletedDocs, - std::string* errMsg) { - const string ns(taskDetails.options.range.ns); - const BSONObj inclusiveLower(taskDetails.options.range.minKey); - const BSONObj exclusiveUpper(taskDetails.options.range.maxKey); - const BSONObj keyPattern(taskDetails.options.range.keyPattern); - const WriteConcernOptions writeConcern(taskDetails.options.writeConcern); - const bool fromMigrate = taskDetails.options.fromMigrate; - const bool onlyRemoveOrphans = taskDetails.options.onlyRemoveOrphanedDocs; - - Client::initThreadIfNotAlready("RangeDeleter"); - - *deletedDocs = 0; - OperationShardingState::IgnoreVersioningBlock forceVersion(opCtx, NamespaceString(ns)); - - Helpers::RemoveSaver removeSaver("moveChunk", ns, taskDetails.options.removeSaverReason); - Helpers::RemoveSaver* removeSaverPtr = NULL; - if (serverGlobalParams.moveParanoia && !taskDetails.options.removeSaverReason.empty()) { - removeSaverPtr = &removeSaver; - } - - // log the opId so the user can use it to cancel the delete using killOp. - unsigned int opId = opCtx->getOpID(); - log() << "Deleter starting delete for: " << ns << " from " << redact(inclusiveLower) << " -> " - << redact(exclusiveUpper) << ", with opId: " << opId; - - try { - *deletedDocs = - Helpers::removeRange(opCtx, - KeyRange(ns, inclusiveLower, exclusiveUpper, keyPattern), - BoundInclusion::kIncludeStartKeyOnly, - writeConcern, - removeSaverPtr, - fromMigrate, - onlyRemoveOrphans); - - if (*deletedDocs < 0) { - *errMsg = "collection or index dropped before data could be cleaned"; - warning() << *errMsg; - - return false; - } - - log() << "rangeDeleter deleted " << *deletedDocs << " documents for " << ns << " from " - << redact(inclusiveLower) << " -> " << redact(exclusiveUpper); - } catch (const DBException& ex) { - *errMsg = str::stream() << "Error encountered while deleting range: " - << "ns" << ns << " from " << inclusiveLower << " -> " - << exclusiveUpper << ", cause by:" << causedBy(ex); - - return false; - } - - return true; -} - -void RangeDeleterDBEnv::getCursorIds(OperationContext* opCtx, - StringData ns, - std::set<CursorId>* openCursors) { - AutoGetCollection autoColl(opCtx, NamespaceString(ns), MODE_IS); - if (!autoColl.getCollection()) - return; - - autoColl.getCollection()->getCursorManager()->getCursorIds(openCursors); -} - -} // namespace mongo diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 4dd1a7e33f4..400eec3def4 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -97,7 +97,6 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/collection', '$BUILD_DIR/mongo/db/catalog/database', '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/range_deleter', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/query/internal_plans', '$BUILD_DIR/mongo/s/client/shard_local', @@ -195,7 +194,6 @@ env.Library( '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/index_d', - '$BUILD_DIR/mongo/db/range_deleter_d', '$BUILD_DIR/mongo/db/repl/repl_coordinator_global', 'balancer', 'metadata', @@ -230,22 +228,10 @@ env.CppUnitTest( ) env.CppUnitTest( - target='sharding_metadata_test', - source=[ - 'collection_metadata_test.cpp', - 'shard_metadata_util_test.cpp', - ], - LIBDEPS=[ - 'metadata', - '$BUILD_DIR/mongo/s/shard_server_test_fixture', - ], -) - -env.CppUnitTest( target='shard_test', source=[ + 'shard_metadata_util_test.cpp', 
'active_migrations_registry_test.cpp', - 'metadata_manager_test.cpp', 'migration_chunk_cloner_source_legacy_test.cpp', 'sharding_state_test.cpp', ], @@ -255,14 +241,17 @@ env.CppUnitTest( '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', + '$BUILD_DIR/mongo/s/shard_server_test_fixture', ], ) env.CppUnitTest( target='collection_sharding_state_test', source=[ - 'collection_sharding_state_test.cpp', + 'collection_metadata_test.cpp', 'collection_range_deleter_test.cpp', + 'metadata_manager_test.cpp', + 'collection_sharding_state_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/remote_command_targeter_mock', @@ -275,6 +264,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', ], ) diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp index 117a50b57c7..a7a9519300b 100644 --- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp @@ -53,6 +53,8 @@ #include "mongo/s/migration_secondary_throttle_options.h" #include "mongo/util/log.h" +#include <boost/optional.hpp> + namespace mongo { using std::string; @@ -78,65 +80,60 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx, const WriteConcernOptions& secondaryThrottle, BSONObj* stoppedAtKey, string* errMsg) { - BSONObj startingFromKey = startingFromKeyConst; - ScopedCollectionMetadata metadata; + BSONObj startingFromKey = startingFromKeyConst; + boost::optional<ChunkRange> targetRange; + OID epoch; { - AutoGetCollection autoColl(opCtx, ns, MODE_IS); - metadata = CollectionShardingState::get(opCtx, ns.toString())->getMetadata(); - } - - if (!metadata) { - warning() << "skipping orphaned data cleanup for " << ns.toString() + AutoGetCollection autoColl(opCtx, ns, MODE_IX); + auto css = CollectionShardingState::get(opCtx, ns.toString()); + auto metadata = css->getMetadata(); + if (!metadata) { + log() << "skipping orphaned data cleanup for " << ns.toString() << ", collection is not sharded"; + return CleanupResult_Done; + } + epoch = metadata->getCollVersion().epoch(); + + BSONObj keyPattern = metadata->getKeyPattern(); + if (!startingFromKey.isEmpty()) { + if (!metadata->isValidKey(startingFromKey)) { + *errMsg = stream() << "could not cleanup orphaned data, start key " + << startingFromKey << " does not match shard key pattern " + << keyPattern; + + log() << *errMsg; + return CleanupResult_Error; + } + } else { + startingFromKey = metadata->getMinKey(); + } - return CleanupResult_Done; - } - - BSONObj keyPattern = metadata->getKeyPattern(); - if (!startingFromKey.isEmpty()) { - if (!metadata->isValidKey(startingFromKey)) { - *errMsg = stream() << "could not cleanup orphaned data, start key " - << redact(startingFromKey) << " does not match shard key pattern " - << keyPattern; + boost::optional<KeyRange> orphanRange = css->getNextOrphanRange(startingFromKey); + if (!orphanRange) { + LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from " + << redact(startingFromKey) << ", no orphan ranges remain"; - warning() << *errMsg; - return CleanupResult_Error; + return CleanupResult_Done; } - } else { - startingFromKey = metadata->getMinKey(); - } + orphanRange->ns = ns.ns(); + *stoppedAtKey = orphanRange->maxKey; - KeyRange orphanRange; - if 
(!metadata->getNextOrphanRange(startingFromKey, &orphanRange)) { - LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from " - << redact(startingFromKey) << ", no orphan ranges remain"; + LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from " + << redact(startingFromKey) << ", removing next orphan range" + << " [" << redact(orphanRange->minKey) << "," << redact(orphanRange->maxKey) << ")"; - return CleanupResult_Done; + targetRange.emplace( + ChunkRange(orphanRange->minKey.getOwned(), orphanRange->maxKey.getOwned())); + uassertStatusOK(css->cleanUpRange(*targetRange)); } - orphanRange.ns = ns.ns(); - *stoppedAtKey = orphanRange.maxKey; - - LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from " - << redact(startingFromKey) << ", removing next orphan range" - << " [" << redact(orphanRange.minKey) << "," << redact(orphanRange.maxKey) << ")"; - - // Metadata snapshot may be stale now, but deleter checks metadata again in write lock - // before delete. - RangeDeleterOptions deleterOptions(orphanRange); - deleterOptions.writeConcern = secondaryThrottle; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.fromMigrate = true; - // Must wait for cursors since there can be existing cursors with an older - // CollectionMetadata. - deleterOptions.waitForOpenCursors = true; - deleterOptions.removeSaverReason = "cleanup-cmd"; - - if (!getDeleter()->deleteNow(opCtx, deleterOptions, errMsg)) { - warning() << redact(*errMsg); - return CleanupResult_Error; + if (targetRange) { + auto result = CollectionShardingState::waitForClean(opCtx, ns, epoch, *targetRange); + if (!result.isOK()) { + warning() << redact(result.reason()); + return CleanupResult_Error; + } } - return CleanupResult_Continue; } diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp index b18e325adb0..f9e1a32ac51 100644 --- a/src/mongo/db/s/collection_metadata.cpp +++ b/src/mongo/db/s/collection_metadata.cpp @@ -49,7 +49,6 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern, _collVersion(collectionVersion), _shardVersion(shardVersion), _chunksMap(std::move(shardChunksMap)), - _pendingMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()), _rangesMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) { invariant(_shardKeyPattern.isValid()); @@ -61,16 +60,24 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern, invariant(!_shardVersion.isSet()); return; } - invariant(_shardVersion.isSet()); - // Load the chunk information, coallesceing their ranges. The version for this shard would be + _buildRangesMap(); +} + +CollectionMetadata::~CollectionMetadata() = default; + +void CollectionMetadata::_buildRangesMap() { + _rangesMap.clear(); + + // Load the chunk information, coalescing their ranges. The version for this shard would be // the highest version for any of the chunks. 
+ BSONObj min, max; for (const auto& entry : _chunksMap) { - BSONObj currMin = entry.first; - BSONObj currMax = entry.second.getMaxKey(); + BSONObj const& currMin = entry.first; + BSONObj const& currMax = entry.second.getMaxKey(); // Coalesce the chunk's bounds in ranges if they are adjacent chunks if (min.isEmpty()) { @@ -96,54 +103,9 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern, _rangesMap.emplace(min, CachedChunkInfo(max, ChunkVersion::IGNORED())); } -CollectionMetadata::~CollectionMetadata() = default; - -std::unique_ptr<CollectionMetadata> CollectionMetadata::cloneMinusPending( - const ChunkType& chunk) const { - invariant(rangeMapContains(_pendingMap, chunk.getMin(), chunk.getMax())); - - auto metadata(stdx::make_unique<CollectionMetadata>( - _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks())); - metadata->_pendingMap = _pendingMap; - - metadata->_pendingMap.erase(chunk.getMin()); - - return metadata; -} - -std::unique_ptr<CollectionMetadata> CollectionMetadata::clonePlusPending( - const ChunkType& chunk) const { - invariant(!rangeMapOverlaps(_chunksMap, chunk.getMin(), chunk.getMax())); - - auto metadata(stdx::make_unique<CollectionMetadata>( - _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks())); - metadata->_pendingMap = _pendingMap; - - // If there are any pending chunks on the interval to be added this is ok, since pending chunks - // aren't officially tracked yet and something may have changed on servers we do not see yet. - // - // We remove any chunks we overlap because the remote request starting a chunk migration is what - // is authoritative. - - if (rangeMapOverlaps(_pendingMap, chunk.getMin(), chunk.getMax())) { - RangeVector pendingOverlap; - getRangeMapOverlap(_pendingMap, chunk.getMin(), chunk.getMax(), &pendingOverlap); - - warning() << "new pending chunk " << redact(rangeToString(chunk.getMin(), chunk.getMax())) - << " overlaps existing pending chunks " << redact(overlapToString(pendingOverlap)) - << ", a migration may not have completed"; - - for (RangeVector::iterator it = pendingOverlap.begin(); it != pendingOverlap.end(); ++it) { - metadata->_pendingMap.erase(it->first); - } - } - - // The pending map entry cannot contain a specific chunk version because we don't know what - // version would be generated for it at commit time. That's why we insert an IGNORED value. 
- metadata->_pendingMap.emplace(chunk.getMin(), - CachedChunkInfo(chunk.getMax(), ChunkVersion::IGNORED())); - - return metadata; +std::unique_ptr<CollectionMetadata> CollectionMetadata::clone() const { + return stdx::make_unique<CollectionMetadata>( + _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks()); } bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const { @@ -158,18 +120,6 @@ bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const { return rangeContains(it->first, it->second.getMaxKey(), key); } -bool CollectionMetadata::keyIsPending(const BSONObj& key) const { - if (_pendingMap.empty()) { - return false; - } - - auto it = _pendingMap.upper_bound(key); - if (it != _pendingMap.begin()) - it--; - - return rangeContains(it->first, it->second.getMaxKey(), key); -} - bool CollectionMetadata::getNextChunk(const BSONObj& lookupKey, ChunkType* chunk) const { RangeMap::const_iterator upperChunkIt = _chunksMap.upper_bound(lookupKey); RangeMap::const_iterator lowerChunkIt = upperChunkIt; @@ -238,6 +188,10 @@ Status CollectionMetadata::checkChunkIsValid(const ChunkType& chunk) { return Status::OK(); } +bool CollectionMetadata::rangeOverlapsChunk(ChunkRange const& range) { + return rangeMapOverlaps(_rangesMap, range.getMin(), range.getMax()); +} + void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const { _collVersion.addToBSON(bb, "collVersion"); _shardVersion.addToBSON(bb, "shardVersion"); @@ -256,100 +210,71 @@ void CollectionMetadata::toBSONChunks(BSONArrayBuilder& bb) const { } } -void CollectionMetadata::toBSONPending(BSONArrayBuilder& bb) const { - if (_pendingMap.empty()) - return; - - for (RangeMap::const_iterator it = _pendingMap.begin(); it != _pendingMap.end(); ++it) { - BSONArrayBuilder pendingBB(bb.subarrayStart()); - pendingBB.append(it->first); - pendingBB.append(it->second.getMaxKey()); - pendingBB.done(); - } -} - std::string CollectionMetadata::toStringBasic() const { return str::stream() << "collection version: " << _collVersion.toString() << ", shard version: " << _shardVersion.toString(); } -bool CollectionMetadata::getNextOrphanRange(const BSONObj& origLookupKey, KeyRange* range) const { +boost::optional<KeyRange> CollectionMetadata::getNextOrphanRange( + RangeMap const& receivingChunks, BSONObj const& origLookupKey) const { + BSONObj lookupKey = origLookupKey; BSONObj maxKey = getMaxKey(); // so we don't keep rebuilding while (lookupKey.woCompare(maxKey) < 0) { - RangeMap::const_iterator lowerChunkIt = _chunksMap.end(); - RangeMap::const_iterator upperChunkIt = _chunksMap.end(); - - if (!_chunksMap.empty()) { - upperChunkIt = _chunksMap.upper_bound(lookupKey); - lowerChunkIt = upperChunkIt; - if (upperChunkIt != _chunksMap.begin()) - --lowerChunkIt; - else - lowerChunkIt = _chunksMap.end(); - } - // If we overlap, continue after the overlap - // TODO: Could optimize slightly by finding next non-contiguous chunk - if (lowerChunkIt != _chunksMap.end() && - lowerChunkIt->second.getMaxKey().woCompare(lookupKey) > 0) { - lookupKey = lowerChunkIt->second.getMaxKey(); + using Its = std::pair<RangeMap::const_iterator, RangeMap::const_iterator>; + + auto patchLookupKey = [&](RangeMap const& map) -> boost::optional<Its> { + auto lowerIt = map.end(), upperIt = map.end(); + + if (!map.empty()) { + upperIt = map.upper_bound(lookupKey); + lowerIt = upperIt; + if (upperIt != map.begin()) + --lowerIt; + else + lowerIt = map.end(); + } + + // If we overlap, continue after the overlap + // TODO: Could optimize slightly by finding next 
non-contiguous chunk + if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(lookupKey) > 0) { + lookupKey = lowerIt->second.getMaxKey(); // note side effect + return boost::none; + } else { + return Its(lowerIt, upperIt); + } + }; + + boost::optional<Its> chunksIts, pendingIts; + if (!(chunksIts = patchLookupKey(_chunksMap)) || + !(pendingIts = patchLookupKey(receivingChunks))) { continue; } - RangeMap::const_iterator lowerPendingIt = _pendingMap.end(); - RangeMap::const_iterator upperPendingIt = _pendingMap.end(); - - if (!_pendingMap.empty()) { - upperPendingIt = _pendingMap.upper_bound(lookupKey); - lowerPendingIt = upperPendingIt; - if (upperPendingIt != _pendingMap.begin()) - --lowerPendingIt; - else - lowerPendingIt = _pendingMap.end(); - } - - // If we overlap, continue after the overlap - // TODO: Could optimize slightly by finding next non-contiguous chunk - if (lowerPendingIt != _pendingMap.end() && - lowerPendingIt->second.getMaxKey().woCompare(lookupKey) > 0) { - lookupKey = lowerPendingIt->second.getMaxKey(); - continue; - } - - // - // We know that the lookup key is not covered by a chunk or pending range, and where the - // previous chunk and pending chunks are. Now we fill in the bounds as the closest - // bounds of the surrounding ranges in both maps. - // - - range->keyPattern = _shardKeyPattern.toBSON(); - range->minKey = getMinKey(); - range->maxKey = maxKey; - - if (lowerChunkIt != _chunksMap.end() && - lowerChunkIt->second.getMaxKey().woCompare(range->minKey) > 0) { - range->minKey = lowerChunkIt->second.getMaxKey(); - } - - if (upperChunkIt != _chunksMap.end() && upperChunkIt->first.woCompare(range->maxKey) < 0) { - range->maxKey = upperChunkIt->first; - } - - if (lowerPendingIt != _pendingMap.end() && - lowerPendingIt->second.getMaxKey().woCompare(range->minKey) > 0) { - range->minKey = lowerPendingIt->second.getMaxKey(); - } - - if (upperPendingIt != _pendingMap.end() && - upperPendingIt->first.woCompare(range->maxKey) < 0) { - range->maxKey = upperPendingIt->first; - } - - return true; + boost::optional<KeyRange> range = + KeyRange("", getMinKey(), maxKey, _shardKeyPattern.toBSON()); + + auto patchArgRange = [&range](RangeMap const& map, Its its) { + // We know that the lookup key is not covered by a chunk or pending range, and where the + // previous chunk and pending chunks are. Now we fill in the bounds as the closest + // bounds of the surrounding ranges in both maps. + auto lowerIt = its.first, upperIt = its.second; + + if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(range->minKey) > 0) { + range->minKey = lowerIt->second.getMaxKey(); + } + if (upperIt != map.end() && upperIt->first.woCompare(range->maxKey) < 0) { + range->maxKey = upperIt->first; + } + }; + + patchArgRange(_chunksMap, *chunksIts); + patchArgRange(receivingChunks, *pendingIts); + return range; } - return false; + return boost::none; } BSONObj CollectionMetadata::getMinKey() const { diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index ea7392842d6..de318225ec6 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/range_arithmetic.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/shard_key_pattern.h" @@ -51,10 +52,7 @@ class ChunkType; class CollectionMetadata { public: /** - * The main way to construct CollectionMetadata is through MetadataLoader or the clone*() - * methods. 
- * - * The constructors should not be used directly outside of tests. + * The main way to construct CollectionMetadata is through MetadataLoader or clone() methods. */ CollectionMetadata(const BSONObj& keyPattern, ChunkVersion collectionVersion, @@ -64,20 +62,9 @@ public: ~CollectionMetadata(); /** - * Returns a new metadata's instance based on 'this's state by removing a 'pending' chunk. - * - * The shard and collection version of the new metadata are unaffected. The caller owns the - * new metadata. - */ - std::unique_ptr<CollectionMetadata> cloneMinusPending(const ChunkType& chunk) const; - - /** - * Returns a new metadata's instance based on 'this's state by adding a 'pending' chunk. - * - * The shard and collection version of the new metadata are unaffected. The caller owns the - * new metadata. + * Returns a new metadata's instance based on 'this's state; */ - std::unique_ptr<CollectionMetadata> clonePlusPending(const ChunkType& chunk) const; + std::unique_ptr<CollectionMetadata> clone() const; /** * Returns true if the document key 'key' is a valid instance of a shard key for this @@ -93,12 +80,6 @@ public: bool keyBelongsToMe(const BSONObj& key) const; /** - * Returns true if the document key 'key' is or has been migrated to this shard, and may - * belong to us after a subsequent config reload. Key must be the full shard key. - */ - bool keyIsPending(const BSONObj& key) const; - - /** * Given a key 'lookupKey' in the shard key range, get the next chunk which overlaps or is * greater than this key. Returns true if a chunk exists, false otherwise. * @@ -117,6 +98,11 @@ public: Status checkChunkIsValid(const ChunkType& chunk); /** + * Returns true if the argument range overlaps any chunk. + */ + bool rangeOverlapsChunk(ChunkRange const& range); + + /** * Given a key in the shard key range, get the next range which overlaps or is greater than * this key. * @@ -124,15 +110,17 @@ public: * * KeyRange range; * BSONObj lookupKey = metadata->getMinKey(); - * while( metadata->getNextOrphanRange( lookupKey, &orphanRange ) ) { - * // Do stuff with range - * lookupKey = orphanRange.maxKey; + * boost::optional<KeyRange> range; + * while((range = metadata->getNextOrphanRange(receiveMap, lookupKey))) { + * lookupKey = range->maxKey; * } * * @param lookupKey passing a key that does not belong to this metadata is undefined. + * @param receiveMap is an extra set of chunks not considered orphaned. * @param orphanRange the output range. Note that the NS is not set. */ - bool getNextOrphanRange(const BSONObj& lookupKey, KeyRange* orphanRange) const; + boost::optional<KeyRange> getNextOrphanRange(RangeMap const& receiveMap, + BSONObj const& lookupKey) const; ChunkVersion getCollVersion() const { return _collVersion; @@ -173,16 +161,16 @@ public: void toBSONChunks(BSONArrayBuilder& bb) const; /** - * BSON output of the pending metadata into a BSONArray - */ - void toBSONPending(BSONArrayBuilder& bb) const; - - /** * String output of the collection and shard versions. */ std::string toStringBasic() const; private: + /** + * Builds _rangesMap from the contents of _chunksMap. + */ + void _buildRangesMap(); + // Shard key pattern for the collection ShardKeyPattern _shardKeyPattern; @@ -196,10 +184,7 @@ private: // Map of chunks tracked by this shard RangeMap _chunksMap; - // Map of ranges of chunks that are migrating but have not been confirmed added yet - RangeMap _pendingMap; - - // A second map from a min key into a range or contiguous chunks. 
The map is redundant + // A second map from a min key into a range of contiguous chunks. The map is redundant // w.r.t. _chunkMap but we expect high chunk contiguity, especially in small // installations. RangeMap _rangesMap; diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index c765c67ee3a..999e5282490 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/base/status.h" +#include "mongo/db/range_arithmetic.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk_version.h" @@ -53,6 +54,11 @@ protected: } }; +struct stRangeMap : public RangeMap { + stRangeMap() + : RangeMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {} +}; + TEST_F(NoChunkFixture, BasicBelongsToMe) { ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MINKEY))); ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 10))); @@ -86,147 +92,42 @@ TEST_F(NoChunkFixture, getDifferentFromEmpty) { &differentChunk)); } -TEST_F(NoChunkFixture, NoPendingChunks) { - ASSERT(!makeCollectionMetadata()->keyIsPending(BSON("a" << 15))); - ASSERT(!makeCollectionMetadata()->keyIsPending(BSON("a" << 25))); -} - -TEST_F(NoChunkFixture, FirstPendingChunk) { - ChunkType chunk; - chunk.setMin(BSON("a" << 10)); - chunk.setMax(BSON("a" << 20)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - ASSERT(cloned->keyIsPending(BSON("a" << 15))); - ASSERT(!cloned->keyIsPending(BSON("a" << 25))); - ASSERT(cloned->keyIsPending(BSON("a" << 10))); - ASSERT(!cloned->keyIsPending(BSON("a" << 20))); -} - -TEST_F(NoChunkFixture, EmptyMultiPendingChunk) { - ChunkType chunk; - chunk.setMin(BSON("a" << 10)); - chunk.setMax(BSON("a" << 20)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - - chunk.setMin(BSON("a" << 40)); - chunk.setMax(BSON("a" << 50)); - - cloned = cloned->clonePlusPending(chunk); - ASSERT(cloned->keyIsPending(BSON("a" << 15))); - ASSERT(!cloned->keyIsPending(BSON("a" << 25))); - ASSERT(cloned->keyIsPending(BSON("a" << 45))); - ASSERT(!cloned->keyIsPending(BSON("a" << 55))); -} - -TEST_F(NoChunkFixture, MinusPendingChunk) { - ChunkType chunk; - chunk.setMin(BSON("a" << 10)); - chunk.setMax(BSON("a" << 20)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - - cloned = cloned->cloneMinusPending(chunk); - ASSERT(!cloned->keyIsPending(BSON("a" << 15))); - ASSERT(!cloned->keyIsPending(BSON("a" << 25))); -} - -TEST_F(NoChunkFixture, OverlappingPendingChunk) { - ChunkType chunk; - chunk.setMin(BSON("a" << 10)); - chunk.setMax(BSON("a" << 30)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - - chunk.setMin(BSON("a" << 20)); - chunk.setMax(BSON("a" << 40)); - - cloned = cloned->clonePlusPending(chunk); - ASSERT(!cloned->keyIsPending(BSON("a" << 15))); - ASSERT(cloned->keyIsPending(BSON("a" << 25))); - ASSERT(cloned->keyIsPending(BSON("a" << 35))); - ASSERT(!cloned->keyIsPending(BSON("a" << 45))); -} - -TEST_F(NoChunkFixture, OverlappingPendingChunks) { - ChunkType chunk; - chunk.setMin(BSON("a" << 10)); - chunk.setMax(BSON("a" << 30)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - - chunk.setMin(BSON("a" << 30)); - chunk.setMax(BSON("a" << 50)); - - cloned = cloned->clonePlusPending(chunk); - - chunk.setMin(BSON("a" << 20)); - chunk.setMax(BSON("a" << 40)); - - cloned 
= cloned->clonePlusPending(chunk); - - ASSERT(!cloned->keyIsPending(BSON("a" << 15))); - ASSERT(cloned->keyIsPending(BSON("a" << 25))); - ASSERT(cloned->keyIsPending(BSON("a" << 35))); - ASSERT(!cloned->keyIsPending(BSON("a" << 45))); -} - TEST_F(NoChunkFixture, OrphanedDataRangeBegin) { auto metadata(makeCollectionMetadata()); - KeyRange keyRange; + stRangeMap pending; BSONObj lookupKey = metadata->getMinKey(); - ASSERT(metadata->getNextOrphanRange(lookupKey, &keyRange)); + auto keyRange = metadata->getNextOrphanRange(pending, lookupKey); + ASSERT(keyRange); - ASSERT(keyRange.minKey.woCompare(metadata->getMinKey()) == 0); - ASSERT(keyRange.maxKey.woCompare(metadata->getMaxKey()) == 0); + ASSERT(keyRange->minKey.woCompare(metadata->getMinKey()) == 0); + ASSERT(keyRange->maxKey.woCompare(metadata->getMaxKey()) == 0); // Make sure we don't have any more ranges - ASSERT(!metadata->getNextOrphanRange(keyRange.maxKey, &keyRange)); + ASSERT(!metadata->getNextOrphanRange(pending, keyRange->maxKey)); } TEST_F(NoChunkFixture, OrphanedDataRangeMiddle) { auto metadata(makeCollectionMetadata()); - KeyRange keyRange; + stRangeMap pending; BSONObj lookupKey = BSON("a" << 20); - ASSERT(metadata->getNextOrphanRange(lookupKey, &keyRange)); + auto keyRange = metadata->getNextOrphanRange(pending, lookupKey); + ASSERT(keyRange); - ASSERT(keyRange.minKey.woCompare(metadata->getMinKey()) == 0); - ASSERT(keyRange.maxKey.woCompare(metadata->getMaxKey()) == 0); - ASSERT(keyRange.keyPattern.woCompare(metadata->getKeyPattern()) == 0); + ASSERT(keyRange->minKey.woCompare(metadata->getMinKey()) == 0); + ASSERT(keyRange->maxKey.woCompare(metadata->getMaxKey()) == 0); + ASSERT(keyRange->keyPattern.woCompare(metadata->getKeyPattern()) == 0); // Make sure we don't have any more ranges - ASSERT(!metadata->getNextOrphanRange(keyRange.maxKey, &keyRange)); + ASSERT(!metadata->getNextOrphanRange(pending, keyRange->maxKey)); } TEST_F(NoChunkFixture, OrphanedDataRangeEnd) { auto metadata(makeCollectionMetadata()); - KeyRange keyRange; - ASSERT(!metadata->getNextOrphanRange(metadata->getMaxKey(), &keyRange)); -} - -TEST_F(NoChunkFixture, PendingOrphanedDataRanges) { - ChunkType chunk; - chunk.setMin(BSON("a" << 10)); - chunk.setMax(BSON("a" << 20)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - - KeyRange keyRange; - ASSERT(cloned->getNextOrphanRange(cloned->getMinKey(), &keyRange)); - ASSERT(keyRange.minKey.woCompare(cloned->getMinKey()) == 0); - ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10)) == 0); - ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0); - - ASSERT(cloned->getNextOrphanRange(keyRange.maxKey, &keyRange)); - ASSERT(keyRange.minKey.woCompare(BSON("a" << 20)) == 0); - ASSERT(keyRange.maxKey.woCompare(cloned->getMaxKey()) == 0); - ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0); - - ASSERT(!cloned->getNextOrphanRange(keyRange.maxKey, &keyRange)); + stRangeMap pending; + ASSERT(!metadata->getNextOrphanRange(pending, metadata->getMaxKey())); } /** @@ -265,7 +166,7 @@ TEST_F(SingleChunkFixture, DoesntBelongsToMe) { ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MAXKEY))); } -TEST_F(SingleChunkFixture, CompoudKeyBelongsToMe) { +TEST_F(SingleChunkFixture, CompoundKeyBelongsToMe) { ASSERT(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 15 << "a" << 14))); } @@ -288,33 +189,23 @@ TEST_F(SingleChunkFixture, getDifferentFromOneIsFalse) { ASSERT(!makeCollectionMetadata()->getDifferentChunk(BSON("a" << 10), &differentChunk)); } 
-TEST_F(SingleChunkFixture, PlusPendingChunk) { - ChunkType chunk; - chunk.setMin(BSON("a" << 20)); - chunk.setMax(BSON("a" << 30)); +TEST_F(SingleChunkFixture, ChunkOrphanedDataRanges) { + stRangeMap pending; + auto keyRange = makeCollectionMetadata()->getNextOrphanRange( + pending, makeCollectionMetadata()->getMinKey()); + ASSERT(keyRange); - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); + ASSERT(keyRange->minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0); + ASSERT(keyRange->maxKey.woCompare(BSON("a" << 10)) == 0); + ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); - ASSERT(cloned->keyBelongsToMe(BSON("a" << 15))); - ASSERT(!cloned->keyBelongsToMe(BSON("a" << 25))); - ASSERT(!cloned->keyIsPending(BSON("a" << 15))); - ASSERT(cloned->keyIsPending(BSON("a" << 25))); -} + keyRange = makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey); + ASSERT(keyRange); + ASSERT(keyRange->minKey.woCompare(BSON("a" << 20)) == 0); + ASSERT(keyRange->maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0); + ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); -TEST_F(SingleChunkFixture, ChunkOrphanedDataRanges) { - KeyRange keyRange; - ASSERT(makeCollectionMetadata()->getNextOrphanRange(makeCollectionMetadata()->getMinKey(), - &keyRange)); - ASSERT(keyRange.minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0); - ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10)) == 0); - ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); - - ASSERT(makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange)); - ASSERT(keyRange.minKey.woCompare(BSON("a" << 20)) == 0); - ASSERT(keyRange.maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0); - ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); - - ASSERT(!makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange)); + ASSERT(!makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey)); } /** @@ -342,7 +233,7 @@ protected: // Note: no tests for single key belongsToMe because they are not allowed // if shard key is compound. 
-TEST_F(SingleChunkMinMaxCompoundKeyFixture, CompoudKeyBelongsToMe) { +TEST_F(SingleChunkMinMaxCompoundKeyFixture, CompoundKeyBelongsToMe) { ASSERT(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MINKEY << "b" << MINKEY))); ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MAXKEY << "b" << MAXKEY))); ASSERT(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MINKEY << "b" << 10))); @@ -375,45 +266,27 @@ protected: }; TEST_F(TwoChunksWithGapCompoundKeyFixture, ChunkGapOrphanedDataRanges) { - KeyRange keyRange; - ASSERT(makeCollectionMetadata()->getNextOrphanRange(makeCollectionMetadata()->getMinKey(), - &keyRange)); - ASSERT(keyRange.minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0); - ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10 << "b" << 0)) == 0); - ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); - - ASSERT(makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange)); - ASSERT(keyRange.minKey.woCompare(BSON("a" << 20 << "b" << 0)) == 0); - ASSERT(keyRange.maxKey.woCompare(BSON("a" << 30 << "b" << 0)) == 0); - ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); - - ASSERT(makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange)); - ASSERT(keyRange.minKey.woCompare(BSON("a" << 40 << "b" << 0)) == 0); - ASSERT(keyRange.maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0); - ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); - - ASSERT(!makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange)); -} - -TEST_F(TwoChunksWithGapCompoundKeyFixture, ChunkGapAndPendingOrphanedDataRanges) { - ChunkType chunk; - chunk.setMin(BSON("a" << 20 << "b" << 0)); - chunk.setMax(BSON("a" << 30 << "b" << 0)); - - auto cloned(makeCollectionMetadata()->clonePlusPending(chunk)); - - KeyRange keyRange; - ASSERT(cloned->getNextOrphanRange(cloned->getMinKey(), &keyRange)); - ASSERT(keyRange.minKey.woCompare(cloned->getMinKey()) == 0); - ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10 << "b" << 0)) == 0); - ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0); - - ASSERT(cloned->getNextOrphanRange(keyRange.maxKey, &keyRange)); - ASSERT(keyRange.minKey.woCompare(BSON("a" << 40 << "b" << 0)) == 0); - ASSERT(keyRange.maxKey.woCompare(cloned->getMaxKey()) == 0); - ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0); - - ASSERT(!cloned->getNextOrphanRange(keyRange.maxKey, &keyRange)); + stRangeMap pending; + auto keyRange = makeCollectionMetadata()->getNextOrphanRange( + pending, makeCollectionMetadata()->getMinKey()); + ASSERT(keyRange); + ASSERT(keyRange->minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0); + ASSERT(keyRange->maxKey.woCompare(BSON("a" << 10 << "b" << 0)) == 0); + ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); + + keyRange = makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey); + ASSERT(keyRange); + ASSERT(keyRange->minKey.woCompare(BSON("a" << 20 << "b" << 0)) == 0); + ASSERT(keyRange->maxKey.woCompare(BSON("a" << 30 << "b" << 0)) == 0); + ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); + + keyRange = makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey); + ASSERT(keyRange); + ASSERT(keyRange->minKey.woCompare(BSON("a" << 40 << "b" << 0)) == 0); + ASSERT(keyRange->maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0); + 
ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0); + + ASSERT(!makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey)); } /** diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index 0a267cd263d..8ffbf1f0d21 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -33,6 +33,7 @@ #include "mongo/db/s/collection_range_deleter.h" #include <algorithm> +#include <utility> #include "mongo/db/catalog/collection.h" #include "mongo/db/client.h" @@ -41,13 +42,16 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/keypattern.h" +#include "mongo/db/operation_context.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/service_context.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor.h" #include "mongo/util/log.h" @@ -57,7 +61,6 @@ namespace mongo { class ChunkRange; -class OldClientWriteContext; using CallbackArgs = executor::TaskExecutor::CallbackArgs; using logger::LogComponent; @@ -67,123 +70,144 @@ namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(60)); - } // unnamed namespace -CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {} - -void CollectionRangeDeleter::run() { - Client::initThread(getThreadName()); - ON_BLOCK_EXIT([&] { Client::destroy(); }); - auto opCtx = cc().makeOperationContext().get(); - - const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1); - bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete); - - // If there are more ranges to run, we add <this> back onto the task executor to run again. 
- if (hasNextRangeToClean) { - auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor(); - executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); }); - } else { - delete this; - } +CollectionRangeDeleter::~CollectionRangeDeleter() { + // notify anybody still sleeping on orphan ranges + clear(Status{ErrorCodes::InterruptedDueToReplStateChange, + "Collection sharding metadata destroyed"}); } -bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) { - +bool CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx, + NamespaceString const& nss, + int maxToDelete, + CollectionRangeDeleter* rangeDeleterForTestOnly) { + StatusWith<int> wrote = 0; + auto range = boost::optional<ChunkRange>(boost::none); { - AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + AutoGetCollection autoColl(opCtx, nss, MODE_IX); auto* collection = autoColl.getCollection(); if (!collection) { - return false; + return false; // collection was dropped } - auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss); - dassert(collectionShardingState != nullptr); // every collection gets one - auto& metadataManager = collectionShardingState->_metadataManager; + auto* css = CollectionShardingState::get(opCtx, nss); + { + auto scopedCollectionMetadata = css->getMetadata(); + if (!scopedCollectionMetadata) { + return false; // collection was unsharded + } - if (!_rangeInProgress && !metadataManager.hasRangesToClean()) { - // Nothing left to do - return false; - } + // We don't actually know if this is the same collection that we were originally + // scheduled to do deletions on, or another one with the same name. But it doesn't + // matter: if it has deletions scheduled, now is as good a time as any to do them. + auto self = rangeDeleterForTestOnly ? 
rangeDeleterForTestOnly + : &css->_metadataManager._rangesToClean; + { + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock); + if (self->isEmpty()) + return false; + + const auto& frontRange = self->_orphans.front().range; + range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned()); + } - if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) { - // No valid chunk in progress, get a new one - if (!metadataManager.hasRangesToClean()) { - return false; + try { + auto keyPattern = scopedCollectionMetadata->getKeyPattern(); + + wrote = self->_doDeletion(opCtx, collection, keyPattern, *range, maxToDelete); + } catch (const DBException& e) { + wrote = e.toStatus(); + warning() << e.what(); } - _rangeInProgress = metadataManager.getNextRangeToClean(); - } - auto scopedCollectionMetadata = collectionShardingState->getMetadata(); - int numDocumentsDeleted = - _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete); - if (numDocumentsDeleted <= 0) { - metadataManager.removeRangeToClean(_rangeInProgress.get()); - _rangeInProgress = boost::none; - return metadataManager.hasRangesToClean(); - } + if (!wrote.isOK() || wrote.getValue() == 0) { + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock); + self->_pop(wrote.getStatus()); + return true; + } + } // drop scopedCollectionMetadata + } // drop autoColl + + invariant(range); + invariantOK(wrote.getStatus()); + invariant(wrote.getValue() > 0); + + log() << "Deleted " << wrote.getValue() << " documents in " << nss.ns() << " range " << *range; + + // Wait for replication outside the lock + const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + WriteConcernResult unusedWCResult; + Status status = Status::OK(); + try { + status = waitForWriteConcern(opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult); + } catch (const DBException& e) { + status = e.toStatus(); } - // wait for replication - WriteConcernResult wcResult; - auto currentClientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - Status status = - waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult); if (!status.isOK()) { - warning() << "Error when waiting for write concern after removing chunks in " << _nss - << " : " << status.reason(); + warning() << "Error when waiting for write concern after removing " << nss << " range " + << *range << " : " << status.reason(); } return true; } -int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, - Collection* collection, - const BSONObj& keyPattern, - int maxToDelete) { - invariant(_rangeInProgress); - invariant(collection); +StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, + Collection* collection, + BSONObj const& keyPattern, + ChunkRange const& range, + int maxToDelete) { + invariant(collection != nullptr); + invariant(!isEmpty()); + + auto const& nss = collection->ns(); // The IndexChunk has a keyPattern that may apply to more than one index - we need to // select the index and get the full index keyPattern here. 
- const IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false); + auto catalog = collection->getIndexCatalog(); + const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false); if (idx == NULL) { - warning() << "Unable to find shard key index for " << keyPattern.toString() << " in " - << _nss; - return -1; + std::string msg = str::stream() << "Unable to find shard key index for " + << keyPattern.toString() << " in " << nss.ns(); + log() << msg; + return {ErrorCodes::InternalError, msg}; } - KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); - // Extend bounds to match the index we found - const BSONObj& min = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false)); - const BSONObj& max = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false)); + KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); + auto extend = [&](auto& key) { + return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false)); + }; + const BSONObj& min = extend(range.getMin()); + const BSONObj& max = extend(range.getMax()); - LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss; + LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns(); auto indexName = idx->indexName(); - IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); - if (!desc) { - warning() << "shard key index with name " << indexName << " on '" << _nss - << "' was dropped"; - return -1; + IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); + if (!descriptor) { + std::string msg = str::stream() << "shard key index with name " << indexName << " on '" + << nss.ns() << "' was dropped"; + log() << msg; + return {ErrorCodes::InternalError, msg}; + } + + boost::optional<Helpers::RemoveSaver> saver; + if (serverGlobalParams.moveParanoia) { + saver.emplace("moveChunk", nss.ns(), "cleaning"); } int numDeleted = 0; do { - auto exec = InternalPlanner::indexScan(opCtx, - collection, - desc, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::NO_YIELD, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH); + auto halfOpen = BoundInclusion::kIncludeStartKeyOnly; + auto manual = PlanExecutor::YIELD_MANUAL; + auto forward = InternalPlanner::FORWARD; + auto fetch = InternalPlanner::IXSCAN_FETCH; + + auto exec = InternalPlanner::indexScan( + opCtx, collection, descriptor, min, max, halfOpen, manual, forward, fetch); + RecordId rloc; BSONObj obj; PlanExecutor::ExecState state = exec->getNext(&obj, &rloc); @@ -193,23 +217,71 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) { warning(LogComponent::kSharding) << PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min - << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj) + << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj) << ", stats: " << Explain::getWinningPlanStats(exec.get()); break; } - invariant(PlanExecutor::ADVANCED == state); - WriteUnitOfWork wuow(opCtx); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - warning() << "stepped down from primary while deleting chunk; orphaning data in " - << _nss << " in range [" << min << ", " << max << ")"; - break; + { + WriteUnitOfWork wuow(opCtx); + if (saver) { + 
saver->goingToDelete(obj); + } + collection->deleteDocument(opCtx, rloc, nullptr, true); + + wuow.commit(); } - OpDebug* const nullOpDebug = nullptr; - collection->deleteDocument(opCtx, rloc, nullOpDebug, true); - wuow.commit(); } while (++numDeleted < maxToDelete); + return numDeleted; } +auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const -> DeleteNotification { + // start search with newest entries by using reverse iterators + auto it = find_if(_orphans.rbegin(), _orphans.rend(), [&](auto& cleanee) { + return bool(cleanee.range.overlapWith(range)); + }); + return it != _orphans.rend() ? it->notification : DeleteNotification(); +} + +void CollectionRangeDeleter::add(ChunkRange const& range) { + // We ignore the case of overlapping, or even equal, ranges. + // Deleting overlapping ranges is quick. + _orphans.emplace_back(Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), + std::make_shared<Notification<Status>>()}); +} + +void CollectionRangeDeleter::append(BSONObjBuilder* builder) const { + BSONArrayBuilder arr(builder->subarrayStart("rangesToClean")); + for (auto const& entry : _orphans) { + BSONObjBuilder obj; + entry.range.append(&obj); + arr.append(obj.done()); + } + arr.done(); +} + +size_t CollectionRangeDeleter::size() const { + return _orphans.size(); +} + +bool CollectionRangeDeleter::isEmpty() const { + return _orphans.empty(); +} + +void CollectionRangeDeleter::clear(Status status) { + for (auto& range : _orphans) { + if (*(range.notification)) { + continue; // was triggered in the test driver + } + range.notification->set(status); // wake up anything still waiting + } + _orphans.clear(); +} + +void CollectionRangeDeleter::_pop(Status result) { + _orphans.front().notification->set(result); // wake up waitForClean + _orphans.pop_front(); +} + } // namespace mongo diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h index f611215a73d..e5098d50dac 100644 --- a/src/mongo/db/s/collection_range_deleter.h +++ b/src/mongo/db/s/collection_range_deleter.h @@ -29,7 +29,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" +#include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/util/concurrency/notification.h" namespace mongo { @@ -41,40 +43,99 @@ class CollectionRangeDeleter { MONGO_DISALLOW_COPYING(CollectionRangeDeleter); public: - CollectionRangeDeleter(NamespaceString nss); + // + // All of the following members must be called only while the containing MetadataManager's lock + // is held (or in its destructor), except cleanUpNextRange. + // /** - * Starts deleting ranges and cleans up this object when it is finished. + * Normally, construct with the collection name and ShardingState's dedicated executor. */ - void run(); + CollectionRangeDeleter() = default; + ~CollectionRangeDeleter(); + + using DeleteNotification = std::shared_ptr<Notification<Status>>; + + /** + * Adds a new range to be cleaned up by the cleaner thread. + */ + void add(const ChunkRange& range); + + /** + * Reports whether the argument range overlaps any of the ranges to clean. If there is overlap, + * it returns a notification that will be completed when the currently newest overlapping + * range is no longer scheduled. Its value indicates whether it has been successfully removed. + * If there is no overlap, the result is nullptr. After a successful removal, the caller + * should call again to ensure no other range overlaps the argument. 
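     *
     * A minimal sketch of that re-check loop (for illustration only, not part of this class, and
     * glossing over the requirement above that these members be called under the containing
     * MetadataManager's lock; 'rangesToClean' stands in for a CollectionRangeDeleter):
     *
     *     while (auto notification = rangesToClean.overlaps(range)) {
     *         Status status = notification->get(opCtx);  // sleeps until that range is deleted
     *         if (!status.isOK())
     *             return status;                         // deletion failed or was abandoned
     *     }
     *     return Status::OK();                           // nothing scheduled still overlaps 'range'
     *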
+ * (See CollectionShardingState::waitForClean and MetadataManager::trackOrphanedDataCleanup for + * an example use.) + */ + DeleteNotification overlaps(ChunkRange const& range) const; + + /** + * Reports the number of ranges remaining to be cleaned up. + */ + size_t size() const; + + bool isEmpty() const; + + /* + * Notify with the specified status anything waiting on ranges scheduled, before discarding the + * ranges and notifications. + */ + void clear(Status); + + /* + * Append a representation of self to the specified builder. + */ + void append(BSONObjBuilder* builder) const; /** - * Acquires the collection IX lock and checks whether there are new entries for the collection's - * rangesToClean structure. If there are, deletes up to maxToDelete entries and yields using - * the standard query yielding logic. + * If any ranges are scheduled to clean, deletes up to maxToDelete documents, notifying watchers + * of ranges as they are done being deleted. It performs its own collection locking so it must + * be called without locks. * - * Returns true if there are more entries in rangesToClean, false if there is no more progress - * to be made. + * The 'rangeDeleterForTestOnly' is used as a utility for unit-tests that directly test the + * CollectionRangeDeleter class so they do not need to set up + * CollectionShardingState/MetadataManager. + * + * Returns true if it should be scheduled to run again because there is more data to be deleted + * or false otherwise. */ - bool cleanupNextRange(OperationContext* opCtx, int maxToDelete); + static bool cleanUpNextRange(OperationContext*, + NamespaceString const& nss, + int maxToDelete, + CollectionRangeDeleter* rangeDeleterForTestOnly = nullptr); private: /** - * Performs the deletion of up to maxToDelete entries within the range in progress. - * This function will invariant if called while _rangeInProgress is not set. + * Performs the deletion of up to maxToDelete entries within the range in progress. Must be + * called under the collection lock. * - * Returns the number of documents deleted (0 if deletion is finished), or -1 for error. + * Returns the number of documents deleted, 0 if done with the range, or bad status if deleting + * the range failed. */ - int _doDeletion(OperationContext* opCtx, - Collection* collection, - const BSONObj& keyPattern, - int maxToDelete); + StatusWith<int> _doDeletion(OperationContext* opCtx, + Collection* collection, + const BSONObj& keyPattern, + ChunkRange const& range, + int maxToDelete); - NamespaceString _nss; + /** + * Removes the latest-scheduled range from the ranges to be cleaned up, and notifies any + * interested callers of this->overlaps(range) with specified status. + */ + void _pop(Status status); - // Holds a range for which deletion has begun. If empty, then a new range - // must be requested from rangesToClean - boost::optional<ChunkRange> _rangeInProgress; + /** + * Ranges scheduled for deletion. The front of the list will be in active process of deletion. + * As each range is completed, its notification is signaled before it is popped. 
+ */ + struct Deletion { + ChunkRange const range; + DeleteNotification const notification; + }; + std::list<Deletion> _orphans; }; } // namespace mongo diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp index 2927379ff1c..a5b4813d5e6 100644 --- a/src/mongo/db/s/collection_range_deleter_test.cpp +++ b/src/mongo/db/s/collection_range_deleter_test.cpp @@ -30,55 +30,84 @@ #include "mongo/db/s/collection_range_deleter.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/query.h" +#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/index/index_descriptor.h" #include "mongo/db/keypattern.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/s/catalog/dist_lock_catalog_impl.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { using unittest::assertGet; - -const NamespaceString kNamespaceString = NamespaceString("foo", "bar"); +const NamespaceString kNss = NamespaceString("foo", "bar"); const std::string kPattern = "_id"; const BSONObj kKeyPattern = BSON(kPattern << 1); +const std::string kShardName{"a"}; +const HostAndPort dummyHost("dummy", 123); -class CollectionRangeDeleterTest : public ServiceContextMongoDTest { +class CollectionRangeDeleterTest : public ShardingMongodTestFixture { protected: - ServiceContext::UniqueOperationContext _opCtx; - MetadataManager* _metadataManager; - std::unique_ptr<DBDirectClient> _dbDirectClient; + bool next(CollectionRangeDeleter& rangeDeleter, int maxToDelete) { + return CollectionRangeDeleter::cleanUpNextRange( + operationContext(), kNss, maxToDelete, &rangeDeleter); + } + std::shared_ptr<RemoteCommandTargeterMock> configTargeter() { + return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); + } - OperationContext* operationContext(); private: void setUp() override; void tearDown() override; + + std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override { + invariant(shardRegistry); + return stdx::make_unique<DistLockCatalogImpl>(shardRegistry); + } + + std::unique_ptr<DistLockManager> makeDistLockManager( + std::unique_ptr<DistLockCatalog> distLockCatalog) override { + return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); + } + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager)); + } }; void CollectionRangeDeleterTest::setUp() { - ServiceContextMongoDTest::setUp(); - const repl::ReplSettings replSettings = {}; - repl::setGlobalReplicationCoordinator( - new repl::ReplicationCoordinatorMock(getServiceContext(), replSettings)); - repl::getGlobalReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY); - _opCtx = getServiceContext()->makeOperationContext(&cc()); - _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext()); + 
serverGlobalParams.clusterRole = ClusterRole::ShardServer; + ShardingMongodTestFixture::setUp(); + replicationCoordinator()->alwaysAllowWrites(true); + initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost)); + // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) + // ->setConnectionStringReturnValue(kConfigConnStr); + + configTargeter()->setFindHostReturnValue(dummyHost); + + DBDirectClient(operationContext()).createCollection(kNss.ns()); { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); const OID epoch = OID::gen(); - - AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX); - auto collectionShardingState = - CollectionShardingState::get(operationContext(), kNamespaceString); collectionShardingState->refreshMetadata( operationContext(), stdx::make_unique<CollectionMetadata>( @@ -86,208 +115,127 @@ void CollectionRangeDeleterTest::setUp() { ChunkVersion(1, 0, epoch), ChunkVersion(0, 0, epoch), SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>())); - _metadataManager = collectionShardingState->getMetadataManagerForTest(); } } void CollectionRangeDeleterTest::tearDown() { { - AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX); - auto collectionShardingState = - CollectionShardingState::get(operationContext(), kNamespaceString); + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); collectionShardingState->refreshMetadata(operationContext(), nullptr); } - _dbDirectClient.reset(); - _opCtx.reset(); - ServiceContextMongoDTest::tearDown(); - repl::setGlobalReplicationCoordinator(nullptr); -} - -OperationContext* CollectionRangeDeleterTest::operationContext() { - invariant(_opCtx); - return _opCtx.get(); + ShardingMongodTestFixture::tearDown(); } namespace { // Tests the case that there is nothing in the database. TEST_F(CollectionRangeDeleterTest, EmptyDatabase) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); + CollectionRangeDeleter rangeDeleter; + ASSERT_FALSE(next(rangeDeleter, 1)); } // Tests the case that there is data, but it is not in a range to clean. 
TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); + CollectionRangeDeleter rangeDeleter; const BSONObj insertedDoc = BSON(kPattern << 25); + DBDirectClient dbclient(operationContext()); + dbclient.insert(kNss.toString(), insertedDoc); + ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 25))); - _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc); - ASSERT_BSONOBJ_EQ(insertedDoc, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25))); + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + ASSERT_TRUE(next(rangeDeleter, 1)); - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); + ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 25))); - ASSERT_BSONOBJ_EQ(insertedDoc, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25))); + ASSERT_FALSE(next(rangeDeleter, 1)); } // Tests the case that there is a single document within a range to clean. TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); + CollectionRangeDeleter rangeDeleter; const BSONObj insertedDoc = BSON(kPattern << 5); + DBDirectClient dbclient(operationContext()); + dbclient.insert(kNss.toString(), BSON(kPattern << 5)); + ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 5))); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5)); - ASSERT_BSONOBJ_EQ(insertedDoc, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_TRUE( - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5)).isEmpty()); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); + ASSERT_TRUE(next(rangeDeleter, 1)); + ASSERT_TRUE(next(rangeDeleter, 1)); + ASSERT_TRUE(dbclient.findOne(kNss.toString(), QUERY(kPattern << 5)).isEmpty()); + ASSERT_FALSE(next(rangeDeleter, 1)); } // Tests the case that there are multiple documents within a range to clean. 
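// Note on the call counts asserted in these tests: a cleanUpNextRange call that still finds a
// scheduled range returns true even when it deletes nothing, because that is the call that pops
// the exhausted range; only a call that finds no scheduled ranges at all returns false. The test
// below therefore needs one true call for the batch of deletions and one more to retire the
// range, before the final false.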
TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 100)); + CollectionRangeDeleter rangeDeleter; + DBDirectClient dbclient(operationContext()); + dbclient.insert(kNss.toString(), BSON(kPattern << 1)); + dbclient.insert(kNss.toString(), BSON(kPattern << 2)); + dbclient.insert(kNss.toString(), BSON(kPattern << 3)); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + + ASSERT_TRUE(next(rangeDeleter, 100)); + ASSERT_TRUE(next(rangeDeleter, 100)); + ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_FALSE(next(rangeDeleter, 100)); } // Tests the case that there are multiple documents within a range to clean, and the range deleter // has a max deletion rate of one document per run. -TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCallsToCleanOneRange) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_EQUALS(2ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_EQUALS(1ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); +TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCalls) { + CollectionRangeDeleter rangeDeleter; + DBDirectClient dbclient(operationContext()); + dbclient.insert(kNss.toString(), BSON(kPattern << 1)); + dbclient.insert(kNss.toString(), BSON(kPattern << 2)); + dbclient.insert(kNss.toString(), BSON(kPattern << 3)); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + + ASSERT_TRUE(next(rangeDeleter, 1)); + ASSERT_EQUALS(2ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + + ASSERT_TRUE(next(rangeDeleter, 1)); + ASSERT_EQUALS(1ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + + ASSERT_TRUE(next(rangeDeleter, 1)); + 
ASSERT_TRUE(next(rangeDeleter, 1)); + ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_FALSE(next(rangeDeleter, 1)); } // Tests the case that there are two ranges to clean, each containing multiple documents. TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 6)); - ASSERT_EQUALS(6ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); + CollectionRangeDeleter rangeDeleter; + DBDirectClient dbclient(operationContext()); + dbclient.insert(kNss.toString(), BSON(kPattern << 1)); + dbclient.insert(kNss.toString(), BSON(kPattern << 2)); + dbclient.insert(kNss.toString(), BSON(kPattern << 3)); + dbclient.insert(kNss.toString(), BSON(kPattern << 4)); + dbclient.insert(kNss.toString(), BSON(kPattern << 5)); + dbclient.insert(kNss.toString(), BSON(kPattern << 6)); + ASSERT_EQUALS(6ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10))); const ChunkRange chunkRange1 = ChunkRange(BSON(kPattern << 0), BSON(kPattern << 4)); const ChunkRange chunkRange2 = ChunkRange(BSON(kPattern << 4), BSON(kPattern << 7)); - _metadataManager->addRangeToClean(chunkRange1); - _metadataManager->addRangeToClean(chunkRange2); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 4))); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); -} - -// Tests the case that there is a range to clean, and halfway through a deletion a chunk -// within the range is received -TEST_F(CollectionRangeDeleterTest, MultipleCallstoCleanupNextRangeWithChunkReceive) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4)); - const BSONObj insertedDoc5 = BSON(kPattern << 5); // not to be deleted - _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc5); - ASSERT_EQUALS(5ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2)); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 5), BSON(kPattern << 6))); - - // insertedDoc5 is 
no longer eligible for deletion - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2)); - ASSERT_EQUALS(1ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2)); - ASSERT_EQUALS(1ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2)); - - ASSERT_BSONOBJ_EQ( - insertedDoc5, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << GT << 0))); -} - -TEST_F(CollectionRangeDeleterTest, CalltoCleanupNextRangeWithChunkReceive) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4)); - ASSERT_EQUALS(4ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2)); - ASSERT_EQUALS(2ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2)); - - ASSERT_EQUALS(2ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); + rangeDeleter.add(chunkRange1); + rangeDeleter.add(chunkRange2); + + ASSERT_TRUE(next(rangeDeleter, 100)); + ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 4))); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10))); + + ASSERT_TRUE(next(rangeDeleter, 100)); // discover there are no more < 4, pop range 1 + ASSERT_TRUE(next(rangeDeleter, 100)); // delete the remaining documents + ASSERT_TRUE(next(rangeDeleter, 1)); // discover there are no more, pop range 2 + ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10))); + ASSERT_FALSE(next(rangeDeleter, 1)); // discover there are no more ranges } } // unnamed namespace - } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 5b4b39cb33f..4bb86031fc4 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -34,6 +34,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/db_raii.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" @@ -84,7 +85,8 @@ private: } // unnamed namespace CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss) - : _nss(std::move(nss)), _metadataManager{sc, _nss} {} + : _nss(std::move(nss)), + _metadataManager{sc, _nss, ShardingState::get(sc)->getRangeDeleterTaskExecutor()} {} CollectionShardingState::~CollectionShardingState() { invariant(!_sourceMgr); @@ -119,14 +121,18 @@ void CollectionShardingState::markNotShardedAtStepdown() { _metadataManager.refreshActiveMetadata(nullptr); } -void CollectionShardingState::beginReceive(const ChunkRange& range) { - _metadataManager.beginReceive(range); +bool 
CollectionShardingState::beginReceive(ChunkRange const& range) { + return _metadataManager.beginReceive(range); } void CollectionShardingState::forgetReceive(const ChunkRange& range) { _metadataManager.forgetReceive(range); } +Status CollectionShardingState::cleanUpRange(ChunkRange const& range) { + return _metadataManager.cleanUpRange(range); +} + MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() { return _sourceMgr; } @@ -172,6 +178,52 @@ bool CollectionShardingState::collectionIsSharded() { return true; } +// Call with collection unlocked. Note that the CollectionShardingState object involved might not +// exist anymore at the time of the call, or indeed anytime outside the AutoGetCollection block, so +// anything that might alias something in it must be copied first. + +/* static */ +Status CollectionShardingState::waitForClean(OperationContext* opCtx, + NamespaceString nss, + OID const& epoch, + ChunkRange orphanRange) { + do { + auto stillScheduled = CollectionShardingState::CleanupNotification(nullptr); + { + AutoGetCollection autoColl(opCtx, nss, MODE_IX); + // First, see if collection was dropped. + auto css = CollectionShardingState::get(opCtx, nss); + { + auto metadata = css->_metadataManager.getActiveMetadata(); + if (!metadata || metadata->getCollVersion().epoch() != epoch) { + return {ErrorCodes::StaleShardVersion, "Collection being migrated was dropped"}; + } + } // drop metadata + stillScheduled = css->_metadataManager.trackOrphanedDataCleanup(orphanRange); + if (stillScheduled == nullptr) { + log() << "Finished deleting " << nss.ns() << " range " + << redact(orphanRange.toString()); + return Status::OK(); + } + } // drop collection lock + + log() << "Waiting for deletion of " << nss.ns() << " range " << orphanRange; + Status result = stillScheduled->get(opCtx); + if (!result.isOK()) { + return {result.code(), + str::stream() << "Failed to delete orphaned " << nss.ns() << " range " + << redact(orphanRange.toString()) + << ": " + << redact(result.reason())}; + } + } while (true); + MONGO_UNREACHABLE; +} + +boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj const& from) { + return _metadataManager.getNextOrphanRange(from); +} + bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); @@ -312,8 +364,7 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, if (!info) { // There is no shard version information on either 'opCtx' or 'client'. This means that // the operation represented by 'opCtx' is unversioned, and the shard version is always - // OK - // for unversioned operations. + // OK for unversioned operations. return true; } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 5bbc2b9c576..2ab648ad26c 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -34,7 +34,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/collection_range_deleter.h" #include "mongo/db/s/metadata_manager.h" +#include "mongo/util/concurrency/notification.h" namespace mongo { @@ -85,11 +87,20 @@ public: static CollectionShardingState* get(OperationContext* opCtx, const std::string& ns); /** - * Returns the chunk metadata for the collection. + * Returns the chunk metadata for the collection. 
The metadata it represents lives as long as + * the object itself, and the collection, exist. After dropping the collection lock, the + * collection may no longer exist, but it is still safe to destroy the object. */ ScopedCollectionMetadata getMetadata(); /** + * BSON output of the pending metadata into a BSONArray + */ + void toBSONPending(BSONArrayBuilder& bb) const { + _metadataManager.toBSONPending(bb); + } + + /** * Updates the metadata based on changes received from the config server and also resolves the * pending receives map in case some of these pending receives have completed or have been * abandoned. If newMetadata is null, unshard the collection. @@ -105,20 +116,26 @@ public: void markNotShardedAtStepdown(); /** - * Modifies the collection's sharding state to indicate that it is beginning to receive the - * given ChunkRange. + * Schedules any documents in the range for immediate cleanup iff no running queries can depend + * on them, and adds the range to the list of pending ranges. Otherwise, returns false. Does + * not block. */ - void beginReceive(const ChunkRange& range); + bool beginReceive(ChunkRange const& range); /* - * Modifies the collection's sharding state to indicate that the previous pending migration - * failed. If the range was not previously pending, this function will crash the server. - * - * This function is the mirror image of beginReceive. + * Removes the range from the list of pending ranges, and schedules any documents in the range + * for immediate cleanup. Does not block. */ void forgetReceive(const ChunkRange& range); /** + * Schedules documents in the range for cleanup after any running queries that may depend on + * them have terminated. Does not block. Use waitForClean to block pending completion. + * Fails if range overlaps any current local shard chunk. + */ + Status cleanUpRange(ChunkRange const& range); + + /** * Returns the active migration source manager, if one is available. */ MigrationSourceManager* getMigrationSourceManager(); @@ -154,6 +171,28 @@ public: */ bool collectionIsSharded(); + /** + * Tracks deletion of any documents within the range, returning when deletion is complete. + * Throws if the collection is dropped while it sleeps. Call this with the collection unlocked. + */ + static Status waitForClean(OperationContext*, NamespaceString, OID const& epoch, ChunkRange); + + using CleanupNotification = MetadataManager::CleanupNotification; + /** + * Reports whether any part of the argument range is still scheduled for deletion. If not, + * returns nullptr. Otherwise, returns a notification n such that n->get(opCtx) will wake when + * deletion of a range (possibly the one of interest) is completed. This should be called + * again after each wakeup until it returns nullptr, because there can be more than one range + * scheduled for deletion that overlaps its argument. + */ + CleanupNotification trackOrphanedDataCleanup(ChunkRange const& range) const; + + /** + * Returns a range _not_ owned by this shard that starts no lower than the specified + * startingFrom key value, if any, or boost::none if there is no such range. + */ + boost::optional<KeyRange> getNextOrphanRange(BSONObj const& startingFrom); + // Replication subsystem hooks. If this collection is serving as a source for migration, these // methods inform it of any changes to its contents. 
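    // A minimal illustrative sketch (not part of this patch) of how the hooks above compose. A
    // caller such as the cleanupOrphaned command might schedule a range for deletion and then
    // block until it is gone; the helper name is hypothetical, and 'epoch' would come from the
    // collection's current metadata:
    //
    //     Status deleteOrphanedRangeAndWait(OperationContext* opCtx,
    //                                       const NamespaceString& nss,
    //                                       const OID& epoch,
    //                                       const ChunkRange& range) {
    //         {
    //             AutoGetCollection autoColl(opCtx, nss, MODE_IX);
    //             auto* css = CollectionShardingState::get(opCtx, nss);
    //             Status scheduled = css->cleanUpRange(range);  // queues deletion; does not block
    //             if (!scheduled.isOK()) {
    //                 return scheduled;
    //             }
    //         }  // drop the collection lock before blocking
    //         // waitForClean re-checks until no scheduled range still overlaps 'range'.
    //         return CollectionShardingState::waitForClean(opCtx, nss, epoch, range);
    //     }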
@@ -167,11 +206,6 @@ public: void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); - MetadataManager* getMetadataManagerForTest() { - return &_metadataManager; - } - - private: /** * Checks whether the shard version of the operation matches that of the collection. @@ -191,7 +225,7 @@ private: ChunkVersion* expectedShardVersion, ChunkVersion* actualShardVersion); - // Namespace to which this state belongs. + // Namespace this state belongs to. const NamespaceString _nss; // Contains all the metadata associated with this collection. @@ -204,7 +238,11 @@ private: // NOTE: The value is not owned by this class. MigrationSourceManager* _sourceMgr{nullptr}; - friend class CollectionRangeDeleter; + // for access to _metadataManager + friend bool CollectionRangeDeleter::cleanUpNextRange(OperationContext*, + NamespaceString const&, + int maxToDelete, + CollectionRangeDeleter*); }; } // namespace mongo diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp index 81157fcc07a..68d7a4c70df 100644 --- a/src/mongo/db/s/get_shard_version_command.cpp +++ b/src/mongo/db/s/get_shard_version_command.cpp @@ -131,7 +131,7 @@ public: chunksArr.doneFast(); BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending")); - metadata->toBSONPending(pendingArr); + css->toBSONPending(pendingArr); pendingArr.doneFast(); } metadataBuilder.doneFast(); diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index faa062e9476..b58218685ba 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -33,36 +33,145 @@ #include "mongo/db/s/metadata_manager.h" #include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/bson/util/builder.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/query/internal_plans.h" #include "mongo/db/range_arithmetic.h" #include "mongo/db/s/collection_range_deleter.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" #include "mongo/util/log.h" +// MetadataManager exists only as a data member of a CollectionShardingState object. +// +// It maintains a set of std::shared_ptr<MetadataManager::Tracker> pointers: one in +// _activeMetadataTracker, and more in a list _metadataInUse. It also contains a +// CollectionRangeDeleter that queues orphan ranges to delete in a background thread, and a record +// of the ranges being migrated in, to avoid deleting them. +// +// Free-floating MetadataManager::Tracker objects are maintained by these pointers, and also by +// clients in ScopedCollectionMetadata objects obtained via CollectionShardingState::getMetadata(). +// +// A Tracker object keeps: +// a std::unique_ptr<CollectionMetadata>, owning a map of the chunks owned by the shard, +// a key range [min,max) of orphaned documents that may be deleted when the count goes to zero, +// a count of the ScopedCollectionMetadata objects that have pointers to it, +// a mutex lock, serializing access to: +// a pointer back to the MetadataManager object that created it. 
+// +// __________________________ +// (s): std::shared_ptr<> Clients:| ScopedCollectionMetadata | +// (u): std::unique_ptr<> | tracker (s)-----------+ +// ________________________________ |__________________________| | | +// | CollectionShardingState | | tracker (s)--------+ + +// | | |__________________________| | | | +// | ____________________________ | | tracker (s)----+ | | +// | | MetadataManager | | |_________________________| | | | +// | | | | ________________________ | | | +// | | _activeMetadataTracker (s)-------->| Tracker |<------+ | | (1 reference) +// | | | | | ______________________|_ | | +// | | [ (s),-------------->| Tracker | | | (0 references) +// | | (s),---------\ | | ______________________|_ | | +// | | _metadataInUse ... ] | | \----->| Tracker |<----+-+ (2 references) +// | | ________________________ | | | | | | ______________________ +// | | | CollectionRangeDeleter | | | | | | metadata (u)------------->| CollectionMetadata | +// | | | | | | | | | [ orphans [min,max) ] | | | +// | | | _orphans [ [min,max), | | | | | | usageCounter | | _chunksMap | +// | | | [min,max), | | | | | | trackerLock: | | _chunkVersion | +// | | | ... ] | |<--------------manager | | ... | +// | | | | | | |_| | | |______________________| +// | | |________________________| | | |_| | +// | | | | |________________________| +// +// A ScopedCollectionMetadata object is created and held during a query, and destroyed when the +// query no longer needs access to the collection. Its destructor decrements the Tracker's +// usageCounter. +// +// When a new chunk mapping replaces _activeMetadata, if any queries still depend on the current +// mapping, it is pushed onto the back of _metadataInUse. +// +// Trackers pointed to from _metadataInUse, and their associated CollectionMetadata, are maintained +// at least as long as any query holds a ScopedCollectionMetadata object referring to them, or to +// any older tracker. In the diagram above, the middle Tracker must be kept until the one below it +// is disposed of. (Note that _metadataInUse as shown here has its front() at the bottom, back() +// at the top. As usual, new entries are pushed onto the back, popped off the front.) + namespace mongo { -using CallbackArgs = executor::TaskExecutor::CallbackArgs; +using TaskExecutor = executor::TaskExecutor; +using CallbackArgs = TaskExecutor::CallbackArgs; + +struct MetadataManager::Tracker { + /** + * Creates a new Tracker with the usageCounter initialized to zero. + */ + Tracker(std::unique_ptr<CollectionMetadata>, MetadataManager*); + + std::unique_ptr<CollectionMetadata> metadata; + uint32_t usageCounter{0}; + boost::optional<ChunkRange> orphans{boost::none}; -MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss) + // lock guards access to manager, which is zeroed by the ~MetadataManager(), but used by + // ScopedCollectionMetadata when usageCounter falls to zero. 
+ stdx::mutex trackerLock; + MetadataManager* manager{nullptr}; +}; + +MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss, TaskExecutor* executor) : _nss(std::move(nss)), _serviceContext(sc), - _activeMetadataTracker(stdx::make_unique<CollectionMetadataTracker>(nullptr)), + _activeMetadataTracker(std::make_shared<Tracker>(nullptr, this)), _receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()), - _rangesToClean( - SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<RangeToCleanDescriptor>()) {} + _notification(std::make_shared<Notification<Status>>()), + _executor(executor), + _rangesToClean() {} MetadataManager::~MetadataManager() { + { + stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); + _shuttingDown = true; + } + std::list<std::shared_ptr<Tracker>> inUse; + { + // drain any threads that might remove _metadataInUse entries, push to deleter + stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); + inUse = std::move(_metadataInUse); + } + + // Trackers can outlive MetadataManager, so we still need to lock each tracker... + std::for_each(inUse.begin(), inUse.end(), [](auto& tp) { + stdx::lock_guard<stdx::mutex> scopedLock(tp->trackerLock); + tp->manager = nullptr; + }); + { // ... and the active one too + stdx::lock_guard<stdx::mutex> scopedLock(_activeMetadataTracker->trackerLock); + _activeMetadataTracker->manager = nullptr; + } + + // still need to block the deleter thread: stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - invariant(!_activeMetadataTracker || _activeMetadataTracker->usageCounter == 0); + Status status{ErrorCodes::InterruptedDueToReplStateChange, + "tracking orphaned range deletion abandoned because the" + " collection was dropped or became unsharded"}; + if (!*_notification) { // check just because test driver triggers it + _notification->set(status); + } + _rangesToClean.clear(status); } ScopedCollectionMetadata MetadataManager::getActiveMetadata() { stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - if (!_activeMetadataTracker) { - return ScopedCollectionMetadata(); + if (_activeMetadataTracker) { + return ScopedCollectionMetadata(_activeMetadataTracker); } + return ScopedCollectionMetadata(); +} - return ScopedCollectionMetadata(this, _activeMetadataTracker.get()); +size_t MetadataManager::numberOfMetadataSnapshots() { + stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); + return _metadataInUse.size(); } void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> remoteMetadata) { @@ -73,7 +182,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> // collection sharding information regardless of whether the node is sharded or not. 
if (!remoteMetadata && !_activeMetadataTracker->metadata) { invariant(_receivingChunks.empty()); - invariant(_rangesToClean.empty()); + invariant(_rangesToClean.isEmpty()); return; } @@ -83,8 +192,8 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> << _activeMetadataTracker->metadata->toStringBasic() << " as no longer sharded"; _receivingChunks.clear(); - _rangesToClean.clear(); - + _rangesToClean.clear(Status{ErrorCodes::InterruptedDueToReplStateChange, + "Collection sharding metadata destroyed"}); _setActiveMetadata_inlock(nullptr); return; } @@ -99,7 +208,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> << remoteMetadata->toStringBasic(); invariant(_receivingChunks.empty()); - invariant(_rangesToClean.empty()); + invariant(_rangesToClean.isEmpty()); _setActiveMetadata_inlock(std::move(remoteMetadata)); return; @@ -114,9 +223,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> << remoteMetadata->toStringBasic() << " due to epoch change"; _receivingChunks.clear(); - _rangesToClean.clear(); - - _setActiveMetadata_inlock(std::move(remoteMetadata)); + _rangesToClean.clear(Status::OK()); return; } @@ -132,337 +239,325 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> << _activeMetadataTracker->metadata->toStringBasic() << " to " << remoteMetadata->toStringBasic(); - // Resolve any receiving chunks, which might have completed by now + // Resolve any receiving chunks, which might have completed by now. + // Should be no more than one. for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) { - const BSONObj min = it->first; - const BSONObj max = it->second.getMaxKey(); + BSONObj const& min = it->first; + BSONObj const& max = it->second.getMaxKey(); - // Our pending range overlaps at least one chunk - if (rangeMapContains(remoteMetadata->getChunks(), min, max)) { - // The remote metadata contains a chunk we were earlier in the process of receiving, so - // we deem it successfully received. - LOG(2) << "Verified chunk " << redact(ChunkRange(min, max).toString()) - << " for collection " << _nss.ns() << " has been migrated to this shard earlier"; - - _receivingChunks.erase(it++); - continue; - } else if (!rangeMapOverlaps(remoteMetadata->getChunks(), min, max)) { + if (!remoteMetadata->rangeOverlapsChunk(ChunkRange(min, max))) { ++it; continue; } + // The remote metadata contains a chunk we were earlier in the process of receiving, so + // we deem it successfully received. + LOG(2) << "Verified chunk " << ChunkRange(min, max) << " for collection " << _nss.ns() + << " has been migrated to this shard earlier"; - // Partial overlap indicates that the earlier migration has failed, but the chunk being - // migrated underwent some splits and other migrations and ended up here again. In this - // case, we will request full reload of the metadata. Currently this cannot happen, because - // all migrations are with the explicit knowledge of the recipient shard. However, we leave - // the option open so that chunk splits can do empty chunk move without having to notify the - // recipient. 
- RangeVector overlappedChunks; - getRangeMapOverlap(remoteMetadata->getChunks(), min, max, &overlappedChunks); - - for (const auto& overlapChunkMin : overlappedChunks) { - auto itRecv = _receivingChunks.find(overlapChunkMin.first); - invariant(itRecv != _receivingChunks.end()); - - const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey()); - - _receivingChunks.erase(itRecv); - - // Make sure any potentially partially copied chunks are scheduled to be cleaned up - _addRangeToClean_inlock(receivingRange); - } - - // Need to reset the iterator + _receivingChunks.erase(it); it = _receivingChunks.begin(); } - // For compatibility with the current range deleter, which is driven entirely by the contents of - // the CollectionMetadata update the pending chunks - for (const auto& receivingChunk : _receivingChunks) { - ChunkType chunk; - chunk.setMin(receivingChunk.first); - chunk.setMax(receivingChunk.second.getMaxKey()); - remoteMetadata = remoteMetadata->clonePlusPending(chunk); - } - _setActiveMetadata_inlock(std::move(remoteMetadata)); } -void MetadataManager::beginReceive(const ChunkRange& range) { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - - // Collection is not known to be sharded if the active metadata tracker is null - invariant(_activeMetadataTracker); - - // If range is contained within pending chunks, this means a previous migration must have failed - // and we need to clean all overlaps - RangeVector overlappedChunks; - getRangeMapOverlap(_receivingChunks, range.getMin(), range.getMax(), &overlappedChunks); - - for (const auto& overlapChunkMin : overlappedChunks) { - auto itRecv = _receivingChunks.find(overlapChunkMin.first); - invariant(itRecv != _receivingChunks.end()); - - const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey()); - - _receivingChunks.erase(itRecv); - - // Make sure any potentially partially copied chunks are scheduled to be cleaned up - _addRangeToClean_inlock(receivingRange); - } - - // Need to ensure that the background range deleter task won't delete the range we are about to - // receive - _removeRangeToClean_inlock(range, Status::OK()); - _receivingChunks.insert( - std::make_pair(range.getMin().getOwned(), - CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED()))); - - // For compatibility with the current range deleter, update the pending chunks on the collection - // metadata to include the chunk being received - ChunkType chunk; - chunk.setMin(range.getMin()); - chunk.setMax(range.getMax()); - _setActiveMetadata_inlock(_activeMetadataTracker->metadata->clonePlusPending(chunk)); -} - -void MetadataManager::forgetReceive(const ChunkRange& range) { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - - { - auto it = _receivingChunks.find(range.getMin()); - invariant(it != _receivingChunks.end()); - - // Verify entire ChunkRange is identical, not just the min key. 
- invariant( - SimpleBSONObjComparator::kInstance.evaluate(it->second.getMaxKey() == range.getMax())); - - _receivingChunks.erase(it); - } - - // This is potentially a partially received data, which needs to be cleaned up - _addRangeToClean_inlock(range); - - // For compatibility with the current range deleter, update the pending chunks on the collection - // metadata to exclude the chunk being received, which was added in beginReceive - ChunkType chunk; - chunk.setMin(range.getMin()); - chunk.setMax(range.getMax()); - _setActiveMetadata_inlock(_activeMetadataTracker->metadata->cloneMinusPending(chunk)); -} - -RangeMap MetadataManager::getCopyOfReceivingChunks() { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - return _receivingChunks; -} - void MetadataManager::_setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata) { - if (_activeMetadataTracker->usageCounter > 0) { - _metadataInUse.push_front(std::move(_activeMetadataTracker)); + if (_activeMetadataTracker->usageCounter != 0 || _activeMetadataTracker->orphans) { + _metadataInUse.push_back(std::move(_activeMetadataTracker)); } - - _activeMetadataTracker = stdx::make_unique<CollectionMetadataTracker>(std::move(newMetadata)); + _activeMetadataTracker = std::make_shared<Tracker>(std::move(newMetadata), this); } -void MetadataManager::_removeMetadata_inlock(CollectionMetadataTracker* metadataTracker) { - invariant(metadataTracker->usageCounter == 0); - - auto i = _metadataInUse.begin(); - const auto e = _metadataInUse.end(); - while (i != e) { - if (metadataTracker == i->get()) { - _metadataInUse.erase(i); - return; +// call locked +void MetadataManager::_retireExpiredMetadata() { + bool notify = false; + while (!_metadataInUse.empty() && _metadataInUse.front()->usageCounter == 0) { + // No ScopedCollectionMetadata can see this Tracker, other than, maybe, the caller. + auto& tracker = _metadataInUse.front(); + if (tracker->orphans) { + notify = true; + log() << "Queries possibly dependent on " << _nss.ns() << " range " << *tracker->orphans + << " finished; scheduling range for deletion"; + _pushRangeToClean(*tracker->orphans); } - - ++i; + tracker->metadata.reset(); // Discard the CollectionMetadata. 
+ _metadataInUse.pop_front(); // Disconnect from the tracker (and maybe destroy it) + } + if (_metadataInUse.empty() && _activeMetadataTracker->orphans) { + notify = true; + log() << "Queries possibly dependent on " << _nss.ns() << " range " + << *_activeMetadataTracker->orphans << " finished; scheduling range for deletion"; + _pushRangeToClean(*_activeMetadataTracker->orphans); + _activeMetadataTracker->orphans = boost::none; + } + if (notify) { + _notifyInUse(); // wake up waitForClean because we changed inUse } } -MetadataManager::CollectionMetadataTracker::CollectionMetadataTracker( - std::unique_ptr<CollectionMetadata> m) - : metadata(std::move(m)) {} +MetadataManager::Tracker::Tracker(std::unique_ptr<CollectionMetadata> md, MetadataManager* mgr) + : metadata(std::move(md)), manager(mgr) {} -ScopedCollectionMetadata::ScopedCollectionMetadata() = default; +// ScopedCollectionMetadata members -// called in lock +// call with MetadataManager locked ScopedCollectionMetadata::ScopedCollectionMetadata( - MetadataManager* manager, MetadataManager::CollectionMetadataTracker* tracker) - : _manager(manager), _tracker(tracker) { - _tracker->usageCounter++; + std::shared_ptr<MetadataManager::Tracker> tracker) + : _tracker(std::move(tracker)) { + ++_tracker->usageCounter; } ScopedCollectionMetadata::~ScopedCollectionMetadata() { - if (!_tracker) - return; - _decrementUsageCounter(); + _clear(); } CollectionMetadata* ScopedCollectionMetadata::operator->() const { - return _tracker->metadata.get(); + return _tracker ? _tracker->metadata.get() : nullptr; } CollectionMetadata* ScopedCollectionMetadata::getMetadata() const { - return _tracker->metadata.get(); + return _tracker ? _tracker->metadata.get() : nullptr; +} + +void ScopedCollectionMetadata::_clear() { + if (!_tracker) { + return; + } + // Note: There is no risk of deadlock here because the only other place in MetadataManager + // that takes the trackerLock, ~MetadataManager(), does not hold _managerLock at the same time, + // and ScopedCollectionMetadata takes _managerLock only here. + stdx::unique_lock<stdx::mutex> trackerLock(_tracker->trackerLock); + MetadataManager* manager = _tracker->manager; + if (manager) { + stdx::lock_guard<stdx::mutex> managerLock(_tracker->manager->_managerLock); + trackerLock.unlock(); + invariant(_tracker->usageCounter != 0); + if (--_tracker->usageCounter == 0 && !manager->_shuttingDown) { + // MetadataManager doesn't care which usageCounter went to zero. It justs retires all + // that are older than the oldest tracker still in use by queries. (Some start out at + // zero, some go to zero but can't be expired yet.) Note that new instances of + // ScopedCollectionMetadata may get attached to the active tracker, so its usage + // count can increase from zero, unlike most reference counts. + manager->_retireExpiredMetadata(); + } + } else { + trackerLock.unlock(); + } + _tracker.reset(); // disconnect from the tracker. } +// do not call with MetadataManager locked ScopedCollectionMetadata::ScopedCollectionMetadata(ScopedCollectionMetadata&& other) { - *this = std::move(other); + *this = std::move(other); // Rely on this->_tracker being zero-initialized already. 
} +// do not call with MetadataManager locked ScopedCollectionMetadata& ScopedCollectionMetadata::operator=(ScopedCollectionMetadata&& other) { if (this != &other) { - // If "this" was previously initialized, make sure we perform the same logic as in the - // destructor to decrement _tracker->usageCounter for the CollectionMetadata "this" had a - // reference to before replacing _tracker with other._tracker. - if (_tracker) { - _decrementUsageCounter(); - } - - _manager = other._manager; - _tracker = other._tracker; - other._manager = nullptr; - other._tracker = nullptr; + _clear(); + _tracker = std::move(other._tracker); } - return *this; } -void ScopedCollectionMetadata::_decrementUsageCounter() { - invariant(_manager); - invariant(_tracker); - stdx::lock_guard<stdx::mutex> scopedLock(_manager->_managerLock); - invariant(_tracker->usageCounter > 0); - if (--_tracker->usageCounter == 0) { - _manager->_removeMetadata_inlock(_tracker); - } +ScopedCollectionMetadata::operator bool() const { + return _tracker && _tracker->metadata; // with a Collection lock the metadata member is stable } -ScopedCollectionMetadata::operator bool() const { - return _tracker && _tracker->metadata.get(); +void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const { + for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) { + BSONArrayBuilder pendingBB(bb.subarrayStart()); + pendingBB.append(it->first); + pendingBB.append(it->second.getMaxKey()); + pendingBB.done(); + } } -RangeMap MetadataManager::getCopyOfRangesToClean() { +void MetadataManager::append(BSONObjBuilder* builder) { stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - return _getCopyOfRangesToClean_inlock(); -} -RangeMap MetadataManager::_getCopyOfRangesToClean_inlock() { - RangeMap ranges = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>(); - for (auto it = _rangesToClean.begin(); it != _rangesToClean.end(); ++it) { - ranges.insert(std::make_pair( - it->first, CachedChunkInfo(it->second.getMax(), ChunkVersion::IGNORED()))); + _rangesToClean.append(builder); + + BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks")); + for (const auto& entry : _receivingChunks) { + BSONObjBuilder obj; + ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey()); + r.append(&obj); + pcArr.append(obj.done()); + } + pcArr.done(); + + BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges")); + for (const auto& entry : _activeMetadataTracker->metadata->getChunks()) { + BSONObjBuilder obj; + ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey()); + r.append(&obj); + amrArr.append(obj.done()); } - return ranges; + amrArr.done(); } -std::shared_ptr<Notification<Status>> MetadataManager::addRangeToClean(const ChunkRange& range) { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - return _addRangeToClean_inlock(range); +void MetadataManager::_scheduleCleanup(executor::TaskExecutor* executor, NamespaceString nss) { + executor->scheduleWork([executor, nss](auto&) { + const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1); + Client::initThreadIfNotAlready("Collection Range Deleter"); + auto UniqueOpCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = UniqueOpCtx.get(); + bool again = CollectionRangeDeleter::cleanUpNextRange(opCtx, nss, maxToDelete); + if (again) { + _scheduleCleanup(executor, nss); + } + }); } -std::shared_ptr<Notification<Status>> MetadataManager::_addRangeToClean_inlock( - const ChunkRange& range) { - // This 
first invariant currently makes an unnecessary copy, to reuse the - // rangeMapOverlaps helper function. - invariant(!rangeMapOverlaps(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax())); - invariant(!rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax())); +// call locked +void MetadataManager::_pushRangeToClean(ChunkRange const& range) { + _rangesToClean.add(range); + if (_rangesToClean.size() == 1) { + _scheduleCleanup(_executor, _nss); + } +} + +void MetadataManager::_addToReceiving(ChunkRange const& range) { + _receivingChunks.insert( + std::make_pair(range.getMin().getOwned(), + CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED()))); +} - RangeToCleanDescriptor descriptor(range.getMax().getOwned()); - _rangesToClean.insert(std::make_pair(range.getMin().getOwned(), descriptor)); +bool MetadataManager::beginReceive(ChunkRange const& range) { + stdx::unique_lock<stdx::mutex> scopedLock(_managerLock); - // If _rangesToClean was previously empty, we need to start the collection range deleter - if (_rangesToClean.size() == 1UL) { - ShardingState::get(_serviceContext)->scheduleCleanup(_nss); + auto* metadata = _activeMetadataTracker->metadata.get(); + if (_overlapsInUseChunk(range) || metadata->rangeOverlapsChunk(range)) { + log() << "Rejecting in-migration to " << _nss.ns() << " range " << range + << " because a running query might depend on documents in the range"; + return false; } + _addToReceiving(range); + _pushRangeToClean(range); + log() << "Scheduling deletion of any documents in " << _nss.ns() << " range " << range + << " before migrating in a chunk covering the range"; + return true; +} - return descriptor.getNotification(); +void MetadataManager::_removeFromReceiving(ChunkRange const& range) { + auto it = _receivingChunks.find(range.getMin()); + invariant(it != _receivingChunks.end()); + _receivingChunks.erase(it); } -void MetadataManager::removeRangeToClean(const ChunkRange& range, Status deletionStatus) { +void MetadataManager::forgetReceive(ChunkRange const& range) { stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - _removeRangeToClean_inlock(range, deletionStatus); + // This is potentially a partially received chunk, which needs to be cleaned up. We know none + // of these documents are in use, so they can go straight to the deletion queue. + log() << "Abandoning in-migration of " << _nss.ns() << " range " << range + << "; scheduling deletion of any documents already copied"; + + invariant(!_overlapsInUseChunk(range) && + !_activeMetadataTracker->metadata->rangeOverlapsChunk(range)); + + _removeFromReceiving(range); + _pushRangeToClean(range); } -void MetadataManager::_removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus) { - auto it = _rangesToClean.upper_bound(range.getMin()); - // We want our iterator to point at the greatest value - // that is still less than or equal to range. 
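For orientation, a sketch of how a migration recipient is expected to drive the two receive-side calls just added; 'css' stands for the collection's CollectionShardingState (which presumably forwards here), and the bounds and the 'cloneFailed' flag are purely illustrative:

    ChunkRange incoming(BSON("key" << 10), BSON("key" << 20));
    if (!css->beginReceive(incoming)) {
        // Some running query may still see documents in this range; refuse the migration
        // rather than deleting data out from under it.
        return {ErrorCodes::RangeOverlapConflict, "documents in range may still be in use"};
    }
    // ... clone documents for the incoming chunk ...
    if (cloneFailed) {                 // hypothetical failure path
        css->forgetReceive(incoming);  // queue whatever was already copied for deletion
    }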
- if (it != _rangesToClean.begin()) { - --it; +Status MetadataManager::cleanUpRange(ChunkRange const& range) { + stdx::unique_lock<stdx::mutex> scopedLock(_managerLock); + CollectionMetadata* metadata = _activeMetadataTracker->metadata.get(); + invariant(metadata != nullptr); + + if (metadata->rangeOverlapsChunk(range)) { + return {ErrorCodes::RangeOverlapConflict, + str::stream() << "Requested deletion range overlaps a live shard chunk"}; } - for (; it != _rangesToClean.end() && - SimpleBSONObjComparator::kInstance.evaluate(it->first < range.getMax());) { - if (SimpleBSONObjComparator::kInstance.evaluate(it->second.getMax() <= range.getMin())) { - ++it; - continue; - } + if (rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax())) { + return {ErrorCodes::RangeOverlapConflict, + str::stream() << "Requested deletion range overlaps a chunk being migrated in"}; + } - // There's overlap between *it and range so we remove *it - // and then replace with new ranges. - BSONObj oldMin = it->first; - BSONObj oldMax = it->second.getMax(); - it->second.complete(deletionStatus); - _rangesToClean.erase(it++); - if (SimpleBSONObjComparator::kInstance.evaluate(oldMin < range.getMin())) { - _addRangeToClean_inlock(ChunkRange(oldMin, range.getMin())); - } + if (!_overlapsInUseChunk(range)) { + // No running queries can depend on it, so queue it for deletion immediately. + log() << "Scheduling " << _nss.ns() << " range " << redact(range.toString()) + << " for deletion"; + + _pushRangeToClean(range); + } else { + invariant(!_metadataInUse.empty()); - if (SimpleBSONObjComparator::kInstance.evaluate(oldMax > range.getMax())) { - _addRangeToClean_inlock(ChunkRange(range.getMax(), oldMax)); + if (_activeMetadataTracker->orphans) { + _setActiveMetadata_inlock(_activeMetadataTracker->metadata->clone()); } + + _activeMetadataTracker->orphans.emplace(range.getMin().getOwned(), + range.getMax().getOwned()); + + log() << "Scheduling " << _nss.ns() << " range " << redact(range.toString()) + << " for deletion after all possibly-dependent queries finish"; } + + return Status::OK(); } -void MetadataManager::append(BSONObjBuilder* builder) { +size_t MetadataManager::numberOfRangesToCleanStillInUse() { stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); + size_t count = _activeMetadataTracker->orphans ? 
1 : 0; + count += std::count_if(_metadataInUse.begin(), _metadataInUse.end(), [](auto& tracker) { + return bool(tracker->orphans); + }); + return count; +} - BSONArrayBuilder rtcArr(builder->subarrayStart("rangesToClean")); - for (const auto& entry : _rangesToClean) { - BSONObjBuilder obj; - ChunkRange r = ChunkRange(entry.first, entry.second.getMax()); - r.append(&obj); - rtcArr.append(obj.done()); - } - rtcArr.done(); +size_t MetadataManager::numberOfRangesToClean() { + stdx::unique_lock<stdx::mutex> scopedLock(_managerLock); + return _rangesToClean.size(); +} - BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks")); - for (const auto& entry : _receivingChunks) { - BSONObjBuilder obj; - ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey()); - r.append(&obj); - pcArr.append(obj.done()); - } - pcArr.done(); +MetadataManager::CleanupNotification MetadataManager::trackOrphanedDataCleanup( + ChunkRange const& range) { - BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges")); - for (const auto& entry : _activeMetadataTracker->metadata->getChunks()) { - BSONObjBuilder obj; - ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey()); - r.append(&obj); - amrArr.append(obj.done()); + stdx::unique_lock<stdx::mutex> scopedLock(_managerLock); + if (_overlapsInUseCleanups(range)) + return _notification; + return _rangesToClean.overlaps(range); +} + +// call locked +bool MetadataManager::_overlapsInUseChunk(ChunkRange const& range) { + if (_activeMetadataTracker->metadata->rangeOverlapsChunk(range)) { + return true; // refcount doesn't matter for the active case } - amrArr.done(); + for (auto& tracker : _metadataInUse) { + if (tracker->usageCounter != 0 && tracker->metadata->rangeOverlapsChunk(range)) { + return true; + } + } + return false; } -bool MetadataManager::hasRangesToClean() { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - return !_rangesToClean.empty(); +// call locked +bool MetadataManager::_overlapsInUseCleanups(ChunkRange const& range) { + if (_activeMetadataTracker->orphans && _activeMetadataTracker->orphans->overlapWith(range)) { + return true; + } + for (auto& tracker : _metadataInUse) { + if (tracker->orphans && bool(tracker->orphans->overlapWith(range))) { + return true; + } + } + return false; } -bool MetadataManager::isInRangesToClean(const ChunkRange& range) { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - // For convenience, this line makes an unnecessary copy, to reuse the - // rangeMapContains helper function. 
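Since trackOrphanedDataCleanup() only promises to wake waiters when some deletion pass completes, a caller that needs the range gone has to loop. A sketch of the donor-side waiting pattern built on the two methods above, roughly what a helper such as CollectionShardingState::waitForClean would need to do; 'opCtx' and the key bounds are illustrative:

    ChunkRange orphans(BSON("key" << 0), BSON("key" << 10));
    // Queue the range: immediately if no query can see it, otherwise once the last
    // dependent ScopedCollectionMetadata goes away.
    uassertStatusOK(manager.cleanUpRange(orphans));

    // A null notification means nothing overlapping is scheduled any more; otherwise
    // wait for the next completed deletion pass and re-check.
    auto notification = manager.trackOrphanedDataCleanup(orphans);
    while (notification.get() != nullptr) {
        uassertStatusOK(notification->get(opCtx));
        notification = manager.trackOrphanedDataCleanup(orphans);
    }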
- return rangeMapContains(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax()); +// call locked +void MetadataManager::_notifyInUse() { + _notification->set(Status::OK()); // wake up waitForClean + _notification = std::make_shared<Notification<Status>>(); } -ChunkRange MetadataManager::getNextRangeToClean() { - stdx::lock_guard<stdx::mutex> scopedLock(_managerLock); - invariant(!_rangesToClean.empty()); - auto it = _rangesToClean.begin(); - return ChunkRange(it->first, it->second.getMax()); +boost::optional<KeyRange> MetadataManager::getNextOrphanRange(BSONObj const& from) { + stdx::unique_lock<stdx::mutex> scopedLock(_managerLock); + invariant(_activeMetadataTracker->metadata); + return _activeMetadataTracker->metadata->getNextOrphanRange(_receivingChunks, from); } + } // namespace mongo diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h index 5c2e7f2e64a..a76766e5e75 100644 --- a/src/mongo/db/s/metadata_manager.h +++ b/src/mongo/db/s/metadata_manager.h @@ -29,17 +29,18 @@ #pragma once #include <list> -#include <memory> #include "mongo/base/disallow_copying.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/range_arithmetic.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_range_deleter.h" #include "mongo/db/service_context.h" +#include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/util/concurrency/notification.h" - #include "mongo/stdx/memory.h" +#include "mongo/util/concurrency/notification.h" namespace mongo { @@ -49,7 +50,7 @@ class MetadataManager { MONGO_DISALLOW_COPYING(MetadataManager); public: - MetadataManager(ServiceContext* sc, NamespaceString nss); + MetadataManager(ServiceContext*, NamespaceString nss, executor::TaskExecutor* rangeDeleter); ~MetadataManager(); /** @@ -62,167 +63,186 @@ public: ScopedCollectionMetadata getActiveMetadata(); /** + * Returns the number of CollectionMetadata objects being maintained on behalf of running + * queries. The actual number may vary after it returns, so this is really only useful for unit + * tests. + */ + size_t numberOfMetadataSnapshots(); + + /** * Uses the contents of the specified metadata as a way to purge any pending chunks. */ void refreshActiveMetadata(std::unique_ptr<CollectionMetadata> newMetadata); + void toBSONPending(BSONArrayBuilder& bb) const; + /** - * Puts the specified range on the list of chunks, which are being received so that the range - * deleter process will not clean the partially migrated data. + * Appends information on all the chunk ranges in rangesToClean to builder. */ - void beginReceive(const ChunkRange& range); + void append(BSONObjBuilder* builder); /** - * Removes a range from the list of chunks, which are being received. Used externally to - * indicate that a chunk migration failed. + * Returns a map to the set of chunks being migrated in. */ - void forgetReceive(const ChunkRange& range); + RangeMap const& getReceiveMap() const { + return _receivingChunks; + } /** - * Gets copy of the set of chunk ranges which are being received for this collection. This - * method is intended for testing purposes only and should not be used in any production code. + * If no running queries can depend on documents in the range, schedules any such documents for + * immediate cleanup. Otherwise, returns false. */ - RangeMap getCopyOfReceivingChunks(); + bool beginReceive(ChunkRange const& range); /** - * Adds a new range to be cleaned up. 
- * The newly introduced range must not overlap with the existing ranges. - */ - std::shared_ptr<Notification<Status>> addRangeToClean(const ChunkRange& range); + * Removes the range from the pending list, and schedules any documents in the range for + * immediate cleanup. Assumes no active queries can see any local documents in the range. + */ + void forgetReceive(const ChunkRange& range); /** - * Calls removeRangeToClean with Status::OK. + * Initiates cleanup of the orphaned documents as if a chunk has been migrated out. If any + * documents in the range might still be in use by running queries, queues cleanup to begin + * after they have all terminated. Otherwise, schedules documents for immediate cleanup. + * Fails if the range overlaps any current local shard chunk. + * + * Must be called with the collection locked for writing. To monitor completion, use + * trackOrphanedDataCleanup or CollectionShardingState::waitForClean. */ - void removeRangeToClean(const ChunkRange& range) { - removeRangeToClean(range, Status::OK()); - } + Status cleanUpRange(ChunkRange const& range); /** - * Removes the specified range from the ranges to be cleaned up. - * The specified deletionStatus will be returned to callers waiting - * on whether the deletion succeeded or failed. + * Returns the number of ranges scheduled to be cleaned, exclusive of such ranges that might + * still be in use by running queries. Outside of test drivers, the actual number may vary + * after it returns, so this is really only useful for unit tests. */ - void removeRangeToClean(const ChunkRange& range, Status deletionStatus); + size_t numberOfRangesToClean(); /** - * Gets copy of the set of chunk ranges which are scheduled for cleanup. - * Converts RangeToCleanMap to RangeMap. + * Returns the number of ranges scheduled to be cleaned once all queries that could depend on + * them have terminated. The actual number may vary after it returns, so this is really only + * useful for unit tests. */ - RangeMap getCopyOfRangesToClean(); + size_t numberOfRangesToCleanStillInUse(); + using CleanupNotification = CollectionRangeDeleter::DeleteNotification; /** - * Appends information on all the chunk ranges in rangesToClean to builder. + * Reports whether the argument range is still scheduled for deletion. If not, returns nullptr. + * Otherwise, returns a notification n such that n->get(opCtx) will wake when deletion of a + * range (possibly the one of interest) is completed. */ - void append(BSONObjBuilder* builder); + CleanupNotification trackOrphanedDataCleanup(ChunkRange const& orphans); + + boost::optional<KeyRange> getNextOrphanRange(BSONObj const& from); + +private: + struct Tracker; /** - * Returns true if _rangesToClean is not empty. + * Retires any metadata that has fallen out of use, and pushes any orphan ranges found in them + * to the list of ranges actively being cleaned up. */ - bool hasRangesToClean(); + void _retireExpiredMetadata(); /** - * Returns true if the exact range is in _rangesToClean. + * Pushes current set of chunks, if any, to _metadataInUse, replaces it with newMetadata. */ - bool isInRangesToClean(const ChunkRange& range); + void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata); /** - * Gets and returns, but does not remove, a single ChunkRange from _rangesToClean. - * Should not be called if _rangesToClean is empty: it will hit an invariant. + * Returns true if the specified range overlaps any chunk that might be currently in use by a + * running query. + * + * must be called locked. 
*/ - ChunkRange getNextRangeToClean(); -private: - friend class ScopedCollectionMetadata; + bool _overlapsInUseChunk(ChunkRange const& range); - struct CollectionMetadataTracker { - public: - /** - * Creates a new CollectionMetadataTracker, with the usageCounter initialized to zero. - */ - CollectionMetadataTracker(std::unique_ptr<CollectionMetadata> m); - - std::unique_ptr<CollectionMetadata> metadata; - - uint32_t usageCounter{0}; - }; - - // Class for the value of the _rangesToClean map. Used because callers of addRangeToClean - // sometimes need to wait until a range is deleted. Thus, complete(Status) is called - // when the range is deleted from _rangesToClean in removeRangeToClean(), letting callers - // of addRangeToClean know if the deletion succeeded or failed. - class RangeToCleanDescriptor { - public: - /** - * Initializes a RangeToCleanDescriptor with an empty notification. - */ - RangeToCleanDescriptor(BSONObj max) - : _max(max.getOwned()), _notification(std::make_shared<Notification<Status>>()) {} - - /** - * Gets the maximum value of the range to be deleted. - */ - const BSONObj& getMax() const { - return _max; - } - - // See comment on _notification. - std::shared_ptr<Notification<Status>> getNotification() { - return _notification; - } - - /** - * Sets the status on _notification. This will tell threads - * waiting on the value of status that the deletion succeeded or failed. - */ - void complete(Status status) { - _notification->set(status); - } - - private: - // The maximum value of the range to be deleted. - BSONObj _max; - - // This _notification will be set with a value indicating whether the deletion - // succeeded or failed. - std::shared_ptr<Notification<Status>> _notification; - }; + /** + * Returns true if any range (possibly) still in use, but scheduled for cleanup, overlaps + * the argument range. + * + * Must be called locked. + */ + bool _overlapsInUseCleanups(ChunkRange const& range); /** - * Removes the CollectionMetadata stored in the tracker from the _metadataInUse - * list (if it's there). + * Deletes ranges, in background, until done, normally using a task executor attached to the + * ShardingState. + * + * Each time it completes cleaning up a range, it wakes up clients waiting on completion of + * that range, which may then verify their range has no more deletions scheduled, and proceed. */ - void _removeMetadata_inlock(CollectionMetadataTracker* metadataTracker); + static void _scheduleCleanup(executor::TaskExecutor*, NamespaceString nss); - std::shared_ptr<Notification<Status>> _addRangeToClean_inlock(const ChunkRange& range); + /** + * Adds the range to the list of ranges scheduled for immediate deletion, and schedules a + * background task to perform the work. + * + * Must be called locked. + */ + void _pushRangeToClean(ChunkRange const& range); - void _removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus); + /** + * Adds a range to the receiving map, so getNextOrphanRange will skip ranges migrating in. + */ + void _addToReceiving(ChunkRange const& range); - RangeMap _getCopyOfRangesToClean_inlock(); + /** + * Removes a range from the receiving map after a migration failure. range.minKey() must + * exactly match an element of _receivingChunks. + */ + void _removeFromReceiving(ChunkRange const& range); - void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata); + /** + * Wakes up any clients waiting on a range to leave _metadataInUse + * + * Must be called locked.
+ */ + void _notifyInUse(); + + // data members const NamespaceString _nss; // ServiceContext from which to obtain instances of global support objects. - ServiceContext* _serviceContext; + ServiceContext* const _serviceContext; // Mutex to protect the state below stdx::mutex _managerLock; - // Holds the collection metadata, which is currently active - std::unique_ptr<CollectionMetadataTracker> _activeMetadataTracker; + bool _shuttingDown{false}; - // Holds collection metadata instances, which have previously been active, but are still in use - // by still active server operations or cursors - std::list<std::unique_ptr<CollectionMetadataTracker>> _metadataInUse; + // The collection metadata reflecting chunks accessible to new queries + std::shared_ptr<Tracker> _activeMetadataTracker; - // Chunk ranges which are currently assumed to be transferred to the shard. Indexed by the min - // key of the range. + // Previously active collection metadata instances still in use by active server operations or + // cursors + std::list<std::shared_ptr<Tracker>> _metadataInUse; + + // Chunk ranges being migrated into the shard. Indexed by the min key of the range. RangeMap _receivingChunks; - // Set of ranges to be deleted. Indexed by the min key of the range. - typedef BSONObjIndexedMap<RangeToCleanDescriptor> RangeToCleanMap; - RangeToCleanMap _rangesToClean; + // Clients can sleep on copies of _notification while waiting for their orphan ranges to fall + // out of use. + std::shared_ptr<Notification<Status>> _notification; + + // The background task that deletes documents from orphaned chunk ranges. + executor::TaskExecutor* const _executor; + + // Ranges being deleted, or scheduled to be deleted, by a background task + CollectionRangeDeleter _rangesToClean; + + // friends + + // for access to _managerLock, _shuttingDown, _retireExpiredMetadata(), and to Tracker. + friend class ScopedCollectionMetadata; + + // for access to _rangesToClean and _managerLock under task callback + friend bool CollectionRangeDeleter::cleanUpNextRange(OperationContext*, + NamespaceString const&, + int maxToDelete, + CollectionRangeDeleter*); }; class ScopedCollectionMetadata { @@ -233,41 +253,43 @@ public: /** * Creates an empty ScopedCollectionMetadata. Using the default constructor means that no * metadata is available. */ - ScopedCollectionMetadata(); - + ScopedCollectionMetadata() = default; ~ScopedCollectionMetadata(); + /** + * Binds *this to the same tracker as other, if any. + */ ScopedCollectionMetadata(ScopedCollectionMetadata&& other); ScopedCollectionMetadata& operator=(ScopedCollectionMetadata&& other); /** - * Dereferencing the ScopedCollectionMetadata will dereference the internal CollectionMetadata. + * Dereferencing the ScopedCollectionMetadata dereferences the private CollectionMetadata. */ CollectionMetadata* operator->() const; CollectionMetadata* getMetadata() const; /** - * True if the ScopedCollectionMetadata stores a metadata (is not empty) + * True if the ScopedCollectionMetadata stores a metadata (is not empty) and the collection is + * sharded. */ operator bool() const; private: - friend ScopedCollectionMetadata MetadataManager::getActiveMetadata(); - /** - * Increments the counter in the CollectionMetadataTracker. + * If tracker is non-null, increments the refcount in the specified tracker. + * + * Must be called with tracker->manager locked.
*/ - ScopedCollectionMetadata(MetadataManager* manager, - MetadataManager::CollectionMetadataTracker* tracker); + ScopedCollectionMetadata(std::shared_ptr<MetadataManager::Tracker> tracker); /** - * Decrements the usageCounter and conditionally makes a call to _removeMetadata on - * the tracker if the count has reached zero. + * Disconnect from the tracker, possibly triggering GC of unused CollectionMetadata. */ - void _decrementUsageCounter(); + void _clear(); + + std::shared_ptr<MetadataManager::Tracker> _tracker{nullptr}; - MetadataManager* _manager{nullptr}; - MetadataManager::CollectionMetadataTracker* _tracker{nullptr}; + friend ScopedCollectionMetadata MetadataManager::getActiveMetadata(); // uses our private ctor }; } // namespace mongo diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index d4416cbfbea..7a559619a86 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -31,29 +31,74 @@ #include "mongo/db/s/metadata_manager.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/client.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/catalog/dist_lock_catalog_impl.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" + +#include <boost/optional.hpp> + namespace mongo { namespace { using unittest::assertGet; -class MetadataManagerTest : public ServiceContextMongoDTest { +const NamespaceString kNss("TestDB", "TestColl"); +const std::string kPattern = "X"; +const BSONObj kShardKeyPattern{BSON(kPattern << 1)}; +const std::string kShardName{"a"}; +const HostAndPort dummyHost("dummy", 123); + +class MetadataManagerTest : public ShardingMongodTestFixture { +public: + std::shared_ptr<RemoteCommandTargeterMock> configTargeter() { + return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); + } + protected: void setUp() override { - ServiceContextMongoDTest::setUp(); - ShardingState::get(getServiceContext()) - ->setScheduleCleanupFunctionForTest([](const NamespaceString& nss) {}); + ShardingMongodTestFixture::setUp(); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost)); + + configTargeter()->setFindHostReturnValue(dummyHost); + } + + std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override { + invariant(shardRegistry); + return stdx::make_unique<DistLockCatalogImpl>(shardRegistry); + } + + std::unique_ptr<DistLockManager> makeDistLockManager( + std::unique_ptr<DistLockCatalog> distLockCatalog) override { + return 
stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); + } + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager)); } static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() { @@ -79,7 +124,6 @@ protected: const BSONObj& minKey, const BSONObj& maxKey, const ChunkVersion& chunkVersion) { - invariant(chunkVersion.epoch() == metadata.getShardVersion().epoch()); invariant(chunkVersion.isSet()); invariant(chunkVersion > metadata.getCollVersion()); invariant(minKey.woCompare(maxKey) < 0); @@ -92,10 +136,23 @@ protected: return stdx::make_unique<CollectionMetadata>( metadata.getKeyPattern(), chunkVersion, chunkVersion, std::move(chunksMap)); } + + CollectionMetadata* addChunk(MetadataManager* manager) { + ScopedCollectionMetadata scopedMetadata1 = manager->getActiveMetadata(); + + ChunkVersion newVersion = scopedMetadata1->getCollVersion(); + newVersion.incMajor(); + std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk( + *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 20), newVersion); + auto cm2Ptr = cm2.get(); + + manager->refreshActiveMetadata(std::move(cm2)); + return cm2Ptr; + } }; TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); std::unique_ptr<CollectionMetadata> cm = makeEmptyMetadata(); auto cmPtr = cm.get(); @@ -107,197 +164,76 @@ TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) { TEST_F(MetadataManagerTest, ResetActiveMetadata) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); - - ScopedCollectionMetadata scopedMetadata1 = manager.getActiveMetadata(); - - ChunkVersion newVersion = scopedMetadata1->getCollVersion(); - newVersion.incMajor(); - std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk( - *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 10), newVersion); - auto cm2Ptr = cm2.get(); - - manager.refreshActiveMetadata(std::move(cm2)); + auto cm2Ptr = addChunk(&manager); ScopedCollectionMetadata scopedMetadata2 = manager.getActiveMetadata(); - ASSERT_EQ(cm2Ptr, scopedMetadata2.getMetadata()); }; -TEST_F(MetadataManagerTest, AddAndRemoveRangesToClean) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); - ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20)); - - manager.addRangeToClean(cr1); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL); - manager.removeRangeToClean(cr1); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); - - manager.addRangeToClean(cr1); - manager.addRangeToClean(cr2); - manager.removeRangeToClean(cr1); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL); - auto ranges = manager.getCopyOfRangesToClean(); - auto it = ranges.find(cr2.getMin()); - ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - ASSERT_EQ(remainingChunk.toString(), cr2.toString()); - manager.removeRangeToClean(cr2); -} - -// Tests that a removal in the middle of an existing ChunkRange results in -// two correct chunk ranges. 
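An illustrative test using the addChunk() fixture helper defined above; the test name is hypothetical, and the assertions mirror what the rewritten tests below check:

    TEST_F(MetadataManagerTest, AddChunkHelperInstallsNewMetadata) {
        MetadataManager manager(getServiceContext(), kNss, executor());
        manager.refreshActiveMetadata(makeEmptyMetadata());

        auto* installed = addChunk(&manager);  // bumps the major version, adds chunk [0, 20)
        ASSERT_EQ(installed, manager.getActiveMetadata().getMetadata());
        ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
    }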
-TEST_F(MetadataManagerTest, RemoveRangeInMiddleOfRange) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); - - manager.addRangeToClean(cr1); - manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL); - - auto ranges = manager.getCopyOfRangesToClean(); - auto it = ranges.find(BSON("key" << 0)); - ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 4)); - ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - - it++; - expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10)); - remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - - manager.removeRangeToClean(cr1); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); -} - -// Tests removals that overlap with just one ChunkRange. -TEST_F(MetadataManagerTest, RemoveRangeWithSingleRangeOverlap) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); +// In the following tests, the ranges-to-clean is not drained by the background deleter thread +// because the collection involved has no CollectionShardingState, so the task just returns without +// doing anything. - manager.addRangeToClean(cr1); - manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 5))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL); - auto ranges = manager.getCopyOfRangesToClean(); - auto it = ranges.find(BSON("key" << 5)); - ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - ChunkRange expectedChunk = ChunkRange(BSON("key" << 5), BSON("key" << 10)); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - - manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL); - ranges = manager.getCopyOfRangesToClean(); - it = ranges.find(BSON("key" << 6)); - remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10)); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - - manager.removeRangeToClean(ChunkRange(BSON("key" << 9), BSON("key" << 13))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL); - ranges = manager.getCopyOfRangesToClean(); - it = ranges.find(BSON("key" << 6)); - remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 9)); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - - manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 10))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); -} - -// Tests removals that overlap with more than one ChunkRange. 
-TEST_F(MetadataManagerTest, RemoveRangeWithMultipleRangeOverlaps) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); - ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20)); - ChunkRange cr3 = ChunkRange(BSON("key" << 20), BSON("key" << 30)); - - manager.addRangeToClean(cr1); - manager.addRangeToClean(cr2); - manager.addRangeToClean(cr3); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 3UL); - - manager.removeRangeToClean(ChunkRange(BSON("key" << 8), BSON("key" << 22))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL); - auto ranges = manager.getCopyOfRangesToClean(); - auto it = ranges.find(BSON("key" << 0)); - ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 8)); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - it++; - remainingChunk = ChunkRange(it->first, it->second.getMaxKey()); - expectedChunk = ChunkRange(BSON("key" << 22), BSON("key" << 30)); - ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString()); - - manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 30))); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); -} - -TEST_F(MetadataManagerTest, AddAndRemoveRangeNotificationsBlockAndYield) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - manager.refreshActiveMetadata(makeEmptyMetadata()); - - ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - auto notification = manager.addRangeToClean(cr1); - manager.removeRangeToClean(cr1, Status::OK()); - ASSERT_OK(notification->get()); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); -} - -TEST_F(MetadataManagerTest, RemoveRangeToCleanCorrectlySetsBadStatus) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); +TEST_F(MetadataManagerTest, CleanUpForMigrateIn) { + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); - ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - auto notification = manager.addRangeToClean(cr1); - manager.removeRangeToClean(cr1, Status(ErrorCodes::InternalError, "test error")); - ASSERT_NOT_OK(notification->get()); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); + ChunkRange range2(BSON("key" << 10), BSON("key" << 20)); + ASSERT(manager.beginReceive(ChunkRange(BSON("key" << 0), BSON("key" << 10)))); + ASSERT(manager.beginReceive(ChunkRange(BSON("key" << 10), BSON("key" << 20)))); + ASSERT_EQ(manager.numberOfRangesToClean(), 2UL); + ASSERT_EQ(manager.numberOfRangesToCleanStillInUse(), 0UL); } -TEST_F(MetadataManagerTest, RemovingSubrangeStillSetsNotificationStatus) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); +TEST_F(MetadataManagerTest, AddRangeNotificationsBlockAndYield) { + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - auto notification = manager.addRangeToClean(cr1); - manager.removeRangeToClean(ChunkRange(BSON("key" << 3), BSON("key" << 7))); - ASSERT_OK(notification->get()); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL); - manager.removeRangeToClean(cr1); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); - - notification = manager.addRangeToClean(cr1); - 
manager.removeRangeToClean(ChunkRange(BSON("key" << 7), BSON("key" << 15))); - ASSERT_OK(notification->get()); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL); - manager.removeRangeToClean(cr1); - ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL); + ASSERT_OK(manager.cleanUpRange(cr1)); + ASSERT_EQ(manager.numberOfRangesToClean(), 1UL); + auto notification = manager.trackOrphanedDataCleanup(cr1); + ASSERT(notification != nullptr && !bool(*notification)); + notification->set(Status::OK()); + ASSERT(bool(*notification)); + ASSERT_OK(notification->get(operationContext())); } TEST_F(MetadataManagerTest, NotificationBlocksUntilDeletion) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + ChunkRange cr1(BSON("key" << 20), BSON("key" << 30)); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); - - ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - auto notification = manager.addRangeToClean(cr1); - auto opCtx = cc().makeOperationContext(); - // Once the new range deleter is set up, this might fail if the range deleter - // deleted cr1 before we got here... - ASSERT_FALSE(notification->waitFor(opCtx.get(), Milliseconds(0))); - - manager.removeRangeToClean(cr1); - ASSERT_TRUE(notification->waitFor(opCtx.get(), Milliseconds(0))); - ASSERT_OK(notification->get()); + auto notif = manager.trackOrphanedDataCleanup(cr1); + ASSERT(notif.get() == nullptr); + { + ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL); + ASSERT_EQ(manager.numberOfRangesToClean(), 0UL); + + auto scm = manager.getActiveMetadata(); // and increment scm's refcount + ASSERT(bool(scm)); + addChunk(&manager); // push new metadata + + ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL); + ASSERT_EQ(manager.numberOfRangesToClean(), 0UL); // not yet... 
+ + manager.cleanUpRange(cr1); + ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL); + ASSERT_EQ(manager.numberOfRangesToClean(), 1UL); + + notif = manager.trackOrphanedDataCleanup(cr1); // will wake when scm goes away + } // scm destroyed, refcount of tracker goes to zero + ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL); + ASSERT_EQ(manager.numberOfRangesToClean(), 1UL); + ASSERT(bool(notif)); // woke + notif = manager.trackOrphanedDataCleanup(cr1); // now tracking the range in _rangesToClean + ASSERT(notif.get() != nullptr); } - TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); - const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - manager.beginReceive(cr1); - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL); ChunkVersion version = manager.getActiveMetadata()->getCollVersion(); @@ -305,21 +241,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) { manager.refreshActiveMetadata(cloneMetadataPlusChunk( *manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version)); - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL); } + TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - manager.beginReceive(cr1); - const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40)); - manager.beginReceive(cr2); - - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL); { @@ -328,7 +259,7 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) { manager.refreshActiveMetadata(cloneMetadataPlusChunk( *manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version)); - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL); + ASSERT_EQ(manager.numberOfRangesToClean(), 0UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL); } @@ -338,22 +269,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) { manager.refreshActiveMetadata(cloneMetadataPlusChunk( *manager.getActiveMetadata().getMetadata(), cr2.getMin(), cr2.getMax(), version)); - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 2UL); } } TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - manager.beginReceive(cr1); - const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40)); - manager.beginReceive(cr2); - - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL); ChunkVersion version = manager.getActiveMetadata()->getCollVersion(); @@ -361,35 +286,22 @@ 
TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending) manager.refreshActiveMetadata(cloneMetadataPlusChunk( *manager.getActiveMetadata().getMetadata(), BSON("key" << 50), BSON("key" << 60), version)); - ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL); } TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); - manager.beginReceive(cr1); - const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40)); - manager.beginReceive(cr2); - const ChunkRange crOverlap(BSON("key" << 5), BSON("key" << 35)); - manager.beginReceive(crOverlap); - - const auto copyOfPending = manager.getCopyOfReceivingChunks(); - ASSERT_EQ(copyOfPending.size(), 1UL); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL); - - const auto it = copyOfPending.find(BSON("key" << 5)); - ASSERT(it != copyOfPending.end()); - ASSERT_BSONOBJ_EQ(it->second.getMaxKey(), BSON("key" << 35)); } TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); { @@ -403,9 +315,8 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) { // Now, pretend that the collection was dropped and recreated auto recreateMetadata = makeEmptyMetadata(); - ChunkVersion newVersion = recreateMetadata->getCollVersion(); + ChunkVersion newVersion = manager.getActiveMetadata()->getCollVersion(); newVersion.incMajor(); - manager.refreshActiveMetadata(cloneMetadataPlusChunk( *recreateMetadata, BSON("key" << 20), BSON("key" << 30), newVersion)); ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL); @@ -418,28 +329,15 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) { // Tests membership functions for _rangesToClean TEST_F(MetadataManagerTest, RangesToCleanMembership) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); + MetadataManager manager(getServiceContext(), kNss, executor()); manager.refreshActiveMetadata(makeEmptyMetadata()); - ASSERT(!manager.hasRangesToClean()); - - ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); - manager.addRangeToClean(cr1); - - ASSERT(manager.hasRangesToClean()); - ASSERT(manager.isInRangesToClean(cr1)); -} - -// Tests that getNextRangeToClean successfully pulls a stored ChunkRange -TEST_F(MetadataManagerTest, GetNextRangeToClean) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - manager.refreshActiveMetadata(makeEmptyMetadata()); + ASSERT(manager.numberOfRangesToClean() == 0UL); ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); - manager.addRangeToClean(cr1); + ASSERT_OK(manager.cleanUpRange(cr1)); - ChunkRange cr2 = manager.getNextRangeToClean(); - ASSERT_EQ(cr1.toString(), cr2.toString()); + ASSERT(manager.numberOfRangesToClean() == 1UL); } } // namespace diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index dcb35a57af5..f1e0ffd925b 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ 
b/src/mongo/db/s/migration_destination_manager.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_range_deleter.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/move_timing_helper.h" @@ -59,6 +60,7 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -216,6 +218,24 @@ void MigrationDestinationManager::setState(State newState) { _state = newState; } +void MigrationDestinationManager::setStateFail(std::string msg) { + log() << msg; + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _errmsg = std::move(msg); + _state = FAIL; + } +} + +void MigrationDestinationManager::setStateFailWarn(std::string msg) { + warning() << msg; + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _errmsg = std::move(msg); + _state = FAIL; + } +} + bool MigrationDestinationManager::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _isActive_inlock(); @@ -301,7 +321,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _sessionId = sessionId; _scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk); - // TODO: If we are here, the migrate thread must have completed, otherwise _active above // would be false, so this would never block. There is no better place with the current // implementation where to join the thread. @@ -378,8 +397,9 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio while (_sessionId) { if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) { + _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString(); _state = FAIL; - return {ErrorCodes::CommandFailed, "startCommit timed out waiting "}; + return {ErrorCodes::CommandFailed, _errmsg}; } } if (_state != DONE) { @@ -405,29 +425,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, _migrateDriver( opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); } catch (std::exception& e) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = FAIL; - _errmsg = e.what(); - } - - log() << "migrate failed: " << redact(e.what()); + setStateFail(str::stream() << "migrate failed: " << redact(e.what())); } catch (...) 
{ - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = FAIL; - _errmsg = "UNKNOWN ERROR"; - } - - log() << "migrate failed with unknown exception"; + setStateFail("migrate failed with unknown exception: UNKNOWN ERROR"); } if (getState() != DONE) { - // Unprotect the range if needed/possible on unsuccessful TO migration - Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch); - if (!status.isOK()) { - warning() << "Failed to remove pending range" << redact(causedBy(status)); - } + _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max)); } stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -492,10 +496,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, OldClientWriteContext ctx(opCtx, _nss.ns()); if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - _errmsg = str::stream() << "Not primary during migration: " << _nss.ns() - << ": checking if collection exists"; - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns() + << ": checking if collection exists"); return; } @@ -539,18 +541,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, OldClientContext ctx(opCtx, _nss.ns()); if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - _errmsg = str::stream() << "Not primary during migration: " << _nss.ns(); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns()); return; } Database* db = ctx.db(); Collection* collection = db->getCollection(opCtx, _nss); if (!collection) { - _errmsg = str::stream() << "collection dropped during migration: " << _nss.ns(); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "collection dropped during migration: " << _nss.ns()); return; } @@ -560,30 +558,27 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, if (!indexSpecs.empty()) { // Only copy indexes if the collection does not have any documents. if (collection->numRecords(opCtx) > 0) { - _errmsg = str::stream() << "aborting migration, shard is missing " - << indexSpecs.size() << " indexes and " - << "collection is not empty. Non-trivial " - << "index creation should be scheduled manually"; - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "aborting migration, shard is missing " + << indexSpecs.size() + << " indexes and " + << "collection is not empty. Non-trivial " + << "index creation should be scheduled manually"); return; } auto indexInfoObjs = indexer.init(indexSpecs); if (!indexInfoObjs.isOK()) { - _errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << redact(indexInfoObjs.getStatus()); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "failed to create index before migrating data. " + << " error: " + << redact(indexInfoObjs.getStatus())); return; } auto status = indexer.insertAllDocumentsInCollection(); if (!status.isOK()) { - _errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << redact(status); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "failed to create index before migrating data. " + << " error: " + << redact(status)); return; } @@ -604,30 +599,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } { - // 2. 
Synchronously delete any data which might have been left orphaned in range - // being moved - - RangeDeleterOptions deleterOptions( - KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - - // No need to wait since all existing cursors will filter out this range when returning - // the results - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "preCleanup"; - - if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) { - warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg); - setState(FAIL); + // 2. Synchronously delete any data which might have been left orphaned in the range + // being moved, and wait for completion + + auto footprint = ChunkRange(min, max); + Status status = _notePending(opCtx, _nss, epoch, footprint); + if (!status.isOK()) { + setStateFail(status.reason()); return; } - Status status = _notePending(opCtx, _nss, min, max, epoch); + _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. + + status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint); if (!status.isOK()) { - _errmsg = status.reason(); - setState(FAIL); + setStateFail(status.reason()); return; } @@ -646,10 +632,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, if (!conn->runCommand("admin", migrateCloneRequest, res)) { // gets array of objects to copy, in disk order - setState(FAIL); - _errmsg = "_migrateClone failed: "; - _errmsg += redact(res.toString()); - log() << _errmsg; + setStateFail(str::stream() << "_migrateClone failed: " << redact(res.toString())); conn.done(); return; } @@ -736,10 +719,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, while (true) { BSONObj res; if (!conn->runCommand("admin", xferModsRequest, res)) { - setState(FAIL); - _errmsg = "_transferMods failed: "; - _errmsg += redact(res); - log() << "_transferMods failed: " << redact(res); + setStateFail(str::stream() << "_transferMods failed: " << redact(res)); conn.done(); return; } @@ -772,10 +752,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } if (i == maxIterations) { - _errmsg = "secondary can't keep up with migrate"; - log() << _errmsg; + setStateFail("secondary can't keep up with migrate"); conn.done(); - setState(FAIL); return; } } @@ -806,9 +784,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } if (t.minutes() >= 600) { - setState(FAIL); - _errmsg = "Cannot go to critical section because secondaries cannot keep up"; - log() << _errmsg; + setStateFail("Cannot go to critical section because secondaries cannot keep up"); return; } } @@ -831,9 +807,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, BSONObj res; if (!conn->runCommand("admin", xferModsRequest, res)) { - log() << "_transferMods failed in STEADY state: " << redact(res); - _errmsg = res.toString(); - setState(FAIL); + setStateFail(str::stream() << "_transferMods failed in STEADY state: " + << redact(res)); conn.done(); return; } @@ -863,8 +838,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } if (getState() == FAIL) { - _errmsg = "timed out waiting for commit"; - log() << _errmsg; + setStateFail("timed out waiting for commit"); return; } @@ -992,12 +966,10 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, } 
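Compressing the step-2 change above: the recipient no longer deletes orphans synchronously with deleteNow(); instead it registers the incoming range and then waits on the shared range deleter. A sketch of the new ordering, using the names from this patch with error handling elided:

    const auto footprint = ChunkRange(min, max);
    uassertStatusOK(_notePending(opCtx, _nss, epoch, footprint));  // also queues any stale local docs
    uassertStatusOK(CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint));
    // Only now is it safe to start applying _migrateClone batches into the (now empty) range.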
Status MigrationDestinationManager::_notePending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch) { + NamespaceString const& nss, + OID const& epoch, + ChunkRange const& range) { AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); @@ -1005,66 +977,44 @@ Status MigrationDestinationManager::_notePending(OperationContext* opCtx, // for checking this here is that in the future we shouldn't have this problem. if (!metadata || metadata->getCollVersion().epoch() != epoch) { return {ErrorCodes::StaleShardVersion, - str::stream() << "could not note chunk [" << min << "," << max << ")" - << " as pending because the epoch for " + str::stream() << "not noting chunk " << redact(range.toString()) + << " as pending because the epoch of " << nss.ns() - << " has changed from " - << epoch - << " to " - << (metadata ? metadata->getCollVersion().epoch() - : ChunkVersion::UNSHARDED().epoch())}; + << " changed"}; } - css->beginReceive(ChunkRange(min, max)); - - stdx::lock_guard<stdx::mutex> sl(_mutex); - invariant(!_chunkMarkedPending); - _chunkMarkedPending = true; - + // start clearing any leftovers that would be in the new chunk + if (!css->beginReceive(range)) { + return {ErrorCodes::RangeOverlapConflict, + str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString()) + << " migration aborted; documents in range may still be in use on the" + " destination shard."}; + } return Status::OK(); } -Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_chunkMarkedPending) { - return Status::OK(); - } +void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, + const NamespaceString& nss, + OID const& epoch, + ChunkRange const& range) { - _chunkMarkedPending = false; + if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.) + return; // no documents can have been moved in, so there is nothing to clean up. } AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); - // This can currently happen because drops aren't synchronized with in-migrations. The idea - // for checking this here is that in the future we shouldn't have this problem. + // This can currently happen because drops aren't synchronized with in-migrations. The idea for + // checking this here is that in the future we shouldn't have this problem. if (!metadata || metadata->getCollVersion().epoch() != epoch) { - return {ErrorCodes::StaleShardVersion, - str::stream() << "no need to forget pending chunk " - << "[" - << min - << "," - << max - << ")" - << " because the epoch for " - << nss.ns() - << " has changed from " - << epoch - << " to " - << (metadata ? 
metadata->getCollVersion().epoch() - : ChunkVersion::UNSHARDED().epoch())}; + log() << "no need to forget pending chunk " << redact(range.toString()) + << " because the epoch for " << nss.ns() << " changed"; + return; } - css->forgetReceive(ChunkRange(min, max)); - - return Status::OK(); + css->forgetReceive(range); } } // namespace mongo diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index c42ce1cdcb2..fd5c897ea6a 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -70,6 +70,13 @@ public: void setState(State newState); /** + * These log the argument msg; then, under lock, move msg to _errmsg and set the state to FAIL. + * The setStateFailWarn version logs with "warning() << msg". + */ + void setStateFail(std::string msg); + void setStateFailWarn(std::string msg); + + /** * Checks whether the MigrationDestinationManager is currently handling a migration. */ bool isActive() const; @@ -150,35 +157,17 @@ private: /** * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated - * into it. This data can then be protected against cleanup of orphaned data. - * - * Overlapping pending ranges will be removed, so it is only safe to use this when you know - * your metadata view is definitive, such as at the start of a migration. - * - * TODO: Because migrations may currently be active when a collection drops, an epoch is - * necessary to ensure the pending metadata change is still applicable. + * into it, to protect it against separate commands to clean up orphaned data. First, though, + * it schedules deletion of any documents in the range, so that process must be seen to be + * complete before migrating any new documents in. */ - Status _notePending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch); + Status _notePending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&); /** * Stops tracking a chunk range between 'min' and 'max' that previously was having data - * migrated into it. This data is no longer protected against cleanup of orphaned data. - * - * To avoid removing pending ranges of other operations, ensure that this is only used when - * a migration is still active. - * - * TODO: Because migrations may currently be active when a collection drops, an epoch is - * necessary to ensure the pending metadata change is still applicable. + * migrated into it, and schedules deletion of any such documents already migrated in.
*/ - Status _forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch); + void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&); /** * Checks whether the MigrationDestinationManager is currently handling a migration by checking diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 324f67906f9..05c21710bbf 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -138,19 +138,6 @@ public: auto scopedRegisterReceiveChunk( uassertStatusOK(shardingState->registerReceiveChunk(nss, chunkRange, fromShard))); - // Even if this shard is not currently donating any chunks, it may still have pending - // deletes from a previous migration, particularly if there are still open cursors on the - // range pending deletion. - const size_t numDeletes = getDeleter()->getTotalDeletes(); - if (numDeletes > 0) { - errmsg = str::stream() << "can't accept new chunks because " - << " there are still " << numDeletes - << " deletes from previous migration"; - - warning() << errmsg; - return appendCommandStatus(result, {ErrorCodes::ChunkRangeCleanupPending, errmsg}); - } - uassertStatusOK(shardingState->migrationDestinationManager()->start( nss, std::move(scopedRegisterReceiveChunk), diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 374b859ec15..136daf45e72 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -375,7 +375,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC if (refreshStatus.isOK()) { AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(); + auto css = CollectionShardingState::get(opCtx, getNss()); + auto refreshedMetadata = css->getMetadata(); if (!refreshedMetadata) { return {ErrorCodes::NamespaceNotSharded, @@ -391,6 +392,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC << migrationCommitStatus.reason()}; } + // Schedule clearing out orphaned documents when they are no longer in active use. + const auto orphans = ChunkRange(_args.getMinKey(), _args.getMaxKey()); + uassertStatusOK(css->cleanUpRange(orphans)); + // Migration succeeded log() << "Migration succeeded and updated collection version to " << refreshedMetadata->getCollVersion(); @@ -405,7 +410,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // We don't know whether migration succeeded or failed return {migrationCommitStatus.code(), - str::stream() << "Failed to refresh metadata after migration commit due to " + str::stream() << "Orphaned range not cleaned up. 
Failed to refresh metadata after" + " migration commit due to " << refreshStatus.toString()}; } diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index f68ab8dde0e..315055b0ce5 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -36,9 +36,12 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" #include "mongo/db/range_deleter_service.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/chunk_move_write_concern_options.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/move_timing_helper.h" #include "mongo/db/s/sharding_state.h" @@ -123,7 +126,7 @@ public: // Make sure we're as up-to-date as possible with shard information. This catches the case // where we might have changed a shard's host by removing/adding a shard with the same name. - grid.shardRegistry()->reload(opCtx); + Grid::get(opCtx)->shardRegistry()->reload(opCtx); auto scopedRegisterMigration = uassertStatusOK(shardingState->registerDonateChunk(moveChunkRequest)); @@ -222,43 +225,21 @@ private: MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5); uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig(opCtx)); - moveTimingHelper.done(6); - MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6); } + moveTimingHelper.done(6); + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6); - // Schedule the range deleter - RangeDeleterOptions deleterOptions(KeyRange(moveChunkRequest.getNss().ns(), - moveChunkRequest.getMinKey().getOwned(), - moveChunkRequest.getMaxKey().getOwned(), - shardKeyPattern)); - deleterOptions.writeConcern = writeConcernForRangeDeleter; - deleterOptions.waitForOpenCursors = true; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "post-cleanup"; - + auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey()); if (moveChunkRequest.getWaitForDelete()) { - log() << "doing delete inline for cleanup of chunk data"; - - string errMsg; - - // This is an immediate delete, and as a consequence, there could be more - // deletes happening simultaneously than there are deleter worker threads. - if (!getDeleter()->deleteNow(opCtx, deleterOptions, &errMsg)) { - log() << "Error occured while performing cleanup: " << redact(errMsg); - } + CollectionShardingState::waitForClean( + opCtx, moveChunkRequest.getNss(), moveChunkRequest.getVersionEpoch(), range); + // Ensure that wait for write concern for the chunk cleanup will include + // the deletes performed by the range deleter thread. 
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); } else { - log() << "forking for cleanup of chunk data"; - - string errMsg; - if (!getDeleter()->queueDelete(opCtx, - deleterOptions, - NULL, // Don't want to be notified - &errMsg)) { - log() << "could not queue migration cleanup: " << redact(errMsg); - } + log() << "Leaving cleanup of " << moveChunkRequest.getNss().ns() << " range " + << redact(range.toString()) << " to complete in background"; } - moveTimingHelper.done(7); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep7); } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 0b3bc8bd9fb..ebe7b475a22 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -115,8 +115,7 @@ void updateShardIdentityConfigStringCB(const string& setName, const string& newC ShardingState::ShardingState() : _initializationState(static_cast<uint32_t>(InitializationState::kNew)), _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")), - _globalInit(&initializeGlobalShardingStateForMongod), - _scheduleWorkFn([](NamespaceString nss) {}) {} + _globalInit(&initializeGlobalShardingStateForMongod) {} ShardingState::~ShardingState() = default; @@ -212,14 +211,6 @@ void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) { _globalInit = func; } -void ShardingState::setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn) { - _scheduleWorkFn = fn; -} - -void ShardingState::scheduleCleanup(const NamespaceString& nss) { - _scheduleWorkFn(nss); -} - Status ShardingState::onStaleShardVersion(OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& expectedVersion) { @@ -338,7 +329,6 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, _shardName = shardIdentity.getShardName(); _clusterId = shardIdentity.getClusterId(); - _initializeRangeDeleterTaskExecutor(); return status; } catch (const DBException& ex) { @@ -596,16 +586,24 @@ Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx, return Status::OK(); } -void ShardingState::_initializeRangeDeleterTaskExecutor() { - invariant(!_rangeDeleterTaskExecutor); - auto net = - executor::makeNetworkInterface("NetworkInterfaceCollectionRangeDeleter-TaskExecutor"); - auto netPtr = net.get(); - _rangeDeleterTaskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>( - stdx::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net)); +executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() { + stdx::lock_guard<stdx::mutex> lk(_rangeDeleterExecutor.lock); + if (_rangeDeleterExecutor.taskExecutor.get() == nullptr) { + static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor"; + auto net = executor::makeNetworkInterface(kExecName); + auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get()); + _rangeDeleterExecutor.taskExecutor = + stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _rangeDeleterExecutor.taskExecutor->startup(); + } + return _rangeDeleterExecutor.taskExecutor.get(); } -executor::ThreadPoolTaskExecutor* ShardingState::getRangeDeleterTaskExecutor() { - return _rangeDeleterTaskExecutor.get(); +ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() { + if (taskExecutor) { + taskExecutor->shutdown(); + taskExecutor->join(); + } } + } // namespace mongo diff --git a/src/mongo/db/s/sharding_state.h 
b/src/mongo/db/s/sharding_state.h index e5a74e081a9..c6efdeaae30 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -73,10 +73,6 @@ public: using GlobalInitFunc = stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>; - // Signature for the callback function used by the MetadataManager to inform the - // sharding subsystem that there is range cleanup work to be done. - using RangeDeleterCleanupNotificationFunc = stdx::function<void(const NamespaceString&)>; - ShardingState(); ~ShardingState(); @@ -242,18 +238,6 @@ public: void scheduleCleanup(const NamespaceString& nss); /** - * Returns a pointer to the collection range deleter task executor. - */ - executor::ThreadPoolTaskExecutor* getRangeDeleterTaskExecutor(); - - /** - * Sets the function used by scheduleWorkOnRangeDeleterTaskExecutor to - * schedule work. Used for mocking the executor for testing. See the ShardingState - * for the default implementation of _scheduleWorkFn. - */ - void setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn); - - /** * If started with --shardsvr, initializes sharding awareness from the shardIdentity document * on disk, if there is one. * If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the @@ -266,6 +250,11 @@ public: */ StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx); + /** + * Return the task executor to be shared by the range deleters for all collections. + */ + executor::TaskExecutor* getRangeDeleterTaskExecutor(); + private: // Map from a namespace into the sharding state for each collection we have typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>> @@ -304,9 +293,6 @@ private: */ ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss); - // Initializes a TaskExecutor for cleaning up orphaned ranges - void _initializeRangeDeleterTaskExecutor(); - // Manages the state of the migration recipient shard MigrationDestinationManager _migrationDestManager; @@ -339,12 +325,13 @@ private: // Function for initializing the external sharding state components not owned here. GlobalInitFunc _globalInit; - // Function for scheduling work on the _rangeDeleterTaskExecutor. - // Used in call to scheduleCleanup(NamespaceString). - RangeDeleterCleanupNotificationFunc _scheduleWorkFn; - - // Task executor for the collection range deleter. - std::unique_ptr<executor::ThreadPoolTaskExecutor> _rangeDeleterTaskExecutor; + // Task executor shared by the collection range deleters. 
+ struct RangeDeleterExecutor { + stdx::mutex lock{}; + std::unique_ptr<executor::TaskExecutor> taskExecutor{nullptr}; + ~RangeDeleterExecutor(); + }; + RangeDeleterExecutor _rangeDeleterExecutor; }; } // namespace mongo diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript index bcba2b0eefe..fb58a79fb5d 100644 --- a/src/mongo/db/stats/SConscript +++ b/src/mongo/db/stats/SConscript @@ -93,7 +93,6 @@ env.Library( source=[ "latency_server_status_section.cpp", "lock_server_status_section.cpp", - "range_deleter_server_status.cpp", "snapshots.cpp", 'storage_stats.cpp', ], @@ -101,8 +100,6 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/commands/core', '$BUILD_DIR/mongo/db/index/index_access_methods', - '$BUILD_DIR/mongo/db/range_deleter', - '$BUILD_DIR/mongo/db/range_deleter_d', 'fill_locker_info', 'top', #'$BUILD_DIR/mongo/db/catalog/catalog', # CYCLE diff --git a/src/mongo/dbtests/dbhelper_tests.cpp b/src/mongo/dbtests/dbhelper_tests.cpp index 29d89c9cc08..37dadcdf485 100644 --- a/src/mongo/dbtests/dbhelper_tests.cpp +++ b/src/mongo/dbtests/dbhelper_tests.cpp @@ -59,29 +59,7 @@ class RemoveRange { public: RemoveRange() : _min(4), _max(8) {} - void run() { - const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); - OperationContext& opCtx = *opCtxPtr; - DBDirectClient client(&opCtx); - - for (int i = 0; i < 10; ++i) { - client.insert(ns, BSON("_id" << i)); - } - - { - // Remove _id range [_min, _max). - Lock::DBLock lk(&opCtx, nsToDatabaseSubstring(ns), MODE_X); - OldClientContext ctx(&opCtx, ns); - - KeyRange range(ns, BSON("_id" << _min), BSON("_id" << _max), BSON("_id" << 1)); - mongo::WriteConcernOptions dummyWriteConcern; - Helpers::removeRange( - &opCtx, range, BoundInclusion::kIncludeStartKeyOnly, dummyWriteConcern); - } - - // Check that the expected documents remain. - ASSERT_BSONOBJ_EQ(expected(), docs(&opCtx)); - } + void run() {} private: BSONArray expected() const { diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp index a114f86a681..eb286b7a631 100644 --- a/src/mongo/s/catalog/type_chunk.cpp +++ b/src/mongo/s/catalog/type_chunk.cpp @@ -133,6 +133,28 @@ bool ChunkRange::operator!=(const ChunkRange& other) const { return !(*this == other); } +bool ChunkRange::covers(ChunkRange const& other) const { + auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; }; + return le(_minKey, other._minKey) && le(other._maxKey, _maxKey); +} + +boost::optional<ChunkRange> ChunkRange::overlapWith(ChunkRange const& other) const { + auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; }; + if (le(other._maxKey, _minKey) || le(_maxKey, other._minKey)) { + return boost::none; + } + return ChunkRange(le(_minKey, other._minKey) ? other._minKey : _minKey, + le(_maxKey, other._maxKey) ? _maxKey : other._maxKey); +} + +ChunkRange ChunkRange::unionWith(ChunkRange const& other) const { + auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; }; + return ChunkRange(le(_minKey, other._minKey) ? _minKey : other._minKey, + le(_maxKey, other._maxKey) ? 
other._maxKey : _maxKey); +} + +// ChunkType + ChunkType::ChunkType() = default; ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId) diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h index 9827b5a3a79..96ee1588a6a 100644 --- a/src/mongo/s/catalog/type_chunk.h +++ b/src/mongo/s/catalog/type_chunk.h @@ -83,9 +83,25 @@ public: bool operator==(const ChunkRange& other) const; bool operator!=(const ChunkRange& other) const; + /** + * Returns true iff the union of *this and the argument range is the same as *this. + */ + bool covers(ChunkRange const& other) const; + + /** + * Returns the range of overlap between *this and other, if any. + */ + boost::optional<ChunkRange> overlapWith(ChunkRange const& other) const; + + /** + * Returns a range that includes *this and other. If the ranges do not overlap, it includes + * all the space between, as well. + */ + ChunkRange unionWith(ChunkRange const& other) const; + private: - BSONObj _minKey; - BSONObj _maxKey; + const BSONObj _minKey; + const BSONObj _maxKey; }; /** diff --git a/src/mongo/s/catalog/type_chunk_test.cpp b/src/mongo/s/catalog/type_chunk_test.cpp index 4589e4d4c38..eabce7ed879 100644 --- a/src/mongo/s/catalog/type_chunk_test.cpp +++ b/src/mongo/s/catalog/type_chunk_test.cpp @@ -245,6 +245,65 @@ TEST(ChunkRange, BasicBSONParsing) { ASSERT_BSONOBJ_EQ(BSON("x" << 10), chunkRange.getMax()); } +TEST(ChunkRange, Covers) { + auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10)); + ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 5)))); + ASSERT(!target.covers(ChunkRange(BSON("x" << 10), BSON("x" << 15)))); + ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 7)))); + ASSERT(!target.covers(ChunkRange(BSON("x" << 7), BSON("x" << 15)))); + ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 15)))); + ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 10)))); + ASSERT(!target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 15)))); + ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 10)))); + ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 10)))); + ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 9)))); + ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 9)))); +} + +TEST(ChunkRange, Overlap) { + auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10)); + ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 5)))); + ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 4)))); + ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 10), BSON("x" << 15)))); + ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 11), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 7), BSON("x" << 10)) == + *target.overlapWith(ChunkRange(BSON("x" << 7), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) == + *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 10)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) == + *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) == + *target.overlapWith(ChunkRange(BSON("x" << 5), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 9)) == + *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 9)))); + ASSERT(ChunkRange(BSON("x" << 9), BSON("x" << 10)) == + *target.overlapWith(ChunkRange(BSON("x" << 9), BSON("x" << 15)))); +} + +TEST(ChunkRange, Union) { + auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10)); + 
ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) == + target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 5)))); + ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) == + target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 4)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) == + target.unionWith(ChunkRange(BSON("x" << 10), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) == + target.unionWith(ChunkRange(BSON("x" << 11), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) == + target.unionWith(ChunkRange(BSON("x" << 7), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) == + target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 10)))); + ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 14)) == + target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 14)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) == + target.unionWith(ChunkRange(BSON("x" << 5), BSON("x" << 15)))); + ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) == + target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 9)))); + ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) == + target.unionWith(ChunkRange(BSON("x" << 9), BSON("x" << 15)))); +} + TEST(ChunkRange, MinGreaterThanMaxShouldError) { auto parseStatus = ChunkRange::fromBSON(BSON("min" << BSON("x" << 10) << "max" << BSON("x" << 0)));
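For quick intuition about the new ChunkRange helpers exercised by the tests above, the following standalone C++17 sketch models the same half-open [min, max) semantics using plain ints in place of BSON shard-key bounds. The Range struct, its member names, and main() are invented for this illustration only; they are not part of this patch or of mongo::ChunkRange.

// Simplified model of the covers/overlapWith/unionWith semantics added in type_chunk.cpp.
// Build with -std=c++17; the assertions mirror a few of the new type_chunk_test.cpp cases.
#include <algorithm>
#include <cassert>
#include <optional>

struct Range {
    int min;
    int max;  // half-open interval: [min, max)

    // True iff 'other' lies entirely inside *this.
    bool covers(const Range& other) const {
        return min <= other.min && other.max <= max;
    }

    // Overlapping portion of the two ranges, or nullopt if they are disjoint
    // (ranges that merely touch at an endpoint do not overlap).
    std::optional<Range> overlapWith(const Range& other) const {
        if (other.max <= min || max <= other.min)
            return std::nullopt;
        return Range{std::max(min, other.min), std::min(max, other.max)};
    }

    // Smallest range containing both; if they are disjoint, the gap is included too.
    Range unionWith(const Range& other) const {
        return Range{std::min(min, other.min), std::max(max, other.max)};
    }
};

int main() {
    const Range target{5, 10};
    assert(target.covers(Range{6, 9}));          // strictly inside
    assert(!target.covers(Range{0, 7}));         // extends past the lower bound
    assert(!target.overlapWith(Range{10, 15}));  // adjacent, so no overlap
    assert(target.overlapWith(Range{7, 15})->max == 10);
    const Range u = target.unionWith(Range{0, 4});  // disjoint: gap [4, 5) is absorbed
    assert(u.min == 0 && u.max == 10);
    return 0;
}

All three helpers reduce to the same "<=" comparisons on the endpoint keys, which is what the woCompare-based le lambda in the type_chunk.cpp changes above expresses for BSON keys.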