25 files changed, 1255 insertions, 1310 deletions
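At a high level, this patch moves tracking of in-migration ("pending") chunk ranges out of CollectionMetadata and into MetadataManager/CollectionShardingState, and rewrites CollectionRangeDeleter so that each scheduled range carries a shared Notification<Status> that is signaled when its deletion completes. Waiters such as CollectionShardingState::waitForClean block on the newest overlapping notification and re-check until no overlap remains. The sketch below is a minimal, self-contained C++ model of that notify-and-recheck pattern; Status, DeleteNotification, Range, RangeDeleter, and popOne are simplified stand-ins for the patch's mongo::Status, Notification<Status>, ChunkRange, CollectionRangeDeleter, and _pop, not the real implementations (in particular, the patch serializes access with an external mutex, while this model uses an internal one).

#include <condition_variable>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Stand-in for mongo::Status: a success flag plus a reason for failures.
struct Status {
    bool ok = true;
    std::string reason;
};

// One-shot event a waiter can block on, modeled on Notification<Status>.
class DeleteNotification {
public:
    void set(Status s) {
        std::lock_guard<std::mutex> lk(_mutex);
        _status = std::move(s);
        _set = true;
        _cv.notify_all();
    }
    Status get() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cv.wait(lk, [this] { return _set; });
        return _status;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _set = false;
    Status _status;
};

struct Range {
    int min, max;  // half-open interval [min, max)
    bool overlaps(Range const& other) const {
        return min < other.max && other.min < max;
    }
};

// Each scheduled range carries its own notification, signaled when that range
// has been cleaned up (or cleanup failed).
class RangeDeleter {
public:
    void add(Range r) {
        std::lock_guard<std::mutex> lk(_mutex);
        _orphans.push_back({r, std::make_shared<DeleteNotification>()});
    }

    // Newest scheduled range overlapping r, or nullptr if none overlaps.
    std::shared_ptr<DeleteNotification> overlaps(Range const& r) {
        std::lock_guard<std::mutex> lk(_mutex);
        for (auto it = _orphans.rbegin(); it != _orphans.rend(); ++it) {
            if (it->range.overlaps(r))
                return it->notification;
        }
        return nullptr;
    }

    // Signal the front entry's waiters, then drop it; false when empty.
    bool popOne(Status result) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_orphans.empty())
            return false;
        _orphans.front().notification->set(std::move(result));
        _orphans.pop_front();
        return true;
    }

private:
    struct Deletion {
        Range range;
        std::shared_ptr<DeleteNotification> notification;
    };
    std::mutex _mutex;
    std::deque<Deletion> _orphans;
};

int main() {
    RangeDeleter deleter;
    deleter.add({0, 10});
    deleter.add({10, 20});

    // Simulates the task executor draining scheduled ranges one at a time.
    std::thread executor([&] {
        while (deleter.popOne({true, ""})) {
        }
    });

    // waitForClean-style loop: re-check after each wakeup, since several
    // scheduled ranges may overlap the range of interest.
    Range interest{5, 15};
    while (auto notification = deleter.overlaps(interest))
        notification->get();

    executor.join();
    std::cout << "range [5,15) fully cleaned\n";
}

Holding each notification through a shared_ptr lets a waiter keep it alive after the deleter pops the entry, which is presumably why the patch defines DeleteNotification as std::shared_ptr<Notification<Status>>.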
@@ -1,8 +1,8 @@ # Print the full stack trace on python exceptions to aid debugging -set python print-stack full +# set python print-stack full # Load the mongodb pretty printers -source buildscripts/gdb/mongo.py +# source buildscripts/gdb/mongo.py # Load the mongodb lock analysis source buildscripts/gdb/mongo_lock.py diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index c247d31a6d1..4dfb73e0140 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -427,12 +427,12 @@ long long Helpers::removeRange(OperationContext* opCtx, bool docIsOrphan; // In write lock, so will be the most up-to-date version - auto metadataNow = CollectionShardingState::get(opCtx, nss.ns())->getMetadata(); + auto css = CollectionShardingState::get(opCtx, nss.ns()); + auto metadataNow = css->getMetadata(); if (metadataNow) { ShardKeyPattern kp(metadataNow->getKeyPattern()); BSONObj key = kp.extractShardKeyFromDoc(obj); - docIsOrphan = - !metadataNow->keyBelongsToMe(key) && !metadataNow->keyIsPending(key); + docIsOrphan = !metadataNow->keyBelongsToMe(key) && !css->keyIsPending(key); } else { docIsOrphan = false; } diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index a91265b6ee3..2bcde771c10 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -228,22 +228,10 @@ env.CppUnitTest( ) env.CppUnitTest( - target='sharding_metadata_test', - source=[ - 'collection_metadata_test.cpp', - 'shard_metadata_util_test.cpp', - ], - LIBDEPS=[ - 'metadata', - '$BUILD_DIR/mongo/s/shard_server_test_fixture', - ], -) - -env.CppUnitTest( target='shard_test', source=[ + 'shard_metadata_util_test.cpp', 'active_migrations_registry_test.cpp', - 'metadata_manager_test.cpp', 'migration_chunk_cloner_source_legacy_test.cpp', 'sharding_state_test.cpp', ], @@ -253,14 +241,17 @@ env.CppUnitTest( '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', + '$BUILD_DIR/mongo/s/shard_server_test_fixture', ], ) env.CppUnitTest( target='collection_sharding_state_test', source=[ - 'collection_sharding_state_test.cpp', + 'collection_metadata_test.cpp', 'collection_range_deleter_test.cpp', + 'metadata_manager_test.cpp', + 'collection_sharding_state_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/remote_command_targeter_mock', @@ -273,6 +264,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', ], ) diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp index d0e58779a3b..bb0d58273f3 100644 --- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp @@ -78,65 +78,59 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx, const WriteConcernOptions& secondaryThrottle, BSONObj* stoppedAtKey, string* errMsg) { - BSONObj startingFromKey = startingFromKeyConst; - ScopedCollectionMetadata metadata; + BSONObj startingFromKey = startingFromKeyConst; + boost::optional<ChunkRange> targetRange; { AutoGetCollection autoColl(opCtx, ns, MODE_IS); - metadata = CollectionShardingState::get(opCtx, ns.toString())->getMetadata(); - } + auto css = CollectionShardingState::get(opCtx, ns.toString()); + auto metadata = css->getMetadata(); - if (!metadata) { - warning() << "skipping orphaned data cleanup for " << ns.toString() + if (!metadata) { + log() << "skipping 
orphaned data cleanup for " << ns.toString() << ", collection is not sharded"; - return CleanupResult_Done; - } + return CleanupResult_Done; + } - BSONObj keyPattern = metadata->getKeyPattern(); - if (!startingFromKey.isEmpty()) { - if (!metadata->isValidKey(startingFromKey)) { - *errMsg = stream() << "could not cleanup orphaned data, start key " - << redact(startingFromKey) << " does not match shard key pattern " - << keyPattern; + BSONObj keyPattern = metadata->getKeyPattern(); + if (!startingFromKey.isEmpty()) { + if (!metadata->isValidKey(startingFromKey)) { + *errMsg = stream() << "could not cleanup orphaned data, start key " + << redact(startingFromKey) + << " does not match shard key pattern " << keyPattern; - warning() << *errMsg; - return CleanupResult_Error; + log() << *errMsg; + return CleanupResult_Error; + } + } else { + startingFromKey = metadata->getMinKey(); } - } else { - startingFromKey = metadata->getMinKey(); - } - KeyRange orphanRange; - if (!metadata->getNextOrphanRange(startingFromKey, &orphanRange)) { - LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from " - << redact(startingFromKey) << ", no orphan ranges remain"; + KeyRange orphanRange; + if (!metadata->getNextOrphanRange(css->getReceiveMap(), startingFromKey, &orphanRange)) { + LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from " + << redact(startingFromKey) << ", no orphan ranges remain"; + + return CleanupResult_Done; + } + orphanRange.ns = ns.ns(); + *stoppedAtKey = orphanRange.maxKey; + + LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from " + << redact(startingFromKey) << ", removing next orphan range" + << " [" << redact(orphanRange.minKey) << "," << redact(orphanRange.maxKey) << ")"; - return CleanupResult_Done; + targetRange.emplace(ChunkRange(orphanRange.minKey, orphanRange.maxKey)); + css->cleanUpRange(*targetRange); } - orphanRange.ns = ns.ns(); - *stoppedAtKey = orphanRange.maxKey; - - LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from " - << redact(startingFromKey) << ", removing next orphan range" - << " [" << redact(orphanRange.minKey) << "," << redact(orphanRange.maxKey) << ")"; - - // Metadata snapshot may be stale now, but deleter checks metadata again in write lock - // before delete. - RangeDeleterOptions deleterOptions(orphanRange); - deleterOptions.writeConcern = secondaryThrottle; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.fromMigrate = true; - // Must wait for cursors since there can be existing cursors with an older - // CollectionMetadata. 
- deleterOptions.waitForOpenCursors = true; - deleterOptions.removeSaverReason = "cleanup-cmd"; - - if (!getDeleter()->deleteNow(opCtx, deleterOptions, errMsg)) { - warning() << redact(*errMsg); - return CleanupResult_Error; + if (targetRange) { + auto result = CollectionShardingState::waitForClean(opCtx, ns, *targetRange); + if (!result.isOK()) { + warning() << redact(result.reason()); + return CleanupResult_Error; + } } - return CleanupResult_Continue; } diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp index b18e325adb0..0b4cf09e59e 100644 --- a/src/mongo/db/s/collection_metadata.cpp +++ b/src/mongo/db/s/collection_metadata.cpp @@ -49,7 +49,6 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern, _collVersion(collectionVersion), _shardVersion(shardVersion), _chunksMap(std::move(shardChunksMap)), - _pendingMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()), _rangesMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) { invariant(_shardKeyPattern.isValid()); @@ -61,16 +60,24 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern, invariant(!_shardVersion.isSet()); return; } - invariant(_shardVersion.isSet()); - // Load the chunk information, coallesceing their ranges. The version for this shard would be + _rebuildRangesMap(); +} + +CollectionMetadata::~CollectionMetadata() = default; + +void CollectionMetadata::_rebuildRangesMap() { + _rangesMap.clear(); + + // Load the chunk information, coalescing their ranges. The version for this shard would be // the highest version for any of the chunks. + BSONObj min, max; for (const auto& entry : _chunksMap) { - BSONObj currMin = entry.first; - BSONObj currMax = entry.second.getMaxKey(); + BSONObj const& currMin = entry.first; + BSONObj const& currMax = entry.second.getMaxKey(); // Coalesce the chunk's bounds in ranges if they are adjacent chunks if (min.isEmpty()) { @@ -96,54 +103,9 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern, _rangesMap.emplace(min, CachedChunkInfo(max, ChunkVersion::IGNORED())); } -CollectionMetadata::~CollectionMetadata() = default; - -std::unique_ptr<CollectionMetadata> CollectionMetadata::cloneMinusPending( - const ChunkType& chunk) const { - invariant(rangeMapContains(_pendingMap, chunk.getMin(), chunk.getMax())); - - auto metadata(stdx::make_unique<CollectionMetadata>( - _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks())); - metadata->_pendingMap = _pendingMap; - - metadata->_pendingMap.erase(chunk.getMin()); - - return metadata; -} - -std::unique_ptr<CollectionMetadata> CollectionMetadata::clonePlusPending( - const ChunkType& chunk) const { - invariant(!rangeMapOverlaps(_chunksMap, chunk.getMin(), chunk.getMax())); - - auto metadata(stdx::make_unique<CollectionMetadata>( - _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks())); - metadata->_pendingMap = _pendingMap; - - // If there are any pending chunks on the interval to be added this is ok, since pending chunks - // aren't officially tracked yet and something may have changed on servers we do not see yet. - // - // We remove any chunks we overlap because the remote request starting a chunk migration is what - // is authoritative. 
- - if (rangeMapOverlaps(_pendingMap, chunk.getMin(), chunk.getMax())) { - RangeVector pendingOverlap; - getRangeMapOverlap(_pendingMap, chunk.getMin(), chunk.getMax(), &pendingOverlap); - - warning() << "new pending chunk " << redact(rangeToString(chunk.getMin(), chunk.getMax())) - << " overlaps existing pending chunks " << redact(overlapToString(pendingOverlap)) - << ", a migration may not have completed"; - - for (RangeVector::iterator it = pendingOverlap.begin(); it != pendingOverlap.end(); ++it) { - metadata->_pendingMap.erase(it->first); - } - } - - // The pending map entry cannot contain a specific chunk version because we don't know what - // version would be generated for it at commit time. That's why we insert an IGNORED value. - metadata->_pendingMap.emplace(chunk.getMin(), - CachedChunkInfo(chunk.getMax(), ChunkVersion::IGNORED())); - - return metadata; +std::unique_ptr<CollectionMetadata> CollectionMetadata::clone() const { + return stdx::make_unique<CollectionMetadata>( + _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks()); } bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const { @@ -158,18 +120,6 @@ bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const { return rangeContains(it->first, it->second.getMaxKey(), key); } -bool CollectionMetadata::keyIsPending(const BSONObj& key) const { - if (_pendingMap.empty()) { - return false; - } - - auto it = _pendingMap.upper_bound(key); - if (it != _pendingMap.begin()) - it--; - - return rangeContains(it->first, it->second.getMaxKey(), key); -} - bool CollectionMetadata::getNextChunk(const BSONObj& lookupKey, ChunkType* chunk) const { RangeMap::const_iterator upperChunkIt = _chunksMap.upper_bound(lookupKey); RangeMap::const_iterator lowerChunkIt = upperChunkIt; @@ -238,6 +188,10 @@ Status CollectionMetadata::checkChunkIsValid(const ChunkType& chunk) { return Status::OK(); } +bool CollectionMetadata::rangeOverlapsChunk(ChunkRange const& range) { + return rangeMapOverlaps(_rangesMap, range.getMin(), range.getMax()); +} + void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const { _collVersion.addToBSON(bb, "collVersion"); _shardVersion.addToBSON(bb, "shardVersion"); @@ -256,97 +210,58 @@ void CollectionMetadata::toBSONChunks(BSONArrayBuilder& bb) const { } } -void CollectionMetadata::toBSONPending(BSONArrayBuilder& bb) const { - if (_pendingMap.empty()) - return; - - for (RangeMap::const_iterator it = _pendingMap.begin(); it != _pendingMap.end(); ++it) { - BSONArrayBuilder pendingBB(bb.subarrayStart()); - pendingBB.append(it->first); - pendingBB.append(it->second.getMaxKey()); - pendingBB.done(); - } -} - std::string CollectionMetadata::toStringBasic() const { return str::stream() << "collection version: " << _collVersion.toString() << ", shard version: " << _shardVersion.toString(); } -bool CollectionMetadata::getNextOrphanRange(const BSONObj& origLookupKey, KeyRange* range) const { +bool CollectionMetadata::getNextOrphanRange(RangeMap const& receivingChunks, + const BSONObj& origLookupKey, + KeyRange* range) const { BSONObj lookupKey = origLookupKey; BSONObj maxKey = getMaxKey(); // so we don't keep rebuilding while (lookupKey.woCompare(maxKey) < 0) { - RangeMap::const_iterator lowerChunkIt = _chunksMap.end(); - RangeMap::const_iterator upperChunkIt = _chunksMap.end(); - - if (!_chunksMap.empty()) { - upperChunkIt = _chunksMap.upper_bound(lookupKey); - lowerChunkIt = upperChunkIt; - if (upperChunkIt != _chunksMap.begin()) - --lowerChunkIt; - else - lowerChunkIt = _chunksMap.end(); - 
} - - // If we overlap, continue after the overlap - // TODO: Could optimize slightly by finding next non-contiguous chunk - if (lowerChunkIt != _chunksMap.end() && - lowerChunkIt->second.getMaxKey().woCompare(lookupKey) > 0) { - lookupKey = lowerChunkIt->second.getMaxKey(); - continue; - } - - RangeMap::const_iterator lowerPendingIt = _pendingMap.end(); - RangeMap::const_iterator upperPendingIt = _pendingMap.end(); - - if (!_pendingMap.empty()) { - upperPendingIt = _pendingMap.upper_bound(lookupKey); - lowerPendingIt = upperPendingIt; - if (upperPendingIt != _pendingMap.begin()) - --lowerPendingIt; - else - lowerPendingIt = _pendingMap.end(); - } - - // If we overlap, continue after the overlap - // TODO: Could optimize slightly by finding next non-contiguous chunk - if (lowerPendingIt != _pendingMap.end() && - lowerPendingIt->second.getMaxKey().woCompare(lookupKey) > 0) { - lookupKey = lowerPendingIt->second.getMaxKey(); - continue; - } - - // - // We know that the lookup key is not covered by a chunk or pending range, and where the - // previous chunk and pending chunks are. Now we fill in the bounds as the closest - // bounds of the surrounding ranges in both maps. - // range->keyPattern = _shardKeyPattern.toBSON(); range->minKey = getMinKey(); range->maxKey = maxKey; - if (lowerChunkIt != _chunksMap.end() && - lowerChunkIt->second.getMaxKey().woCompare(range->minKey) > 0) { - range->minKey = lowerChunkIt->second.getMaxKey(); - } - - if (upperChunkIt != _chunksMap.end() && upperChunkIt->first.woCompare(range->maxKey) < 0) { - range->maxKey = upperChunkIt->first; - } - - if (lowerPendingIt != _pendingMap.end() && - lowerPendingIt->second.getMaxKey().woCompare(range->minKey) > 0) { - range->minKey = lowerPendingIt->second.getMaxKey(); - } + auto patchUp = [&](auto const& map) { + auto lowerIt = map.end(), upperIt = map.end(); + + if (!map.empty()) { + upperIt = map.upper_bound(lookupKey); + lowerIt = upperIt; + if (upperIt != map.begin()) + --lowerIt; + else + lowerIt = map.end(); + } + + // If we overlap, continue after the overlap + // TODO: Could optimize slightly by finding next non-contiguous chunk + if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(lookupKey) > 0) { + lookupKey = lowerIt->second.getMaxKey(); + return false; + } + + // We know that the lookup key is not covered by a chunk or pending range, and where the + // previous chunk and pending chunks are. Now we fill in the bounds as the closest + // bounds of the surrounding ranges in both maps. + if (lowerIt != _chunksMap.end() && + lowerIt->second.getMaxKey().woCompare(range->minKey) > 0) { + range->minKey = lowerIt->second.getMaxKey(); + } + if (upperIt != _chunksMap.end() && upperIt->first.woCompare(range->maxKey) < 0) { + range->maxKey = upperIt->first; + } + return true; + }; - if (upperPendingIt != _pendingMap.end() && - upperPendingIt->first.woCompare(range->maxKey) < 0) { - range->maxKey = upperPendingIt->first; + // side effects on *range are safe even if we go around again. 
+ if (patchUp(_chunksMap) && patchUp(receivingChunks)) { + return true; } - - return true; } return false; diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index ea7392842d6..c4b4ea70057 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/range_arithmetic.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/shard_key_pattern.h" @@ -64,20 +65,9 @@ public: ~CollectionMetadata(); /** - * Returns a new metadata's instance based on 'this's state by removing a 'pending' chunk. - * - * The shard and collection version of the new metadata are unaffected. The caller owns the - * new metadata. - */ - std::unique_ptr<CollectionMetadata> cloneMinusPending(const ChunkType& chunk) const; - - /** - * Returns a new metadata's instance based on 'this's state by adding a 'pending' chunk. - * - * The shard and collection version of the new metadata are unaffected. The caller owns the - * new metadata. + * Returns a full copy of *this. */ - std::unique_ptr<CollectionMetadata> clonePlusPending(const ChunkType& chunk) const; + std::unique_ptr<CollectionMetadata> clone() const; /** * Returns true if the document key 'key' is a valid instance of a shard key for this @@ -93,12 +83,6 @@ public: bool keyBelongsToMe(const BSONObj& key) const; /** - * Returns true if the document key 'key' is or has been migrated to this shard, and may - * belong to us after a subsequent config reload. Key must be the full shard key. - */ - bool keyIsPending(const BSONObj& key) const; - - /** * Given a key 'lookupKey' in the shard key range, get the next chunk which overlaps or is * greater than this key. Returns true if a chunk exists, false otherwise. * @@ -117,6 +101,11 @@ public: Status checkChunkIsValid(const ChunkType& chunk); /** + * Returns true if the argument range overlaps any chunk. + */ + bool rangeOverlapsChunk(ChunkRange const& range); + + /** * Given a key in the shard key range, get the next range which overlaps or is greater than * this key. * @@ -132,7 +121,9 @@ public: * @param lookupKey passing a key that does not belong to this metadata is undefined. * @param orphanRange the output range. Note that the NS is not set. */ - bool getNextOrphanRange(const BSONObj& lookupKey, KeyRange* orphanRange) const; + bool getNextOrphanRange(RangeMap const& receiveMap, + BSONObj const& lookupKey, + KeyRange* orphanRange) const; ChunkVersion getCollVersion() const { return _collVersion; @@ -183,6 +174,12 @@ public: std::string toStringBasic() const; private: + /** + * Clears and rebuilds _rangesMap from the contents of _chunksMap. + */ + + void _rebuildRangesMap(); + // Shard key pattern for the collection ShardKeyPattern _shardKeyPattern; @@ -196,10 +193,7 @@ private: // Map of chunks tracked by this shard RangeMap _chunksMap; - // Map of ranges of chunks that are migrating but have not been confirmed added yet - RangeMap _pendingMap; - - // A second map from a min key into a range or contiguous chunks. The map is redundant + // A second map from a min key into a range of contiguous chunks. The map is redundant // w.r.t. _chunkMap but we expect high chunk contiguity, especially in small // installations. 
RangeMap _rangesMap; diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index be37e510a9d..4e86274d4ee 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -33,6 +33,7 @@ #include "mongo/db/s/collection_range_deleter.h" #include <algorithm> +#include <utility> #include "mongo/db/catalog/collection.h" #include "mongo/db/client.h" @@ -41,13 +42,16 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/keypattern.h" +#include "mongo/db/operation_context.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/service_context.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor.h" #include "mongo/util/log.h" @@ -57,7 +61,6 @@ namespace mongo { class ChunkRange; -class OldClientWriteContext; using CallbackArgs = executor::TaskExecutor::CallbackArgs; using logger::LogComponent; @@ -67,62 +70,39 @@ namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(60)); - } // unnamed namespace -CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {} - -void CollectionRangeDeleter::run() { - Client::initThread(getThreadName()); - ON_BLOCK_EXIT([&] { Client::destroy(); }); - auto opCtx = cc().makeOperationContext().get(); - - const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1); - bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete); - - // If there are more ranges to run, we add <this> back onto the task executor to run again. - if (hasNextRangeToClean) { - auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor(); - executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); }); - } else { - delete this; - } +CollectionRangeDeleter::~CollectionRangeDeleter() { + clear(); // notify anybody sleeping on orphan ranges } -bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) { - - { - AutoGetCollection autoColl(opCtx, _nss, MODE_IX); - auto* collection = autoColl.getCollection(); - if (!collection) { - return false; - } - auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss); - dassert(collectionShardingState != nullptr); // every collection gets one - - auto& metadataManager = collectionShardingState->_metadataManager; - - if (!_rangeInProgress && !metadataManager.hasRangesToClean()) { - // Nothing left to do - return false; - } - - if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) { - // No valid chunk in progress, get a new one - if (!metadataManager.hasRangesToClean()) { - return false; - } - _rangeInProgress = metadataManager.getNextRangeToClean(); +// This function runs in range deleter's task executor thread. 
+// Call under a collection lock.
+bool CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx,
+                                              Collection* collection,
+                                              BSONObj const& keyPattern,
+                                              stdx::mutex* lock,
+                                              int maxToDelete) {
+    dassert(collection != nullptr);
+    auto range = [lock, this]() -> boost::optional<ChunkRange> {
+        stdx::lock_guard<stdx::mutex> lk(*lock);
+        if (this->isEmpty()) {
+            return boost::none;
+        } else {
+            return this->_orphans.front().range;
+        }
+    }();
+    if (!range) {
+        return false;
+    }
-        auto scopedCollectionMetadata = collectionShardingState->getMetadata();
-        int numDocumentsDeleted =
-            _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete);
-        if (numDocumentsDeleted <= 0) {
-            metadataManager.removeRangeToClean(_rangeInProgress.get());
-            _rangeInProgress = boost::none;
-            return metadataManager.hasRangesToClean();
+    auto countWith = _doDeletion(opCtx, collection, keyPattern, *range, maxToDelete);
+    if (!countWith.isOK() || countWith.getValue() == 0) {
+        {
+            stdx::lock_guard<stdx::mutex> scopedLock(*lock);
+            _pop(countWith.getStatus());
         }
+        return true;
     }
     // wait for replication
@@ -131,59 +111,63 @@ bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxTo
     Status status =
         waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult);
     if (!status.isOK()) {
-        warning() << "Error when waiting for write concern after removing chunks in " << _nss
-                  << " : " << status.reason();
+        warning() << "Error when waiting for write concern after removing chunks in "
+                  << collection->ns() << " : " << status.reason();
+        {
+            stdx::lock_guard<stdx::mutex> scopedLock(*lock);
+            _pop(status);
+        }
     }
-
     return true;
 }

-int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
-                                        Collection* collection,
-                                        const BSONObj& keyPattern,
-                                        int maxToDelete) {
-    invariant(_rangeInProgress);
-    invariant(collection);
+// This function runs in the range deleter's task executor thread.
+// Call under a collection lock.
+StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
+                                                    Collection* collection,
+                                                    BSONObj const& keyPattern,
+                                                    ChunkRange const& range,
+                                                    int maxToDelete) {
+    NamespaceString const& nss = collection->ns();
     // The IndexChunk has a keyPattern that may apply to more than one index - we need to
     // select the index and get the full index keyPattern here.
- const IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false); + auto catalog = collection->getIndexCatalog(); + const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false); if (idx == NULL) { - warning() << "Unable to find shard key index for " << keyPattern.toString() << " in " - << _nss; - return -1; + std::string msg = str::stream() << "Unable to find shard key index for " + << keyPattern.toString() << " in " << nss.ns(); + log() << msg; + return {ErrorCodes::InternalError, msg}; } - KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); - // Extend bounds to match the index we found - const BSONObj& min = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false)); - const BSONObj& max = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false)); + KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); + auto extend = [&](auto& key) { + return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false)); + }; + const BSONObj& min = extend(range.getMin()); + const BSONObj& max = extend(range.getMax()); - LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss; + LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns(); auto indexName = idx->indexName(); - IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); - if (!desc) { - warning() << "shard key index with name " << indexName << " on '" << _nss - << "' was dropped"; - return -1; + IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); + if (!descriptor) { + std::string msg = str::stream() << "shard key index with name " << indexName << " on '" + << nss.ns() << "' was dropped"; + log() << msg; + return {ErrorCodes::InternalError, msg}; } int numDeleted = 0; do { - auto exec = InternalPlanner::indexScan(opCtx, - collection, - desc, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH); + auto bounds = BoundInclusion::kIncludeStartKeyOnly; + auto manual = PlanExecutor::YIELD_MANUAL; + auto forward = InternalPlanner::FORWARD; + auto fetch = InternalPlanner::IXSCAN_FETCH; + auto exec = InternalPlanner::indexScan( + opCtx, collection, descriptor, min, max, bounds, manual, forward, fetch); RecordId rloc; BSONObj obj; PlanExecutor::ExecState state = exec->getNext(&obj, &rloc); @@ -193,23 +177,66 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) { warning(LogComponent::kSharding) << PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min - << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj) + << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj) << ", stats: " << Explain::getWinningPlanStats(exec.get()); break; } invariant(PlanExecutor::ADVANCED == state); WriteUnitOfWork wuow(opCtx); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - warning() << "stepped down from primary while deleting chunk; orphaning data in " - << _nss << " in range [" << min << ", " << max << ")"; - break; - } - OpDebug* const nullOpDebug = nullptr; - collection->deleteDocument(opCtx, rloc, nullOpDebug, true); + collection->deleteDocument(opCtx, rloc, nullptr, true); wuow.commit(); } while (++numDeleted < 
maxToDelete); return numDeleted; } +auto CollectionRangeDeleter::overlaps(ChunkRange const& range) -> DeleteNotification { + // start search with newest entries by using reverse iterators + auto it = find_if(_orphans.rbegin(), _orphans.rend(), [&](auto& cleanee) { + return bool(cleanee.range.overlapWith(range)); + }); + return it != _orphans.rend() ? it->notification : DeleteNotification(); +} + +void CollectionRangeDeleter::add(ChunkRange const& range) { + // We ignore the case of overlapping, or even equal, ranges. + + // Deleting overlapping ranges is similarly quick. + _orphans.emplace_back(Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), + std::make_shared<Notification<Status>>()}); +} + +void CollectionRangeDeleter::append(BSONObjBuilder* builder) const { + BSONArrayBuilder arr(builder->subarrayStart("rangesToClean")); + for (auto const& entry : _orphans) { + BSONObjBuilder obj; + entry.range.append(&obj); + arr.append(obj.done()); + } + arr.done(); +} + +size_t CollectionRangeDeleter::size() const { + return _orphans.size(); +} + +bool CollectionRangeDeleter::isEmpty() const { + return _orphans.empty(); +} + +void CollectionRangeDeleter::clear() { + std::for_each(_orphans.begin(), _orphans.end(), [](auto& range) { + // Since deletion was not actually tried, we have no failures to report. + if (!*(range.notification)) { + range.notification->set(Status::OK()); // wake up anything waiting on it + } + }); + _orphans.clear(); +} + +void CollectionRangeDeleter::_pop(Status result) { + _orphans.front().notification->set(result); // wake up waitForClean + _orphans.pop_front(); +} + } // namespace mongo diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h index f611215a73d..2aa969e3d28 100644 --- a/src/mongo/db/s/collection_range_deleter.h +++ b/src/mongo/db/s/collection_range_deleter.h @@ -29,7 +29,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" +#include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/util/concurrency/notification.h" namespace mongo { @@ -41,40 +43,88 @@ class CollectionRangeDeleter { MONGO_DISALLOW_COPYING(CollectionRangeDeleter); public: - CollectionRangeDeleter(NamespaceString nss); + /** + * Normally, construct with the collection name and ShardingState's dedicated executor. + */ + CollectionRangeDeleter() = default; + ~CollectionRangeDeleter(); + + using DeleteNotification = std::shared_ptr<Notification<Status>>; + + /** + * Adds a new range to be cleaned up by the cleaner thread. + */ + void add(const ChunkRange& range); + + /** + * Reports whether the argument range overlaps any of the ranges to clean. If there is overlap, + * it returns a notification that will be completed when the currently newest overlapping + * range is no longer scheduled. Its value indicates whether it has been successfully removed. + * If there is no overlap, the result is nullptr. After a successful removal, the caller + * should call again to ensure no other range overlaps the argument. + * (See MigrationDestinationManager::waitForClean and MetadataManager::trackCleanup for an + * example use.) + */ + DeleteNotification overlaps(ChunkRange const& range); /** - * Starts deleting ranges and cleans up this object when it is finished. + * Reports the number of ranges remaining to be cleaned up. */ - void run(); + size_t size() const; + + bool isEmpty() const; + + /* + * Notify anything waiting on ranges scheduled, before discarding the ranges. 
+ */ + void clear(); + + /* + * Append a representation of self to the specified builder. + */ + void append(BSONObjBuilder* builder) const; /** - * Acquires the collection IX lock and checks whether there are new entries for the collection's - * rangesToClean structure. If there are, deletes up to maxToDelete entries and yields using - * the standard query yielding logic. + * If any ranges are scheduled to clean, deletes up to maxToDelete documents, notifying watchers + * of ranges as they are completed. Uses specified lock to serialize access to *this. * - * Returns true if there are more entries in rangesToClean, false if there is no more progress - * to be made. + * Returns true if it should be scheduled to run again, false if there is no more progress to be + * made. */ - bool cleanupNextRange(OperationContext* opCtx, int maxToDelete); + bool cleanUpNextRange(OperationContext*, + Collection*, + BSONObj const& keyPattern, + stdx::mutex* lock, + int maxToDelete); private: /** * Performs the deletion of up to maxToDelete entries within the range in progress. - * This function will invariant if called while _rangeInProgress is not set. * - * Returns the number of documents deleted (0 if deletion is finished), or -1 for error. + * Returns the number of documents deleted, 0 if done with the range, or bad status if deleting + * the range failed. */ - int _doDeletion(OperationContext* opCtx, - Collection* collection, - const BSONObj& keyPattern, - int maxToDelete); + StatusWith<int> _doDeletion(OperationContext* opCtx, + Collection* collection, + const BSONObj& keyPattern, + ChunkRange const& range, + int maxToDelete); - NamespaceString _nss; + /** + * Removes the latest-scheduled range from the ranges to be cleaned up, and notifies any + * interested callers of this->overlaps(range) with specified status. + */ + void _pop(Status status); - // Holds a range for which deletion has begun. If empty, then a new range - // must be requested from rangesToClean - boost::optional<ChunkRange> _rangeInProgress; + /** + * Ranges scheduled for deletion. The front of the list will be in active process of deletion. + * As each range is completed, its notification is signaled before it is popped. 
+ */ + struct Deletion { + ChunkRange const range; + DeleteNotification const notification; + }; + std::list<Deletion> _orphans; }; } // namespace mongo diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp index 2927379ff1c..9ff50c7eff6 100644 --- a/src/mongo/db/s/collection_range_deleter_test.cpp +++ b/src/mongo/db/s/collection_range_deleter_test.cpp @@ -30,55 +30,90 @@ #include "mongo/db/s/collection_range_deleter.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/query.h" +#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/index/index_descriptor.h" #include "mongo/db/keypattern.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/s/catalog/dist_lock_catalog_impl.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { using unittest::assertGet; - -const NamespaceString kNamespaceString = NamespaceString("foo", "bar"); +const NamespaceString kNss = NamespaceString("foo", "bar"); const std::string kPattern = "_id"; const BSONObj kKeyPattern = BSON(kPattern << 1); +const std::string kShardName{"a"}; +const HostAndPort dummyHost("dummy", 123); + +// const auto kIxVer = IndexDescriptor::IndexVersion::kV2; -class CollectionRangeDeleterTest : public ServiceContextMongoDTest { +class CollectionRangeDeleterTest : public ShardingMongodTestFixture { protected: - ServiceContext::UniqueOperationContext _opCtx; - MetadataManager* _metadataManager; std::unique_ptr<DBDirectClient> _dbDirectClient; + stdx::mutex _dummyLock; + + bool next(CollectionRangeDeleter& rangeDeleter, Collection* collection, int maxToDelete) { + return rangeDeleter.cleanUpNextRange( + operationContext(), collection, kKeyPattern, &_dummyLock, maxToDelete); + } + std::shared_ptr<RemoteCommandTargeterMock> configTargeter() { + return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); + } - OperationContext* operationContext(); private: void setUp() override; void tearDown() override; + + std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override { + invariant(shardRegistry); + return stdx::make_unique<DistLockCatalogImpl>(shardRegistry); + } + + std::unique_ptr<DistLockManager> makeDistLockManager( + std::unique_ptr<DistLockCatalog> distLockCatalog) override { + return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); + } + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager)); + } }; void CollectionRangeDeleterTest::setUp() { - ServiceContextMongoDTest::setUp(); - const repl::ReplSettings replSettings = {}; - repl::setGlobalReplicationCoordinator( - new repl::ReplicationCoordinatorMock(getServiceContext(), replSettings)); - repl::getGlobalReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY); - _opCtx = 
getServiceContext()->makeOperationContext(&cc()); - _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext()); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + ShardingMongodTestFixture::setUp(); + replicationCoordinator()->alwaysAllowWrites(true); + initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost)); + + // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) + // ->setConnectionStringReturnValue(kConfigConnStr); + + configTargeter()->setFindHostReturnValue(dummyHost); + _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext()); + ASSERT(_dbDirectClient->createCollection(kNss.ns())); { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); const OID epoch = OID::gen(); - - AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX); - auto collectionShardingState = - CollectionShardingState::get(operationContext(), kNamespaceString); collectionShardingState->refreshMetadata( operationContext(), stdx::make_unique<CollectionMetadata>( @@ -86,208 +121,149 @@ void CollectionRangeDeleterTest::setUp() { ChunkVersion(1, 0, epoch), ChunkVersion(0, 0, epoch), SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>())); - _metadataManager = collectionShardingState->getMetadataManagerForTest(); } } void CollectionRangeDeleterTest::tearDown() { { - AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX); - auto collectionShardingState = - CollectionShardingState::get(operationContext(), kNamespaceString); + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); collectionShardingState->refreshMetadata(operationContext(), nullptr); } _dbDirectClient.reset(); - _opCtx.reset(); - ServiceContextMongoDTest::tearDown(); - repl::setGlobalReplicationCoordinator(nullptr); -} - -OperationContext* CollectionRangeDeleterTest::operationContext() { - invariant(_opCtx); - return _opCtx.get(); + ShardingMongodTestFixture::tearDown(); } namespace { // Tests the case that there is nothing in the database. TEST_F(CollectionRangeDeleterTest, EmptyDatabase) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); + CollectionRangeDeleter rangeDeleter; + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collection = autoColl.getCollection(); + ASSERT_FALSE(next(rangeDeleter, collection, 1)); +} + +#if 0 +// Just inserts a record. +TEST_F(CollectionRangeDeleterTest, NothingToDo) { + CollectionRangeDeleter rangeDeleter; + const BSONObj insertedDoc = BSON(kPattern << 25); + _dbDirectClient->insert(kNss.toString(), insertedDoc); + ASSERT_BSONOBJ_EQ(insertedDoc, + _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 25))); } +#endif // Tests the case that there is data, but it is not in a range to clean. 
TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); + CollectionRangeDeleter rangeDeleter; const BSONObj insertedDoc = BSON(kPattern << 25); - - _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc); + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collection = autoColl.getCollection(); + _dbDirectClient->insert(kNss.toString(), insertedDoc); ASSERT_BSONOBJ_EQ(insertedDoc, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25))); + _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 25))); - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + ASSERT_TRUE(next(rangeDeleter, collection, 1)); ASSERT_BSONOBJ_EQ(insertedDoc, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25))); + _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 25))); + + ASSERT_FALSE(next(rangeDeleter, collection, 1)); } // Tests the case that there is a single document within a range to clean. TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); + CollectionRangeDeleter rangeDeleter; const BSONObj insertedDoc = BSON(kPattern << 5); + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collection = autoColl.getCollection(); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5)); - ASSERT_BSONOBJ_EQ(insertedDoc, - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5))); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 5)); + ASSERT_BSONOBJ_EQ(insertedDoc, _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 5))); - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_TRUE( - _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5)).isEmpty()); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); + ASSERT_TRUE(next(rangeDeleter, collection, 1)); + ASSERT_TRUE(next(rangeDeleter, collection, 1)); + ASSERT_TRUE(_dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 5)).isEmpty()); + ASSERT_FALSE(next(rangeDeleter, collection, 1)); } // Tests the case that there are multiple documents within a range to clean. 
TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 100)); + CollectionRangeDeleter rangeDeleter; + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collection = autoColl.getCollection(); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 1)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 2)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 3)); + ASSERT_EQUALS(3ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5))); + + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + + ASSERT_TRUE(next(rangeDeleter, collection, 100)); + ASSERT_TRUE(next(rangeDeleter, collection, 100)); + ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_FALSE(next(rangeDeleter, collection, 100)); } // Tests the case that there are multiple documents within a range to clean, and the range deleter // has a max deletion rate of one document per run. -TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCallsToCleanOneRange) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_EQUALS(2ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_EQUALS(1ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); +TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCalls) { + CollectionRangeDeleter rangeDeleter; + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collection = autoColl.getCollection(); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 1)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 2)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 3)); + ASSERT_EQUALS(3ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5))); + + rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + + ASSERT_TRUE(next(rangeDeleter, collection, 1)); + ASSERT_EQUALS(2ULL, 
_dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5))); + + ASSERT_TRUE(next(rangeDeleter, collection, 1)); + ASSERT_EQUALS(1ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5))); + + ASSERT_TRUE(next(rangeDeleter, collection, 1)); + ASSERT_TRUE(next(rangeDeleter, collection, 1)); + ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_FALSE(next(rangeDeleter, collection, 1)); } // Tests the case that there are two ranges to clean, each containing multiple documents. TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 6)); - ASSERT_EQUALS(6ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); + CollectionRangeDeleter rangeDeleter; + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collection = autoColl.getCollection(); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 1)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 2)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 3)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 4)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 5)); + _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 6)); + ASSERT_EQUALS(6ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 10))); const ChunkRange chunkRange1 = ChunkRange(BSON(kPattern << 0), BSON(kPattern << 4)); const ChunkRange chunkRange2 = ChunkRange(BSON(kPattern << 4), BSON(kPattern << 7)); - _metadataManager->addRangeToClean(chunkRange1); - _metadataManager->addRangeToClean(chunkRange2); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 4))); - ASSERT_EQUALS(3ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100)); - ASSERT_EQUALS(0ULL, - _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10))); - - ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1)); -} - -// Tests the case that there is a range to clean, and halfway through a deletion a chunk -// within the range is received -TEST_F(CollectionRangeDeleterTest, MultipleCallstoCleanupNextRangeWithChunkReceive) { - CollectionRangeDeleter rangeDeleter(kNamespaceString); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3)); - _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4)); - const BSONObj insertedDoc5 = BSON(kPattern << 5); // not to be deleted - _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc5); - ASSERT_EQUALS(5ULL, - 
                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
-    _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
-    ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-    ASSERT_EQUALS(3ULL,
-                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
-    _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 5), BSON(kPattern << 6)));
-
-    // insertedDoc5 is no longer eligible for deletion
-
-    ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-    ASSERT_EQUALS(1ULL,
-                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
-    ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-    ASSERT_EQUALS(1ULL,
-                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
-    ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-
-    ASSERT_BSONOBJ_EQ(
-        insertedDoc5,
-        _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << GT << 0)));
-}
-
-TEST_F(CollectionRangeDeleterTest, CalltoCleanupNextRangeWithChunkReceive) {
-    CollectionRangeDeleter rangeDeleter(kNamespaceString);
-    _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
-    _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
-    _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
-    _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
-    ASSERT_EQUALS(4ULL,
-                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
-    _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
-    ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-    ASSERT_EQUALS(2ULL,
-                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
-    _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
-    ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-
-    ASSERT_EQUALS(2ULL,
-                  _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
+    rangeDeleter.add(chunkRange1);
+    rangeDeleter.add(chunkRange2);
+
+    ASSERT_TRUE(next(rangeDeleter, collection, 100));
+    ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 4)));
+    ASSERT_EQUALS(3ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 10)));
+
+    ASSERT_TRUE(next(rangeDeleter, collection, 100));
+    ASSERT_TRUE(next(rangeDeleter, collection, 100));
+    ASSERT_TRUE(next(rangeDeleter, collection, 1));
+    ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 10)));
+    ASSERT_FALSE(next(rangeDeleter, collection, 1));
 }

 }  // unnamed namespace
-
 }  // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 24746d7880e..5d362fcde5c 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -34,6 +34,7 @@

 #include "mongo/db/client.h"
 #include "mongo/db/concurrency/lock_state.h"
+#include "mongo/db/db_raii.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/replication_coordinator_global.h"
 #include "mongo/db/s/collection_metadata.h"
@@ -84,7 +85,8 @@ private:
 }  // unnamed namespace

 CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
-    : _nss(std::move(nss)), _metadataManager{sc, _nss} {}
+    : _nss(std::move(nss)),
+      _metadataManager{sc, _nss, ShardingState::get(sc)->getRangeDeleterTaskExecutor()} {}

 CollectionShardingState::~CollectionShardingState() {
     invariant(!_sourceMgr);
@@ -119,14 +121,18 @@ void CollectionShardingState::markNotShardedAtStepdown() {
     _metadataManager.refreshActiveMetadata(nullptr);
 }

-void CollectionShardingState::beginReceive(const ChunkRange& range) {
-    _metadataManager.beginReceive(range);
+bool CollectionShardingState::beginReceive(ChunkRange const& range) {
+    return _metadataManager.beginReceive(range);
 }

 void CollectionShardingState::forgetReceive(const ChunkRange& range) {
     _metadataManager.forgetReceive(range);
 }

+void CollectionShardingState::cleanUpRange(ChunkRange const& range) {
+    _metadataManager.cleanUpRange(range);
+}
+
 MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() {
     return _sourceMgr;
 }
@@ -172,6 +178,52 @@ bool CollectionShardingState::collectionIsSharded() {
     return true;
 }

+bool CollectionShardingState::cleanUpNextRange(OperationContext* opCtx,
+                                               Collection* collection,
+                                               int maxToDelete) {
+    auto& keyPattern = getMetadata()->getKeyPattern();
+    return _metadataManager.cleanUpNextRange(opCtx, collection, keyPattern, maxToDelete);
+}
+
+// Call with the collection unlocked. Note that the CollectionShardingState object involved might
+// not exist anymore at the time of the call, or indeed anytime outside the AutoGetCollection
+// block, so anything that might alias something in it must be copied first.
+
+Status CollectionShardingState::waitForClean(OperationContext* opCtx,
+                                             NamespaceString nss,
+                                             ChunkRange orphanRange) {
+    do {
+        auto stillScheduled = CollectionShardingState::CleanupNotification(nullptr);
+        {
+            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+            // First, see if the collection was dropped.
+            auto css = CollectionShardingState::get(opCtx, nss);
+            auto metadata = css->getMetadata();
+            if (!metadata) {
+                throw DBException("Collection being migrated was dropped",
+                                  ErrorCodes::StaleShardVersion);
+            }
+
+            stillScheduled = css->_metadataManager.trackCleanup(orphanRange);
+            if (stillScheduled == nullptr) {
+                return Status::OK();  // done!
+            }
+        }  // drop collection lock
+
+        log() << "Waiting to delete " << nss.ns() << " range " << redact(orphanRange.toString());
+        Status result = stillScheduled->get(opCtx);
+        if (!result.isOK()) {
+            return {result.code(),
+                    str::stream() << "Failed to delete orphaned collection " << nss.ns()
+                                  << " range "
+                                  << orphanRange.toString()
+                                  << ": "
+                                  << result.reason()};
+        }
+    } while (true);
+    MONGO_UNREACHABLE;
+}
+
 bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* opCtx,
                                                          const BSONObj& doc) {
     dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
@@ -317,8 +369,7 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
     if (!info) {
         // There is no shard version information on either 'opCtx' or 'client'. This means that
         // the operation represented by 'opCtx' is unversioned, and the shard version is always
-        // OK
-        // for unversioned operations.
+        // OK for unversioned operations.
         return true;
     }

diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5bbc2b9c576..af27a07cd14 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -34,7 +34,9 @@
 #include "mongo/base/disallow_copying.h"
 #include "mongo/base/string_data.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/s/collection_range_deleter.h"
 #include "mongo/db/s/metadata_manager.h"
+#include "mongo/util/concurrency/notification.h"

 namespace mongo {

@@ -90,6 +92,27 @@ public:
     ScopedCollectionMetadata getMetadata();

     /**
+     * Returns the ranges being migrated in.
+     */
+    RangeMap const& getReceiveMap() const {
+        return _metadataManager.getReceiveMap();
+    }
+
+    /**
+     * Returns true if the specified key is in a range being received.
+     */
+    bool keyIsPending(const BSONObj& key) const {
+        return _metadataManager.keyIsPending(key);
+    }
+
+    /**
+     * BSON output of the pending metadata into a BSONArray.
+     */
+    void toBSONPending(BSONArrayBuilder& bb) const {
+        _metadataManager.toBSONPending(bb);
+    }
+
+    /**
      * Updates the metadata based on changes received from the config server and also resolves the
      * pending receives map in case some of these pending receives have completed or have been
      * abandoned. If newMetadata is null, unshard the collection.
@@ -105,20 +128,24 @@ public:
     void markNotShardedAtStepdown();

     /**
-     * Modifies the collection's sharding state to indicate that it is beginning to receive the
-     * given ChunkRange.
+     * Schedules any documents in the range for immediate cleanup iff no running queries can
+     * depend on them, and adds the range to the list of pending ranges. Otherwise, returns false.
      */
-    void beginReceive(const ChunkRange& range);
+    bool beginReceive(ChunkRange const& range);

     /*
-     * Modifies the collection's sharding state to indicate that the previous pending migration
-     * failed. If the range was not previously pending, this function will crash the server.
-     *
-     * This function is the mirror image of beginReceive.
+     * Removes the range from the list of pending ranges, and schedules any documents in the range
+     * for immediate cleanup.
      */
     void forgetReceive(const ChunkRange& range);

     /**
+     * Schedules documents in the range for cleanup after any running queries that may depend on
+     * them have terminated.
+     */
+    void cleanUpRange(ChunkRange const& range);
+
+    /**
      * Returns the active migration source manager, if one is available.
      */
     MigrationSourceManager* getMigrationSourceManager();
@@ -154,6 +181,28 @@ public:
     */
    bool collectionIsSharded();

+    /**
+     * Tracks deletion of any documents within the range, returning when deletion is complete.
+     * Throws if the collection is dropped while it sleeps. Call this with the collection unlocked.
+     */
+    static Status waitForClean(OperationContext*, NamespaceString, ChunkRange);
+
+    using CleanupNotification = MetadataManager::CleanupNotification;
+    /**
+     * Reports whether any part of the argument range is still scheduled for deletion. If not,
+     * returns nullptr. Otherwise, returns a notification n such that n->get(opCtx) will wake when
+     * deletion of a range (possibly the one of interest) is completed. This should be called
+     * again after each wakeup until it returns nullptr, because there can be more than one range
+     * scheduled for deletion that overlaps its argument.
+     */
+    CleanupNotification trackCleanup(ChunkRange const& range);
+
+    /**
+     * Called from the range deletion executor; immediately deletes the next scheduled range of
+     * documents. Returns true if it should be scheduled to be called again.
+     */
+    bool cleanUpNextRange(OperationContext* opCtx, Collection* collection, int maxToDelete);
+
     // Replication subsystem hooks. If this collection is serving as a source for migration, these
     // methods inform it of any changes to its contents.

@@ -167,11 +216,6 @@ public:

     void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);

-    MetadataManager* getMetadataManagerForTest() {
-        return &_metadataManager;
-    }
-
-
 private:
     /**
      * Checks whether the shard version of the operation matches that of the collection.
@@ -191,7 +235,7 @@ private:
                               ChunkVersion* expectedShardVersion,
                               ChunkVersion* actualShardVersion);

-    // Namespace to which this state belongs.
+    // Namespace this state belongs to.
     const NamespaceString _nss;

     // Contains all the metadata associated with this collection.
@@ -203,8 +247,6 @@ private:
     //
     // NOTE: The value is not owned by this class.
     MigrationSourceManager* _sourceMgr{nullptr};
-
-    friend class CollectionRangeDeleter;
 };

 }  // namespace mongo
diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp
index 86796a4ef50..3fafe5db13d 100644
--- a/src/mongo/db/s/get_shard_version_command.cpp
+++ b/src/mongo/db/s/get_shard_version_command.cpp
@@ -131,9 +131,11 @@ public:
             metadata->toBSONChunks(chunksArr);
             chunksArr.doneFast();

-            BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
-            metadata->toBSONPending(pendingArr);
-            pendingArr.doneFast();
+            if (css) {
+                BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
+                css->toBSONPending(pendingArr);
+                pendingArr.doneFast();
+            }
         }
         metadataBuilder.doneFast();
     }
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 98975244838..f03e7948374 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -133,7 +133,7 @@ Status mergeChunks(OperationContext* opCtx,
     AutoGetCollection autoColl(opCtx, nss, MODE_IS);
     metadata = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();

-    if (!metadata) {
+    if (!metadata || metadata->getKeyPattern().isEmpty()) {
         std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
                                       << " is not sharded";

diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index faa062e9476..c04cbf0dab6 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -33,27 +33,38 @@

 #include "mongo/db/s/metadata_manager.h"

 #include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/bson/util/builder.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/query/internal_plans.h"
 #include "mongo/db/range_arithmetic.h"
 #include "mongo/db/s/collection_range_deleter.h"
+#include "mongo/db/s/collection_sharding_state.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
 #include "mongo/util/log.h"

 namespace mongo {

-using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+using TaskExecutor = executor::TaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;

-MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss)
+MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss, TaskExecutor* executor)
     : _nss(std::move(nss)),
       _serviceContext(sc),
       _activeMetadataTracker(stdx::make_unique<CollectionMetadataTracker>(nullptr)),
       _receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()),
-      _rangesToClean(
-          SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<RangeToCleanDescriptor>()) {}
+      _notification(std::make_shared<Notification<Status>>()),
+      _executor(executor),
+      _rangesToClean() {}

 MetadataManager::~MetadataManager() {
     stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    invariant(!_activeMetadataTracker || _activeMetadataTracker->usageCounter == 0);
+    // Wake everybody up to see us die.
+    if (!*_notification) {
+        _notification->set(Status::OK());
+    }
+    _rangesToClean.clear();
 }

 ScopedCollectionMetadata MetadataManager::getActiveMetadata() {
@@ -61,10 +72,14 @@ ScopedCollectionMetadata MetadataManager::getActiveMetadata() {
     if (!_activeMetadataTracker) {
         return ScopedCollectionMetadata();
     }
-
     return ScopedCollectionMetadata(this, _activeMetadataTracker.get());
 }

+size_t MetadataManager::numberOfMetadataSnapshots() {
+    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
+    return _metadataInUse.size();
+}
+
 void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> remoteMetadata) {
     stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);

@@ -73,7 +88,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
     // collection sharding information regardless of whether the node is sharded or not.
     if (!remoteMetadata && !_activeMetadataTracker->metadata) {
         invariant(_receivingChunks.empty());
-        invariant(_rangesToClean.empty());
+        invariant(_rangesToClean.isEmpty());
         return;
     }

@@ -99,22 +114,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
               << remoteMetadata->toStringBasic();

         invariant(_receivingChunks.empty());
-        invariant(_rangesToClean.empty());
-
-        _setActiveMetadata_inlock(std::move(remoteMetadata));
-        return;
-    }
-
-    // If the metadata being installed has a different epoch from ours, this means the collection
-    // was dropped and recreated, so we must entirely reset the metadata state
-    if (_activeMetadataTracker->metadata->getCollVersion().epoch() !=
-        remoteMetadata->getCollVersion().epoch()) {
-        log() << "Overwriting metadata for collection " << _nss.ns() << " from "
-              << _activeMetadataTracker->metadata->toStringBasic() << " to "
-              << remoteMetadata->toStringBasic() << " due to epoch change";
-
-        _receivingChunks.clear();
-        _rangesToClean.clear();
+        invariant(_rangesToClean.isEmpty());

         _setActiveMetadata_inlock(std::move(remoteMetadata));
         return;
@@ -132,150 +132,66 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
           << _activeMetadataTracker->metadata->toStringBasic() << " to "
           << remoteMetadata->toStringBasic();

-    // Resolve any receiving chunks, which might have completed by now
+    // Resolve any receiving chunks, which might have completed by now.
+    // There should be no more than one.
     for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) {
-        const BSONObj min = it->first;
-        const BSONObj max = it->second.getMaxKey();
-
-        // Our pending range overlaps at least one chunk
-        if (rangeMapContains(remoteMetadata->getChunks(), min, max)) {
-            // The remote metadata contains a chunk we were earlier in the process of receiving, so
-            // we deem it successfully received.
-            LOG(2) << "Verified chunk " << redact(ChunkRange(min, max).toString())
-                   << " for collection " << _nss.ns() << " has been migrated to this shard earlier";
+        BSONObj const& min = it->first;
+        BSONObj const& max = it->second.getMaxKey();

-            _receivingChunks.erase(it++);
-            continue;
-        } else if (!rangeMapOverlaps(remoteMetadata->getChunks(), min, max)) {
+        if (!remoteMetadata->rangeOverlapsChunk(ChunkRange(min, max))) {
             ++it;
             continue;
         }
+        // The remote metadata contains a chunk we were earlier in the process of receiving, so
+        // we deem it successfully received.
+        LOG(2) << "Verified chunk " << redact(ChunkRange(min, max).toString()) << " for collection "
+               << _nss.ns() << " has been migrated to this shard earlier";

-        // Partial overlap indicates that the earlier migration has failed, but the chunk being
-        // migrated underwent some splits and other migrations and ended up here again. In this
-        // case, we will request full reload of the metadata. Currently this cannot happen, because
-        // all migrations are with the explicit knowledge of the recipient shard. However, we leave
-        // the option open so that chunk splits can do empty chunk move without having to notify the
-        // recipient.
-        RangeVector overlappedChunks;
-        getRangeMapOverlap(remoteMetadata->getChunks(), min, max, &overlappedChunks);
-
-        for (const auto& overlapChunkMin : overlappedChunks) {
-            auto itRecv = _receivingChunks.find(overlapChunkMin.first);
-            invariant(itRecv != _receivingChunks.end());
-
-            const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey());
-
-            _receivingChunks.erase(itRecv);
-
-            // Make sure any potentially partially copied chunks are scheduled to be cleaned up
-            _addRangeToClean_inlock(receivingRange);
-        }
-
-        // Need to reset the iterator
+        _receivingChunks.erase(it);
         it = _receivingChunks.begin();
     }

-    // For compatibility with the current range deleter, which is driven entirely by the contents of
-    // the CollectionMetadata update the pending chunks
-    for (const auto& receivingChunk : _receivingChunks) {
-        ChunkType chunk;
-        chunk.setMin(receivingChunk.first);
-        chunk.setMax(receivingChunk.second.getMaxKey());
-        remoteMetadata = remoteMetadata->clonePlusPending(chunk);
-    }
-
     _setActiveMetadata_inlock(std::move(remoteMetadata));
 }

-void MetadataManager::beginReceive(const ChunkRange& range) {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-
-    // Collection is not known to be sharded if the active metadata tracker is null
-    invariant(_activeMetadataTracker);
-
-    // If range is contained within pending chunks, this means a previous migration must have failed
-    // and we need to clean all overlaps
-    RangeVector overlappedChunks;
-    getRangeMapOverlap(_receivingChunks, range.getMin(), range.getMax(), &overlappedChunks);
-
-    for (const auto& overlapChunkMin : overlappedChunks) {
-        auto itRecv = _receivingChunks.find(overlapChunkMin.first);
-        invariant(itRecv != _receivingChunks.end());
-
-        const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey());
-
-        _receivingChunks.erase(itRecv);
-
-        // Make sure any potentially partially copied chunks are scheduled to be cleaned up
-        _addRangeToClean_inlock(receivingRange);
-    }
-
-    // Need to ensure that the background range deleter task won't delete the range we are about to
-    // receive
-    _removeRangeToClean_inlock(range, Status::OK());
-    _receivingChunks.insert(
-        std::make_pair(range.getMin().getOwned(),
-                       CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
-
-    // For compatibility with the current range deleter, update the pending chunks on the collection
-    // metadata to include the chunk being received
-    ChunkType chunk;
-    chunk.setMin(range.getMin());
-    chunk.setMax(range.getMax());
-    _setActiveMetadata_inlock(_activeMetadataTracker->metadata->clonePlusPending(chunk));
-}
-
-void MetadataManager::forgetReceive(const ChunkRange& range) {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-
-    {
-        auto it = _receivingChunks.find(range.getMin());
-        invariant(it != _receivingChunks.end());
-
-        // Verify entire ChunkRange is identical, not just the min key.
-        invariant(
-            SimpleBSONObjComparator::kInstance.evaluate(it->second.getMaxKey() == range.getMax()));
-
-        _receivingChunks.erase(it);
-    }
-
-    // This is potentially a partially received data, which needs to be cleaned up
-    _addRangeToClean_inlock(range);
-
-    // For compatibility with the current range deleter, update the pending chunks on the collection
-    // metadata to exclude the chunk being received, which was added in beginReceive
-    ChunkType chunk;
-    chunk.setMin(range.getMin());
-    chunk.setMax(range.getMax());
-    _setActiveMetadata_inlock(_activeMetadataTracker->metadata->cloneMinusPending(chunk));
-}
-
-RangeMap MetadataManager::getCopyOfReceivingChunks() {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    return _receivingChunks;
-}
-
 void MetadataManager::_setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata) {
-    if (_activeMetadataTracker->usageCounter > 0) {
-        _metadataInUse.push_front(std::move(_activeMetadataTracker));
+    if (_activeMetadataTracker->usageCounter != 0 || !_metadataInUse.empty()) {
+        _metadataInUse.push_back(std::move(_activeMetadataTracker));
     }
-
     _activeMetadataTracker = stdx::make_unique<CollectionMetadataTracker>(std::move(newMetadata));
 }

-void MetadataManager::_removeMetadata_inlock(CollectionMetadataTracker* metadataTracker) {
-    invariant(metadataTracker->usageCounter == 0);
-
-    auto i = _metadataInUse.begin();
-    const auto e = _metadataInUse.end();
-    while (i != e) {
-        if (metadataTracker == i->get()) {
-            _metadataInUse.erase(i);
-            return;
+// This is called only from ScopedCollectionMetadata members, unlocked.
+void MetadataManager::_decrementTrackerUsage(ScopedCollectionMetadata const& scoped) {
+    if (scoped._tracker != nullptr) {
+        stdx::lock_guard<stdx::mutex> lock(_managerLock);
+
+        invariant(scoped._tracker->usageCounter != 0);
+        if (--scoped._tracker->usageCounter == 0) {
+
+            // We don't care which usageCounter went to zero. We just expire all that are older
+            // than the oldest tracker still in use by queries. (Some start out at zero, some go to
+            // zero but can't be expired yet.)
+
+            bool notify = false;
+            while (!_metadataInUse.empty() && _metadataInUse.front()->usageCounter == 0) {
+                auto& tracker = _metadataInUse.front();
+                if (tracker->orphans) {
+                    notify = true;
+                    _pushRangeToClean(*tracker->orphans);
+                }
+                _metadataInUse.pop_front();  // Discard the tracker and its metadata.
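+                // Note: the loop stops at the first tracker still pinned by a query, so newer
+                // unreferenced snapshots behind it are retained until that one is released.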
+            }
+            if (_metadataInUse.empty() && _activeMetadataTracker->usageCounter == 0 &&
+                _activeMetadataTracker->orphans) {
+                _pushRangeToClean(*_activeMetadataTracker->orphans);
+                _activeMetadataTracker->orphans = boost::none;
+                notify = true;
+            }
+            if (notify) {
+                _notifyInUse();  // wake up waitForClean because we changed inUse
+            }
         }
-
-        ++i;
     }
 }

@@ -285,17 +201,18 @@ MetadataManager::CollectionMetadataTracker::CollectionMetadataTracker(

 ScopedCollectionMetadata::ScopedCollectionMetadata() = default;

-// called in lock
+// Called with the manager lock held.
 ScopedCollectionMetadata::ScopedCollectionMetadata(
     MetadataManager* manager, MetadataManager::CollectionMetadataTracker* tracker)
     : _manager(manager), _tracker(tracker) {
-    _tracker->usageCounter++;
+    ++_tracker->usageCounter;
 }

+// Do not call with the manager lock held.
 ScopedCollectionMetadata::~ScopedCollectionMetadata() {
-    if (!_tracker)
-        return;
-    _decrementUsageCounter();
+    if (_manager) {
+        _manager->_decrementTrackerUsage(*this);
+    }
 }

 CollectionMetadata* ScopedCollectionMetadata::operator->() const {
@@ -306,126 +223,44 @@ CollectionMetadata* ScopedCollectionMetadata::getMetadata() const {
     return _tracker->metadata.get();
 }

+// ScopedCollectionMetadata members
+
+// Do not call with the manager lock held.
 ScopedCollectionMetadata::ScopedCollectionMetadata(ScopedCollectionMetadata&& other) {
-    *this = std::move(other);
+    *this = std::move(other);  // Rely on _tracker being zero-initialized already.
 }

+// Do not call with the manager lock held.
 ScopedCollectionMetadata& ScopedCollectionMetadata::operator=(ScopedCollectionMetadata&& other) {
     if (this != &other) {
-        // If "this" was previously initialized, make sure we perform the same logic as in the
-        // destructor to decrement _tracker->usageCounter for the CollectionMetadata "this" had a
-        // reference to before replacing _tracker with other._tracker.
-        if (_tracker) {
-            _decrementUsageCounter();
+        if (_manager) {
+            _manager->_decrementTrackerUsage(*this);
         }
-
         _manager = other._manager;
         _tracker = other._tracker;
         other._manager = nullptr;
         other._tracker = nullptr;
     }
-
     return *this;
 }

-void ScopedCollectionMetadata::_decrementUsageCounter() {
-    invariant(_manager);
-    invariant(_tracker);
-    stdx::lock_guard<stdx::mutex> scopedLock(_manager->_managerLock);
-    invariant(_tracker->usageCounter > 0);
-    if (--_tracker->usageCounter == 0) {
-        _manager->_removeMetadata_inlock(_tracker);
-    }
-}
-
 ScopedCollectionMetadata::operator bool() const {
     return _tracker && _tracker->metadata.get();
 }

-RangeMap MetadataManager::getCopyOfRangesToClean() {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    return _getCopyOfRangesToClean_inlock();
-}
-
-RangeMap MetadataManager::_getCopyOfRangesToClean_inlock() {
-    RangeMap ranges = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>();
-    for (auto it = _rangesToClean.begin(); it != _rangesToClean.end(); ++it) {
-        ranges.insert(std::make_pair(
-            it->first, CachedChunkInfo(it->second.getMax(), ChunkVersion::IGNORED())));
-    }
-    return ranges;
-}
-
-std::shared_ptr<Notification<Status>> MetadataManager::addRangeToClean(const ChunkRange& range) {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    return _addRangeToClean_inlock(range);
-}
-
-std::shared_ptr<Notification<Status>> MetadataManager::_addRangeToClean_inlock(
-    const ChunkRange& range) {
-    // This first invariant currently makes an unnecessary copy, to reuse the
-    // rangeMapOverlaps helper function.
-    invariant(!rangeMapOverlaps(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax()));
-    invariant(!rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax()));
-
-    RangeToCleanDescriptor descriptor(range.getMax().getOwned());
-    _rangesToClean.insert(std::make_pair(range.getMin().getOwned(), descriptor));
-
-    // If _rangesToClean was previously empty, we need to start the collection range deleter
-    if (_rangesToClean.size() == 1UL) {
-        ShardingState::get(_serviceContext)->scheduleCleanup(_nss);
-    }
-
-    return descriptor.getNotification();
-}
-
-void MetadataManager::removeRangeToClean(const ChunkRange& range, Status deletionStatus) {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    _removeRangeToClean_inlock(range, deletionStatus);
-}
-
-void MetadataManager::_removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus) {
-    auto it = _rangesToClean.upper_bound(range.getMin());
-    // We want our iterator to point at the greatest value
-    // that is still less than or equal to range.
-    if (it != _rangesToClean.begin()) {
-        --it;
-    }
-
-    for (; it != _rangesToClean.end() &&
-         SimpleBSONObjComparator::kInstance.evaluate(it->first < range.getMax());) {
-        if (SimpleBSONObjComparator::kInstance.evaluate(it->second.getMax() <= range.getMin())) {
-            ++it;
-            continue;
-        }
-
-        // There's overlap between *it and range so we remove *it
-        // and then replace with new ranges.
-        BSONObj oldMin = it->first;
-        BSONObj oldMax = it->second.getMax();
-        it->second.complete(deletionStatus);
-        _rangesToClean.erase(it++);
-        if (SimpleBSONObjComparator::kInstance.evaluate(oldMin < range.getMin())) {
-            _addRangeToClean_inlock(ChunkRange(oldMin, range.getMin()));
-        }
-
-        if (SimpleBSONObjComparator::kInstance.evaluate(oldMax > range.getMax())) {
-            _addRangeToClean_inlock(ChunkRange(range.getMax(), oldMax));
-        }
+void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
+    for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) {
+        BSONArrayBuilder pendingBB(bb.subarrayStart());
+        pendingBB.append(it->first);
+        pendingBB.append(it->second.getMaxKey());
+        pendingBB.done();
     }
 }

 void MetadataManager::append(BSONObjBuilder* builder) {
     stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);

-    BSONArrayBuilder rtcArr(builder->subarrayStart("rangesToClean"));
-    for (const auto& entry : _rangesToClean) {
-        BSONObjBuilder obj;
-        ChunkRange r = ChunkRange(entry.first, entry.second.getMax());
-        r.append(&obj);
-        rtcArr.append(obj.done());
-    }
-    rtcArr.done();
+    _rangesToClean.append(builder);

     BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
     for (const auto& entry : _receivingChunks) {
@@ -436,6 +271,7 @@ void MetadataManager::append(BSONObjBuilder* builder) {
     }
     pcArr.done();

+
     BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges"));
     for (const auto& entry : _activeMetadataTracker->metadata->getChunks()) {
         BSONObjBuilder obj;
@@ -446,23 +282,158 @@ void MetadataManager::append(BSONObjBuilder* builder) {
     amrArr.done();
 }

-bool MetadataManager::hasRangesToClean() {
-    stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    return !_rangesToClean.empty();
+void MetadataManager::_scheduleCleanup(executor::TaskExecutor* executor, NamespaceString nss) {
+    executor->scheduleWork([executor, nss](auto&) {
+        const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
+        Client::initThreadIfNotAlready("Collection Range Deleter");
+        auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+        auto opCtx = uniqueOpCtx.get();
+        bool again = false;
+        {
+            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+            auto* collection = autoColl.getCollection();
+            if (!collection) {
+                return;  // collection was dropped
+            }
+            auto* css = CollectionShardingState::get(opCtx, nss);
+            again = css->cleanUpNextRange(opCtx, collection, maxToDelete);
+        }
+        if (again) {
+            _scheduleCleanup(executor, nss);
+        }
+    });
+}
+
+// The call to css->cleanUpNextRange() above lands back here; we just pass it along.
+bool MetadataManager::cleanUpNextRange(OperationContext* opCtx,
+                                       Collection* collection,
+                                       BSONObj const& keyPattern,
+                                       int maxToDelete) {
+    return _rangesToClean.cleanUpNextRange(
+        opCtx, collection, keyPattern, &_managerLock, maxToDelete);
 }

-bool MetadataManager::isInRangesToClean(const ChunkRange& range) {
+// Call with the manager lock held.
+void MetadataManager::_pushRangeToClean(ChunkRange const& range) {
+    _rangesToClean.add(range);
+    if (_rangesToClean.size() == 1) {
+        _scheduleCleanup(_executor, _nss);
+    }
+}
+
+void MetadataManager::_addToReceiving(ChunkRange const& range) {
+    _receivingChunks.insert(
+        std::make_pair(range.getMin().getOwned(),
+                       CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
+}
+
+bool MetadataManager::beginReceive(ChunkRange const& range) {
+    stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+
+    invariant(bool(_activeMetadataTracker));
+    CollectionMetadataTracker* tracker = _activeMetadataTracker.get();
+    if (_overlapsInUseChunk(range) ||
+        (tracker->usageCounter != 0 && tracker->metadata->rangeOverlapsChunk(range))) {
+        // A potentially long-running query might depend on documents in the range; give up.
+        return false;
+    }
+    _addToReceiving(range);
+    _pushRangeToClean(range);
+    return true;
+}
+
+void MetadataManager::_removeFromReceiving(ChunkRange const& range) {
+    auto it = _receivingChunks.find(range.getMin());
+    invariant(it != _receivingChunks.end());
+    _receivingChunks.erase(it);
+}
+
+void MetadataManager::forgetReceive(const ChunkRange& range) {
     stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    // For convenience, this line makes an unnecessary copy, to reuse the
-    // rangeMapContains helper function.
-    return rangeMapContains(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax());
+    // This is potentially a partially received chunk, which needs to be cleaned up. We know none
+    // of these documents are in use, so they can go straight to the deletion queue.
+    _removeFromReceiving(range);
+    _pushRangeToClean(range);
+}
+
+void MetadataManager::cleanUpRange(ChunkRange const& range) {
+    stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+    invariant(bool(_activeMetadataTracker));
+    CollectionMetadataTracker* tracker = _activeMetadataTracker.get();
+
+    if ((tracker->usageCounter == 0 || !tracker->metadata->rangeOverlapsChunk(range)) &&
+        !_overlapsInUseChunk(range)) {
+        // No running queries can depend on it, so queue it for deletion immediately.
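+        // ('Immediately' still means via the range-deleter task on the executor; it is only
+        // the wait for dependent queries to drain that is skipped.)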
+        _pushRangeToClean(range);
+    } else {
+        invariant(!tracker->orphans);
+        tracker->orphans.emplace(range.getMin().getOwned(), range.getMax().getOwned());
+    }
+}
+
+bool MetadataManager::keyIsPending(const BSONObj& key) const {
+    if (_receivingChunks.empty()) {
+        return false;
+    }
+    auto it = _receivingChunks.upper_bound(key);
+    if (it != _receivingChunks.begin())
+        it--;
+    return rangeContains(it->first, it->second.getMaxKey(), key);
 }

-ChunkRange MetadataManager::getNextRangeToClean() {
+
+size_t MetadataManager::numberOfRangesToCleanStillInUse() {
     stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-    invariant(!_rangesToClean.empty());
-    auto it = _rangesToClean.begin();
-    return ChunkRange(it->first, it->second.getMax());
+    size_t count = _activeMetadataTracker->orphans ? 1 : 0;
+    count += std::count_if(_metadataInUse.begin(), _metadataInUse.end(), [](auto& tracker) {
+        return bool(tracker->orphans);
+    });
+    return count;
+}
+
+size_t MetadataManager::numberOfRangesToClean() {
+    stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+    return _rangesToClean.size();
+}
+
+MetadataManager::CleanupNotification MetadataManager::trackCleanup(ChunkRange const& range) {
+    stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+    if (_overlapsInUseCleanups(range))
+        return _notification;
+    return _rangesToClean.overlaps(range);
+}
+
+// Call with the manager lock held.
+bool MetadataManager::_overlapsInUseChunk(ChunkRange const& range) {
+    if (_activeMetadataTracker->usageCounter != 0 &&
+        _activeMetadataTracker->metadata->rangeOverlapsChunk(range)) {
+        return true;
+    }
+    for (auto& tracker : _metadataInUse) {
+        if (tracker->usageCounter != 0 && tracker->metadata->rangeOverlapsChunk(range)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Call with the manager lock held.
+bool MetadataManager::_overlapsInUseCleanups(ChunkRange const& range) {
+    if (_activeMetadataTracker->orphans && _activeMetadataTracker->orphans->overlapWith(range)) {
+        return true;
+    }
+    for (auto& tracker : _metadataInUse) {
+        if (tracker->orphans && bool(tracker->orphans->overlapWith(range))) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Call with the manager lock held.
+void MetadataManager::_notifyInUse() {
+    _notification->set(Status::OK());  // wake up waitForClean
+    _notification = std::make_shared<Notification<Status>>();
 }

 }  // namespace mongo
diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h
index 5c2e7f2e64a..2311433aa49 100644
--- a/src/mongo/db/s/metadata_manager.h
+++ b/src/mongo/db/s/metadata_manager.h
@@ -28,19 +28,21 @@
 #pragma once

-#include <list>
-#include <memory>
-
 #include "mongo/base/disallow_copying.h"
 #include "mongo/bson/simple_bsonobj_comparator.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/range_arithmetic.h"
 #include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_range_deleter.h"
 #include "mongo/db/service_context.h"
+#include "mongo/executor/task_executor.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/util/concurrency/notification.h"
 #include "mongo/stdx/memory.h"

+#include <list>
+
 namespace mongo {

 class ScopedCollectionMetadata;

@@ -49,7 +51,7 @@ class MetadataManager {
     MONGO_DISALLOW_COPYING(MetadataManager);

 public:
-    MetadataManager(ServiceContext* sc, NamespaceString nss);
+    MetadataManager(ServiceContext*, NamespaceString nss, executor::TaskExecutor* rangeDeleter);
     ~MetadataManager();

     /**
@@ -62,144 +64,170 @@ public:
     ScopedCollectionMetadata getActiveMetadata();

     /**
-     * Uses the contents of the specified metadata as a way to purge any pending chunks.
+     * Returns the number of CollectionMetadata objects being maintained on behalf of running
+     * queries. The actual number may vary after it returns, so this is really only useful for
+     * unit tests.
      */
-    void refreshActiveMetadata(std::unique_ptr<CollectionMetadata> newMetadata);
+    size_t numberOfMetadataSnapshots();

     /**
-     * Puts the specified range on the list of chunks, which are being received so that the range
-     * deleter process will not clean the partially migrated data.
+     * Uses the contents of the specified metadata as a way to purge any pending chunks.
      */
-    void beginReceive(const ChunkRange& range);
+    void refreshActiveMetadata(std::unique_ptr<CollectionMetadata> newMetadata);
+
+    void toBSONPending(BSONArrayBuilder& bb) const;

     /**
-     * Removes a range from the list of chunks, which are being received. Used externally to
-     * indicate that a chunk migration failed.
+     * Appends information on all the chunk ranges in rangesToClean to builder.
      */
-    void forgetReceive(const ChunkRange& range);
+    void append(BSONObjBuilder* builder);

     /**
-     * Gets copy of the set of chunk ranges which are being received for this collection. This
-     * method is intended for testing purposes only and should not be used in any production code.
+     * Returns the map of chunk ranges being migrated in.
      */
-    RangeMap getCopyOfReceivingChunks();
+    RangeMap const& getReceiveMap() const {
+        return _receivingChunks;
+    }

     /**
-     * Adds a new range to be cleaned up.
-     * The newly introduced range must not overlap with the existing ranges.
-     */
-    std::shared_ptr<Notification<Status>> addRangeToClean(const ChunkRange& range);
+     * If no running queries can depend on documents in the range, schedules any such documents
+     * for immediate cleanup and returns true. Otherwise, returns false.
+     */
+    bool beginReceive(ChunkRange const& range);

     /**
-     * Calls removeRangeToClean with Status::OK.
+     * Removes the range from the pending list, and schedules any documents in the range for
+     * immediate cleanup. Assumes no active queries can see any local documents in the range.
      */
-    void removeRangeToClean(const ChunkRange& range) {
-        removeRangeToClean(range, Status::OK());
-    }
+    void forgetReceive(const ChunkRange& range);

     /**
-     * Removes the specified range from the ranges to be cleaned up.
-     * The specified deletionStatus will be returned to callers waiting
-     * on whether the deletion succeeded or failed.
+     * Initiates cleanup of the orphaned documents as if a chunk has been migrated out. If any
+     * documents in the range might still be in use by running queries, queues cleanup to begin
+     * after they have all terminated. Otherwise, schedules the documents for immediate cleanup.
+     *
+     * Must be called with the collection locked for writing. To monitor completion, use
+     * trackCleanup or CollectionShardingState::waitForClean.
      */
-    void removeRangeToClean(const ChunkRange& range, Status deletionStatus);
+    void cleanUpRange(ChunkRange const& range);

     /**
-     * Gets copy of the set of chunk ranges which are scheduled for cleanup.
-     * Converts RangeToCleanMap to RangeMap.
+     * Returns true if the specified key is in a range being received.
      */
-    RangeMap getCopyOfRangesToClean();
+    bool keyIsPending(const BSONObj& key) const;

     /**
-     * Appends information on all the chunk ranges in rangesToClean to builder.
+     * Returns the number of ranges scheduled to be cleaned, exclusive of such ranges that might
+     * still be in use by running queries. Outside of test drivers, the actual number may vary
+     * after it returns, so this is really only useful for unit tests.
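+     * (Counts only ranges already handed to the range deleter; orphan ranges still parked on
+     * metadata trackers are reported by numberOfRangesToCleanStillInUse instead.)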
     */
-    void append(BSONObjBuilder* builder);
+    size_t numberOfRangesToClean();

     /**
-     * Returns true if _rangesToClean is not empty.
+     * Returns the number of ranges scheduled to be cleaned once all queries that could depend on
+     * them have terminated. The actual number may vary after it returns, so this is really only
+     * useful for unit tests.
      */
-    bool hasRangesToClean();
+    size_t numberOfRangesToCleanStillInUse();

+    using CleanupNotification = CollectionRangeDeleter::DeleteNotification;
     /**
-     * Returns true if the exact range is in _rangesToClean.
+     * Reports whether the argument range is still scheduled for deletion. If not, returns nullptr.
+     * Otherwise, returns a notification n such that n->get(opCtx) will wake when deletion of a
+     * range (possibly the one of interest) is completed.
      */
-    bool isInRangesToClean(const ChunkRange& range);
+    CleanupNotification trackCleanup(ChunkRange const& orphans);

     /**
-     * Gets and returns, but does not remove, a single ChunkRange from _rangesToClean.
-     * Should not be called if _rangesToClean is empty: it will hit an invariant.
+     * Called by the range deletion executor; immediately deletes a scheduled range of documents.
+     * Returns true if it should be scheduled to be called again.
      */
-    ChunkRange getNextRangeToClean();
+    bool cleanUpNextRange(OperationContext* opCtx,
+                          Collection* collection,
+                          BSONObj const& keyPattern,
+                          int maxToDelete);

 private:
-    friend class ScopedCollectionMetadata;
-
     struct CollectionMetadataTracker {
-    public:
         /**
-         * Creates a new CollectionMetadataTracker, with the usageCounter initialized to zero.
+         * Creates a new CollectionMetadataTracker with the usageCounter initialized to zero.
         */
        CollectionMetadataTracker(std::unique_ptr<CollectionMetadata> m);

+    private:
        std::unique_ptr<CollectionMetadata> metadata;
-
        uint32_t usageCounter{0};
+        boost::optional<ChunkRange> orphans{boost::none};
+
+        friend class ScopedCollectionMetadata;  // for access to usageCounter:
+        friend class MetadataManager;
    };

-    // Class for the value of the _rangesToClean map. Used because callers of addRangeToClean
-    // sometimes need to wait until a range is deleted. Thus, complete(Status) is called
-    // when the range is deleted from _rangesToClean in removeRangeToClean(), letting callers
-    // of addRangeToClean know if the deletion succeeded or failed.
-    class RangeToCleanDescriptor {
-    public:
-        /**
-         * Initializes a RangeToCleanDescriptor with an empty notification.
-         */
-        RangeToCleanDescriptor(BSONObj max)
-            : _max(max.getOwned()), _notification(std::make_shared<Notification<Status>>()) {}
+    /**
+     * Atomically decrements scoped.tracker->usageCounter under our own mutex. At zero, retires
+     * any metadata that has fallen out of use, and pushes any orphan ranges found in them to the
+     * list of ranges actively being cleaned up. Calling with a null scoped.tracker is a no-op.
+     */
+    void _decrementTrackerUsage(ScopedCollectionMetadata const& scoped);

-        /**
-         * Gets the maximum value of the range to be deleted.
-         */
-        const BSONObj& getMax() const {
-            return _max;
-        }
+    /**
+     * Pushes the current set of chunks, if any, to _metadataInUse, and replaces it with
+     * newMetadata.
+     */
+    void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);

-        // See comment on _notification.
-        std::shared_ptr<Notification<Status>> getNotification() {
-            return _notification;
-        }
+    /**
+     * Returns true if the specified range overlaps any chunk that might be currently in use by a
+     * running query.
+     *
+     * Must be called locked.
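+     * (Here 'in use' means mapped by a tracker whose usageCounter is nonzero, i.e. pinned by
+     * some outstanding ScopedCollectionMetadata.)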
+     */
-
-        /**
-         * Sets the status on _notification. This will tell threads
-         * waiting on the value of status that the deletion succeeded or failed.
-         */
-        void complete(Status status) {
-            _notification->set(status);
-        }
+    bool _overlapsInUseChunk(ChunkRange const& range);

-    private:
-        // The maximum value of the range to be deleted.
-        BSONObj _max;
+    /**
+     * Returns true if any range (possibly) still in use, but scheduled for cleanup, overlaps
+     * the argument range.
+     *
+     * Must be called locked.
+     */
+    bool _overlapsInUseCleanups(ChunkRange const& range);

-        // This _notification will be set with a value indicating whether the deletion
-        // succeeded or failed.
-        std::shared_ptr<Notification<Status>> _notification;
-    };
+    /**
+     * Deletes ranges, in the background, until done, normally using a task executor attached to
+     * the ShardingState.
+     *
+     * Each time it completes cleaning up a range, it wakes up clients waiting on completion of
+     * that range, which may then verify their range has no more deletions scheduled, and proceed.
+     */
+    static void _scheduleCleanup(executor::TaskExecutor*, NamespaceString nss);

     /**
-     * Removes the CollectionMetadata stored in the tracker from the _metadataInUse
-     * list (if it's there).
+     * Adds the range to the list of ranges scheduled for immediate deletion, and schedules a
+     * background task to perform the work.
+     *
+     * Must be called locked.
      */
-    void _removeMetadata_inlock(CollectionMetadataTracker* metadataTracker);
+    void _pushRangeToClean(ChunkRange const& range);

-    std::shared_ptr<Notification<Status>> _addRangeToClean_inlock(const ChunkRange& range);
+    /**
+     * Adds a range to the receiving map, so that getNextOrphanRange will skip ranges migrating
+     * in.
+     */
+    void _addToReceiving(ChunkRange const& range);

-    void _removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus);
+    /**
+     * Removes a range from the receiving map after a migration failure. range.getMin() must
+     * exactly match an element of _receivingChunks.
+     */
+    void _removeFromReceiving(ChunkRange const& range);

-    RangeMap _getCopyOfRangesToClean_inlock();
+    /**
+     * Wakes up any clients waiting on a range to leave _metadataInUse.
+     *
+     * Must be called locked.
+     */
+    void _notifyInUse();

-    void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);
+    // data members

     const NamespaceString _nss;

@@ -209,20 +237,28 @@ private:
     // Mutex to protect the state below
     stdx::mutex _managerLock;

-    // Holds the collection metadata, which is currently active
+    // The currently active collection metadata
     std::unique_ptr<CollectionMetadataTracker> _activeMetadataTracker;

-    // Holds collection metadata instances, which have previously been active, but are still in use
-    // by still active server operations or cursors
+    // Previously active collection metadata instances still in use by active server operations or
+    // cursors
     std::list<std::unique_ptr<CollectionMetadataTracker>> _metadataInUse;

     // Chunk ranges which are currently assumed to be transferred to the shard. Indexed by the min
     // key of the range.
     RangeMap _receivingChunks;

-    // Set of ranges to be deleted. Indexed by the min key of the range.
-    typedef BSONObjIndexedMap<RangeToCleanDescriptor> RangeToCleanMap;
-    RangeToCleanMap _rangesToClean;
+    // Clients can sleep on copies of _notification while waiting for their orphan ranges to fall
+    // out of use.
+    std::shared_ptr<Notification<Status>> _notification;
+
+    // Executor used to schedule the background task that deletes documents from orphaned chunk
+    // ranges.
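+    // (Raw pointer, not owned here: obtained from ShardingState's range-deleter task executor,
+    // which is expected to outlive this manager.)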
+    executor::TaskExecutor* _executor;
+
+    // Ranges being deleted, or scheduled to be deleted, by a background task
+    CollectionRangeDeleter _rangesToClean;
+
+    friend class ScopedCollectionMetadata;  // Its op=() and dtor call _decrementTrackerUsage().
 };

 class ScopedCollectionMetadata {
@@ -235,13 +271,15 @@ public:
      */
     ScopedCollectionMetadata();

+    // Must be called with the owning MetadataManager unlocked.
     ~ScopedCollectionMetadata();

+    // Must be called with the owning MetadataManager unlocked.
     ScopedCollectionMetadata(ScopedCollectionMetadata&& other);
     ScopedCollectionMetadata& operator=(ScopedCollectionMetadata&& other);

     /**
-     * Dereferencing the ScopedCollectionMetadata will dereference the internal CollectionMetadata.
+     * Dereferencing the ScopedCollectionMetadata dereferences the internal CollectionMetadata.
      */
     CollectionMetadata* operator->() const;
     CollectionMetadata* getMetadata() const;

@@ -252,22 +290,19 @@ public:
     operator bool() const;

 private:
-    friend ScopedCollectionMetadata MetadataManager::getActiveMetadata();
-
     /**
-     * Increments the counter in the CollectionMetadataTracker.
+     * Increments the refcount in the specified tracker.
+     *
+     * Must be called with the specified *manager locked.
      */
     ScopedCollectionMetadata(MetadataManager* manager,
                              MetadataManager::CollectionMetadataTracker* tracker);

-    /**
-     * Decrements the usageCounter and conditionally makes a call to _removeMetadata on
-     * the tracker if the count has reached zero.
-     */
-    void _decrementUsageCounter();
-
     MetadataManager* _manager{nullptr};
     MetadataManager::CollectionMetadataTracker* _tracker{nullptr};
+
+    friend ScopedCollectionMetadata MetadataManager::getActiveMetadata();  // uses our private ctor
+    friend void MetadataManager::_decrementTrackerUsage(ScopedCollectionMetadata const&);
 };

 }  // namespace mongo
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index d4416cbfbea..20cdb8e052d 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -31,29 +31,74 @@
 #include "mongo/db/s/metadata_manager.h"

 #include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/remote_command_targeter_mock.h"
 #include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
 #include "mongo/db/jsobj.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
 #include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
 #include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
 #include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/catalog/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
 #include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
+#include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/unittest/unittest.h"
 #include "mongo/util/assert_util.h"
+
+#include <boost/optional.hpp>
+
 namespace mongo {
 namespace {

 using unittest::assertGet;

-class MetadataManagerTest : public ServiceContextMongoDTest {
+const NamespaceString kNss("TestDB", "TestColl");
+const std::string kPattern = "X";
+const BSONObj kShardKeyPattern{BSON(kPattern << 1)};
+const std::string kShardName{"a"};
+const HostAndPort dummyHost("dummy", 123);
+
+class MetadataManagerTest : public ShardingMongodTestFixture {
+public:
+    std::shared_ptr<RemoteCommandTargeterMock> configTargeter() {
+        return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
+    }
+
 protected:
     void setUp() override {
-        ServiceContextMongoDTest::setUp();
-        ShardingState::get(getServiceContext())
-            ->setScheduleCleanupFunctionForTest([](const NamespaceString& nss) {});
+        ShardingMongodTestFixture::setUp();
+        serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+        initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost));
+
+        configTargeter()->setFindHostReturnValue(dummyHost);
+    }
+
+    std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override {
+        invariant(shardRegistry);
+        return stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+    }
+
+    std::unique_ptr<DistLockManager> makeDistLockManager(
+        std::unique_ptr<DistLockCatalog> distLockCatalog) override {
+        return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog));
+    }
+
+    std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+        std::unique_ptr<DistLockManager> distLockManager) override {
+        return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager));
     }

     static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
@@ -92,10 +137,23 @@ protected:
         return stdx::make_unique<CollectionMetadata>(
             metadata.getKeyPattern(), chunkVersion, chunkVersion, std::move(chunksMap));
     }
+
+    CollectionMetadata* addChunk(MetadataManager* manager) {
+        ScopedCollectionMetadata scopedMetadata1 = manager->getActiveMetadata();
+
+        ChunkVersion newVersion = scopedMetadata1->getCollVersion();
+        newVersion.incMajor();
+        std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk(
+            *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 20), newVersion);
+        auto cm2Ptr = cm2.get();
+
+        manager->refreshActiveMetadata(std::move(cm2));
+        return cm2Ptr;
+    }
 };

 TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());

     std::unique_ptr<CollectionMetadata> cm = makeEmptyMetadata();
     auto cmPtr = cm.get();
@@ -107,197 +165,74 @@ TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {

 TEST_F(MetadataManagerTest, ResetActiveMetadata) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());
-
-    ScopedCollectionMetadata scopedMetadata1 = manager.getActiveMetadata();
-
-    ChunkVersion newVersion = scopedMetadata1->getCollVersion();
-    newVersion.incMajor();
-    std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk(
-        *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 10), newVersion);
-    auto cm2Ptr = cm2.get();
-
-    manager.refreshActiveMetadata(std::move(cm2));
+    auto cm2Ptr = addChunk(&manager);
     ScopedCollectionMetadata scopedMetadata2 = manager.getActiveMetadata();
-
     ASSERT_EQ(cm2Ptr, scopedMetadata2.getMetadata());
 };

-TEST_F(MetadataManagerTest, AddAndRemoveRangesToClean) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-    ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
-
-    manager.addRangeToClean(cr1);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
-    manager.removeRangeToClean(cr1);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-
-    manager.addRangeToClean(cr1);
-    manager.addRangeToClean(cr2);
-    manager.removeRangeToClean(cr1);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
-    auto ranges = manager.getCopyOfRangesToClean();
-    auto it = ranges.find(cr2.getMin());
-    ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    ASSERT_EQ(remainingChunk.toString(), cr2.toString());
-    manager.removeRangeToClean(cr2);
-}
-
-// Tests that a removal in the middle of an existing ChunkRange results in
-// two correct chunk ranges.
-TEST_F(MetadataManagerTest, RemoveRangeInMiddleOfRange) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-
-    manager.addRangeToClean(cr1);
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
-
-    auto ranges = manager.getCopyOfRangesToClean();
-    auto it = ranges.find(BSON("key" << 0));
-    ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 4));
-    ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
-    it++;
-    expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
-    remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
-    manager.removeRangeToClean(cr1);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-// Tests removals that overlap with just one ChunkRange.
-TEST_F(MetadataManagerTest, RemoveRangeWithSingleRangeOverlap) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-
-    manager.addRangeToClean(cr1);
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 5)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
-    auto ranges = manager.getCopyOfRangesToClean();
-    auto it = ranges.find(BSON("key" << 5));
-    ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    ChunkRange expectedChunk = ChunkRange(BSON("key" << 5), BSON("key" << 10));
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
-    ranges = manager.getCopyOfRangesToClean();
-    it = ranges.find(BSON("key" << 6));
-    remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 9), BSON("key" << 13)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
-    ranges = manager.getCopyOfRangesToClean();
-    it = ranges.find(BSON("key" << 6));
-    remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 9));
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-// Tests removals that overlap with more than one ChunkRange.
-TEST_F(MetadataManagerTest, RemoveRangeWithMultipleRangeOverlaps) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-    ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
-    ChunkRange cr3 = ChunkRange(BSON("key" << 20), BSON("key" << 30));
-
-    manager.addRangeToClean(cr1);
-    manager.addRangeToClean(cr2);
-    manager.addRangeToClean(cr3);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 3UL);
-
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 8), BSON("key" << 22)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
-    auto ranges = manager.getCopyOfRangesToClean();
-    auto it = ranges.find(BSON("key" << 0));
-    ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 8));
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-    it++;
-    remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
-    expectedChunk = ChunkRange(BSON("key" << 22), BSON("key" << 30));
-    ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 30)));
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
+// In the following tests, the ranges-to-clean list is not drained by the background deleter
+// thread, because the collection involved has no CollectionShardingState, so the task just
+// returns without doing anything.

-TEST_F(MetadataManagerTest, AddAndRemoveRangeNotificationsBlockAndYield) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    manager.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, CleanUpForMigrateIn) {
+    MetadataManager manager(getServiceContext(), kNss, executor());
+
+    ChunkRange range2(BSON("key" << 10), BSON("key" << 20));
+    manager.beginReceive(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
+    manager.beginReceive(range2);
+    ASSERT_EQ(manager.numberOfRangesToClean(), 2UL);
+    ASSERT_EQ(manager.numberOfRangesToCleanStillInUse(), 0UL);
+}
+
+TEST_F(MetadataManagerTest, AddRangeNotificationsBlockAndYield) {
+    MetadataManager manager(getServiceContext(), kNss, executor());

     ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    auto notification = manager.addRangeToClean(cr1);
-    manager.removeRangeToClean(cr1, Status::OK());
-    ASSERT_OK(notification->get());
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, RemoveRangeToCleanCorrectlySetsBadStatus) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    manager.refreshActiveMetadata(makeEmptyMetadata());
-
-    ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    auto notification = manager.addRangeToClean(cr1);
-    manager.removeRangeToClean(cr1, Status(ErrorCodes::InternalError, "test error"));
-    ASSERT_NOT_OK(notification->get());
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, RemovingSubrangeStillSetsNotificationStatus) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    manager.refreshActiveMetadata(makeEmptyMetadata());
-
-    ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    auto notification = manager.addRangeToClean(cr1);
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 3), BSON("key" << 7)));
-    ASSERT_OK(notification->get());
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
-    manager.removeRangeToClean(cr1);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-
-    notification = manager.addRangeToClean(cr1);
-    manager.removeRangeToClean(ChunkRange(BSON("key" << 7), BSON("key" << 15)));
-    ASSERT_OK(notification->get());
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
-    manager.removeRangeToClean(cr1);
-    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
+    manager.cleanUpRange(cr1);
+    ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+    auto notification = manager.trackCleanup(cr1);
+    ASSERT(notification != nullptr && !bool(*notification));
+    notification->set(Status::OK());
+    ASSERT(bool(*notification));
+    ASSERT_OK(notification->get(operationContext()));
 }

 TEST_F(MetadataManagerTest, NotificationBlocksUntilDeletion) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
-    manager.refreshActiveMetadata(makeEmptyMetadata());
-
     ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    auto notification = manager.addRangeToClean(cr1);
-    auto opCtx = cc().makeOperationContext();
-    // Once the new range deleter is set up, this might fail if the range deleter
-    // deleted cr1 before we got here...
-    ASSERT_FALSE(notification->waitFor(opCtx.get(), Milliseconds(0)));
-
-    manager.removeRangeToClean(cr1);
-    ASSERT_TRUE(notification->waitFor(opCtx.get(), Milliseconds(0)));
-    ASSERT_OK(notification->get());
+    MetadataManager manager(getServiceContext(), kNss, executor());
+    manager.refreshActiveMetadata(makeEmptyMetadata());
+    auto notif = manager.trackCleanup(cr1);
+    ASSERT(notif.get() == nullptr);
+    {
+        ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL);
+        ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
+
+        auto scm = manager.getActiveMetadata();  // and increment scm's refcount
+        ASSERT(bool(scm));
+        addChunk(&manager);  // push new metadata
+
+        ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL);
+        ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);  // not yet...
+
+        manager.cleanUpRange(cr1);
+        ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL);
+        ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+
+        notif = manager.trackCleanup(cr1);  // will wake when scm goes away
+    }  // scm destroyed, refcount of tracker goes to zero
+    ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL);
+    ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+    ASSERT(bool(notif));                // woke
+    notif = manager.trackCleanup(cr1);  // now tracking the range in _rangesToClean
+    ASSERT(notif.get() != nullptr);
 }

-
 TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());
-
     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    manager.beginReceive(cr1);
-    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
     ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);

     ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
@@ -305,21 +240,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {

     manager.refreshActiveMetadata(cloneMetadataPlusChunk(
         *manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version));
-    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
     ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
 }

+
 TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());

     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    manager.beginReceive(cr1);
-
     const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
-    manager.beginReceive(cr2);
-
-    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
     ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);

     {
@@ -328,7 +258,7 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
         manager.refreshActiveMetadata(cloneMetadataPlusChunk(
             *manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version));

-        ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
+        ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
         ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
     }

@@ -338,22 +268,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
         manager.refreshActiveMetadata(cloneMetadataPlusChunk(
             *manager.getActiveMetadata().getMetadata(), cr2.getMin(), cr2.getMax(), version));

-        ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
         ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 2UL);
     }
 }

 TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());

     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    manager.beginReceive(cr1);
-
     const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
-    manager.beginReceive(cr2);
-
-    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
     ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);

     ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
@@ -361,35 +285,22 @@ TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending)

     manager.refreshActiveMetadata(cloneMetadataPlusChunk(
         *manager.getActiveMetadata().getMetadata(), BSON("key" << 50), BSON("key" << 60), version));
-    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
     ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
 }

 TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());

     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    manager.beginReceive(cr1);
-
     const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
-    manager.beginReceive(cr2);
-
     const ChunkRange crOverlap(BSON("key" << 5), BSON("key" << 35));
-    manager.beginReceive(crOverlap);

-    const auto copyOfPending = manager.getCopyOfReceivingChunks();
-
-    ASSERT_EQ(copyOfPending.size(), 1UL);
     ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
-
-    const auto it = copyOfPending.find(BSON("key" << 5));
-    ASSERT(it != copyOfPending.end());
-    ASSERT_BSONOBJ_EQ(it->second.getMaxKey(), BSON("key" << 35));
 }

 TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());

     {
@@ -418,28 +329,15 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {

 // Tests membership functions for _rangesToClean
 TEST_F(MetadataManagerTest, RangesToCleanMembership) {
-    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    MetadataManager manager(getServiceContext(), kNss, executor());
     manager.refreshActiveMetadata(makeEmptyMetadata());

-    ASSERT(!manager.hasRangesToClean());
-
-    ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-    manager.addRangeToClean(cr1);
-
-    ASSERT(manager.hasRangesToClean());
-    ASSERT(manager.isInRangesToClean(cr1));
-}
-
-// Tests that getNextRangeToClean successfully pulls
a stored ChunkRange -TEST_F(MetadataManagerTest, GetNextRangeToClean) { - MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB")); - manager.refreshActiveMetadata(makeEmptyMetadata()); + ASSERT(manager.numberOfRangesToClean() == 0UL); ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10)); - manager.addRangeToClean(cr1); + manager.cleanUpAfterMigrateOut(cr1); - ChunkRange cr2 = manager.getNextRangeToClean(); - ASSERT_EQ(cr1.toString(), cr2.toString()); + ASSERT(manager.numberOfRangesToClean() == 1UL); } } // namespace diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 2f9355dcdd4..55543cca712 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_range_deleter.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/move_timing_helper.h" @@ -59,6 +60,7 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -421,11 +423,7 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, } if (getState() != DONE) { - // Unprotect the range if needed/possible on unsuccessful TO migration - Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch); - if (!status.isOK()) { - warning() << "Failed to remove pending range" << redact(causedBy(status)); - } + _forgetReceive(opCtx.get(), _nss, min, max); } stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -602,27 +600,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } { - // 2. Synchronously delete any data which might have been left orphaned in range - // being moved - - RangeDeleterOptions deleterOptions( - KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - - // No need to wait since all existing cursors will filter out this range when returning - // the results - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "preCleanup"; - - if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) { - warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg); - setState(FAIL); - return; - } + // 2. 
Synchronously delete any data which might have been left orphaned in the range + // being moved, and wait for completion - Status status = _notePending(opCtx, _nss, min, max, epoch); + Status status = _beginReceive(opCtx, _nss, min, max); if (!status.isOK()) { _errmsg = status.reason(); setState(FAIL); @@ -990,80 +971,43 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, return true; } -Status MigrationDestinationManager::_notePending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch) { - AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - - auto css = CollectionShardingState::get(opCtx, nss); - auto metadata = css->getMetadata(); - - // This can currently happen because drops aren't synchronized with in-migrations. The idea - // for checking this here is that in the future we shouldn't have this problem. - if (!metadata || metadata->getCollVersion().epoch() != epoch) { - return {ErrorCodes::StaleShardVersion, - str::stream() << "could not note chunk [" << min << "," << max << ")" - << " as pending because the epoch for " - << nss.ns() - << " has changed from " - << epoch - << " to " - << (metadata ? metadata->getCollVersion().epoch() - : ChunkVersion::UNSHARDED().epoch())}; - } - - css->beginReceive(ChunkRange(min, max)); - - stdx::lock_guard<stdx::mutex> sl(_mutex); - invariant(!_chunkMarkedPending); - _chunkMarkedPending = true; - - return Status::OK(); -} - -Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch) { +Status MigrationDestinationManager::_beginReceive(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& min, + const BSONObj& max) { + ChunkRange footprint(min, max); { - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_chunkMarkedPending) { + AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); + if (autoColl.getCollection() == nullptr) { return Status::OK(); } - - _chunkMarkedPending = false; + auto css = CollectionShardingState::get(opCtx, nss); + + // start clearing any leftovers that would be in the new chunk + if (!css->beginReceive(footprint)) { + // TODO: assign a stable code + return {ErrorCodes::Error(17377), // ErrorCodes::RangeMayBeInUse, + str::stream() << "Collection " << nss.ns() << " range [" << redact(min) << ", " + << redact(max) + << ") migration aborted; documents in range may still" + " be in use on the destination shard."}; + } } - AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); + return CollectionShardingState::waitForClean(opCtx, nss, footprint); +} +void MigrationDestinationManager::_forgetReceive(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& min, + const BSONObj& max) { + AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); + if (autoColl.getCollection() == nullptr) { + return; + } auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); - - // This can currently happen because drops aren't synchronized with in-migrations. The idea - // for checking this here is that in the future we shouldn't have this problem. - if (!metadata || metadata->getCollVersion().epoch() != epoch) { - return {ErrorCodes::StaleShardVersion, - str::stream() << "no need to forget pending chunk " - << "[" - << min - << "," - << max - << ")" - << " because the epoch for " - << nss.ns() - << " has changed from " - << epoch - << " to " - << (metadata ? 
metadata->getCollVersion().epoch() - : ChunkVersion::UNSHARDED().epoch())}; - } - css->forgetReceive(ChunkRange(min, max)); - - return Status::OK(); } } // namespace mongo diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index c42ce1cdcb2..7763566a0c6 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -154,15 +154,11 @@ private: * * Overlapping pending ranges will be removed, so it is only safe to use this when you know * your metadata view is definitive, such as at the start of a migration. - * - * TODO: Because migrations may currently be active when a collection drops, an epoch is - * necessary to ensure the pending metadata change is still applicable. */ - Status _notePending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch); + Status _beginReceive(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& min, + const BSONObj& max); /** * Stops tracking a chunk range between 'min' and 'max' that previously was having data @@ -170,15 +166,11 @@ private: * * To avoid removing pending ranges of other operations, ensure that this is only used when * a migration is still active. - * - * TODO: Because migrations may currently be active when a collection drops, an epoch is - * necessary to ensure the pending metadata change is still applicable. */ - Status _forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch); + void _forgetReceive(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& min, + const BSONObj& max); /** * Checks whether the MigrationDestinationManager is currently handling a migration by checking diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 374b859ec15..f3d25b18d48 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -375,7 +375,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC if (refreshStatus.isOK()) { AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(); + auto css = CollectionShardingState::get(opCtx, getNss()); + auto refreshedMetadata = css->getMetadata(); if (!refreshedMetadata) { return {ErrorCodes::NamespaceNotSharded, @@ -391,6 +392,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC << migrationCommitStatus.reason()}; } + // Schedule clearing out orphaned documents when they are no longer in active use. + auto orphans = ChunkRange(_args.getMinKey(), _args.getMaxKey()); + css->cleanUpRange(orphans); + // Migration succeeded log() << "Migration succeeded and updated collection version to " << refreshedMetadata->getCollVersion(); @@ -405,7 +410,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // We don't know whether migration succeeded or failed return {migrationCommitStatus.code(), - str::stream() << "Failed to refresh metadata after migration commit due to " + str::stream() << "Orphaned range not cleaned up. 
Failed to refresh metadata after"
+                                     " migration commit due to "
                               << refreshStatus.toString()};
    }
 
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 08ea3723920..fa646974985 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -36,9 +36,11 @@
 #include "mongo/db/auth/authorization_manager.h"
 #include "mongo/db/auth/authorization_session.h"
 #include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
 #include "mongo/db/range_deleter_service.h"
 #include "mongo/db/s/chunk_move_write_concern_options.h"
 #include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
 #include "mongo/db/s/migration_source_manager.h"
 #include "mongo/db/s/move_timing_helper.h"
 #include "mongo/db/s/sharding_state.h"
@@ -226,40 +228,13 @@ private:
             moveTimingHelper.done(6);
             MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6);
         }
-
-        // Schedule the range deleter
-        RangeDeleterOptions deleterOptions(KeyRange(moveChunkRequest.getNss().ns(),
-                                                    moveChunkRequest.getMinKey().getOwned(),
-                                                    moveChunkRequest.getMaxKey().getOwned(),
-                                                    shardKeyPattern));
-        deleterOptions.writeConcern = writeConcernForRangeDeleter;
-        deleterOptions.waitForOpenCursors = true;
-        deleterOptions.fromMigrate = true;
-        deleterOptions.onlyRemoveOrphanedDocs = true;
-        deleterOptions.removeSaverReason = "post-cleanup";
-
+        auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey());
         if (moveChunkRequest.getWaitForDelete()) {
-            log() << "doing delete inline for cleanup of chunk data";
-
-            string errMsg;
-
-            // This is an immediate delete, and as a consequence, there could be more
-            // deletes happening simultaneously than there are deleter worker threads.
-            if (!getDeleter()->deleteNow(opCtx, deleterOptions, &errMsg)) {
-                log() << "Error occured while performing cleanup: " << redact(errMsg);
-            }
+            CollectionShardingState::waitForClean(opCtx, moveChunkRequest.getNss(), range);
         } else {
-            log() << "forking for cleanup of chunk data";
-
-            string errMsg;
-            if (!getDeleter()->queueDelete(opCtx,
-                                           deleterOptions,
-                                           NULL,  // Don't want to be notified
-                                           &errMsg)) {
-                log() << "could not queue migration cleanup: " << redact(errMsg);
-            }
+            log() << "Leaving cleanup of " << moveChunkRequest.getNss().ns() << " range ["
+                  << range.getMin() << ", " << range.getMax() << ") to complete in background";
         }
-
         moveTimingHelper.done(7);
         MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep7);
     }
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 2646af582dd..df9a1c9da61 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -140,8 +140,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(MongoDLocalShardingInfo, ("SetGlobalEnviron
 ShardingState::ShardingState()
     : _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
       _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
-      _globalInit(&initializeGlobalShardingStateForMongod),
-      _scheduleWorkFn([](NamespaceString nss) {}) {}
+      _globalInit(&initializeGlobalShardingStateForMongod) {}
 
 ShardingState::~ShardingState() = default;
 
@@ -237,14 +236,6 @@ void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
     _globalInit = func;
 }
 
-void ShardingState::setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn) {
-    _scheduleWorkFn = fn;
-}
-
-void ShardingState::scheduleCleanup(const NamespaceString& nss) {
-    _scheduleWorkFn(nss);
-}
-
 Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
                                           const NamespaceString& nss,
                                           const ChunkVersion& expectedVersion) {
@@ -362,7 +353,6 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
 
         _shardName = shardIdentity.getShardName();
         _clusterId = shardIdentity.getClusterId();
-        _initializeRangeDeleterTaskExecutor();
 
         return status;
     } catch (const DBException& ex) {
@@ -620,16 +610,24 @@ Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
     return Status::OK();
 }
 
-void ShardingState::_initializeRangeDeleterTaskExecutor() {
-    invariant(!_rangeDeleterTaskExecutor);
-    auto net =
-        executor::makeNetworkInterface("NetworkInterfaceCollectionRangeDeleter-TaskExecutor");
-    auto netPtr = net.get();
-    _rangeDeleterTaskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>(
-        stdx::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
+executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
+    stdx::lock_guard<stdx::mutex> lk(_rangeDeleterExecutor.lock);
+    if (_rangeDeleterExecutor.motor.get() == nullptr) {
+        static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor";
+        auto net = executor::makeNetworkInterface(kExecName);
+        auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
+        _rangeDeleterExecutor.motor =
+            stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+        _rangeDeleterExecutor.motor->startup();
+    }
+    return _rangeDeleterExecutor.motor.get();
 }
 
-executor::ThreadPoolTaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
-    return _rangeDeleterTaskExecutor.get();
+ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() {
+    if (motor) {
+        motor->shutdown();
+        motor->join();
+    }
 }
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index e5a74e081a9..c54fce68538 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -73,10 +73,6 @@ public:
     using GlobalInitFunc =
         stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;
 
-    // Signature for the callback function used by the MetadataManager to inform the
-    // sharding subsystem that there is range cleanup work to be done.
-    using RangeDeleterCleanupNotificationFunc = stdx::function<void(const NamespaceString&)>;
-
     ShardingState();
     ~ShardingState();
 
@@ -242,18 +238,6 @@ public:
-    void scheduleCleanup(const NamespaceString& nss);
-
     /**
-     * Returns a pointer to the collection range deleter task executor.
-     */
-    executor::ThreadPoolTaskExecutor* getRangeDeleterTaskExecutor();
-
-    /**
-     * Sets the function used by scheduleWorkOnRangeDeleterTaskExecutor to
-     * schedule work. Used for mocking the executor for testing. See the ShardingState
-     * for the default implementation of _scheduleWorkFn.
-     */
-    void setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn);
-
-    /**
      * If started with --shardsvr, initializes sharding awareness from the shardIdentity document
      * on disk, if there is one.
      * If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the
@@ -266,6 +250,11 @@
      */
     StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx);
 
+    /**
+     * Returns the task executor to be shared by the range deleters for all collections.
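+     * The executor is created lazily, under a mutex, on first use, and is shut down and
+     * joined when the owning ShardingState is destroyed.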
+ */ + executor::TaskExecutor* getRangeDeleterTaskExecutor(); + private: // Map from a namespace into the sharding state for each collection we have typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>> @@ -304,9 +293,6 @@ private: */ ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss); - // Initializes a TaskExecutor for cleaning up orphaned ranges - void _initializeRangeDeleterTaskExecutor(); - // Manages the state of the migration recipient shard MigrationDestinationManager _migrationDestManager; @@ -339,12 +325,13 @@ private: // Function for initializing the external sharding state components not owned here. GlobalInitFunc _globalInit; - // Function for scheduling work on the _rangeDeleterTaskExecutor. - // Used in call to scheduleCleanup(NamespaceString). - RangeDeleterCleanupNotificationFunc _scheduleWorkFn; - - // Task executor for the collection range deleter. - std::unique_ptr<executor::ThreadPoolTaskExecutor> _rangeDeleterTaskExecutor; + // Task executor shared by the collection range deleters. + struct RangeDeleterExecutor { + stdx::mutex lock{}; + std::unique_ptr<executor::TaskExecutor> motor{nullptr}; + ~RangeDeleterExecutor(); + }; + RangeDeleterExecutor _rangeDeleterExecutor; }; } // namespace mongo diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp index a114f86a681..eb286b7a631 100644 --- a/src/mongo/s/catalog/type_chunk.cpp +++ b/src/mongo/s/catalog/type_chunk.cpp @@ -133,6 +133,28 @@ bool ChunkRange::operator!=(const ChunkRange& other) const { return !(*this == other); } +bool ChunkRange::covers(ChunkRange const& other) const { + auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; }; + return le(_minKey, other._minKey) && le(other._maxKey, _maxKey); +} + +boost::optional<ChunkRange> ChunkRange::overlapWith(ChunkRange const& other) const { + auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; }; + if (le(other._maxKey, _minKey) || le(_maxKey, other._minKey)) { + return boost::none; + } + return ChunkRange(le(_minKey, other._minKey) ? other._minKey : _minKey, + le(_maxKey, other._maxKey) ? _maxKey : other._maxKey); +} + +ChunkRange ChunkRange::unionWith(ChunkRange const& other) const { + auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; }; + return ChunkRange(le(_minKey, other._minKey) ? _minKey : other._minKey, + le(_maxKey, other._maxKey) ? other._maxKey : _maxKey); +} + +// ChunkType + ChunkType::ChunkType() = default; ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId) diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h index 9827b5a3a79..b1bf1611880 100644 --- a/src/mongo/s/catalog/type_chunk.h +++ b/src/mongo/s/catalog/type_chunk.h @@ -83,9 +83,24 @@ public: bool operator==(const ChunkRange& other) const; bool operator!=(const ChunkRange& other) const; -private: - BSONObj _minKey; - BSONObj _maxKey; + /** + * Returns true iff the union of *this and the argument range is the same as *this. + */ + bool covers(ChunkRange const& other) const; + + /** + * Returns the range of overlap between *this and other, if any. + */ + boost::optional<ChunkRange> overlapWith(ChunkRange const& other) const; + + /** + * Returns a range that includes *this and other. If the ranges do not overlap, it includes + * all the space between, as well. 
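+     * For example, the union of [0, 5) with [10, 15) is [0, 15).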
+     */
+    ChunkRange unionWith(ChunkRange const& other) const;
+
+private:
+    const BSONObj _minKey;
+    const BSONObj _maxKey;
 };
 
 /**
diff --git a/src/mongo/s/catalog/type_chunk_test.cpp b/src/mongo/s/catalog/type_chunk_test.cpp
index 4589e4d4c38..eabce7ed879 100644
--- a/src/mongo/s/catalog/type_chunk_test.cpp
+++ b/src/mongo/s/catalog/type_chunk_test.cpp
@@ -245,6 +245,65 @@ TEST(ChunkRange, BasicBSONParsing) {
     ASSERT_BSONOBJ_EQ(BSON("x" << 10), chunkRange.getMax());
 }
 
+TEST(ChunkRange, Covers) {
+    auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 7))));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 15))));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+    ASSERT(!target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+    ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 10))));
+    ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 10))));
+    ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 9))));
+    ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 9))));
+}
+
+TEST(ChunkRange, Overlap) {
+    auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+    ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+    ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 4))));
+    ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+    ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 11), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 7), BSON("x" << 10)) ==
+           *target.overlapWith(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+           *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+           *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+           *target.overlapWith(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 9)) ==
+           *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 9))));
+    ASSERT(ChunkRange(BSON("x" << 9), BSON("x" << 10)) ==
+           *target.overlapWith(ChunkRange(BSON("x" << 9), BSON("x" << 15))));
+}
+
+TEST(ChunkRange, Union) {
+    auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+    ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+           target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+    ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+           target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 4))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+           target.unionWith(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+           target.unionWith(ChunkRange(BSON("x" << 11), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+           target.unionWith(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+    ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+           target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+    ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 14)) ==
+           target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 14))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+           target.unionWith(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
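+    // Overlapping arguments extend the range only as far as needed.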
+    ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+           target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 9))));
+    ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+           target.unionWith(ChunkRange(BSON("x" << 9), BSON("x" << 15))));
+}
+
 TEST(ChunkRange, MinGreaterThanMaxShouldError) {
     auto parseStatus =
         ChunkRange::fromBSON(BSON("min" << BSON("x" << 10) << "max" << BSON("x" << 0)));