author    Nathan Myers <nathan.myers@10gen.com>  2017-02-16 12:03:44 -0500
committer Nathan Myers <nathan.myers@10gen.com>  2017-04-03 14:15:23 -0400
commit    707fa63463b988656959f15faaac96cfd2f780f2 (patch)
tree      9622716f5083d2756f2a15569fb6574387aa4fba
parent    17c72c9b7d82e114436a559c03e3ece04532d9c2 (diff)
SERVER-27921 Use new range deleter everywhere
-rw-r--r--  .gdbinit                                         |   4
-rw-r--r--  src/mongo/db/dbhelpers.cpp                       |   6
-rw-r--r--  src/mongo/db/s/SConscript                        |  20
-rw-r--r--  src/mongo/db/s/cleanup_orphaned_cmd.cpp          |  86
-rw-r--r--  src/mongo/db/s/collection_metadata.cpp           | 197
-rw-r--r--  src/mongo/db/s/collection_metadata.h             |  42
-rw-r--r--  src/mongo/db/s/collection_range_deleter.cpp      | 213
-rw-r--r--  src/mongo/db/s/collection_range_deleter.h        |  88
-rw-r--r--  src/mongo/db/s/collection_range_deleter_test.cpp | 318
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp     |  61
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h       |  72
-rw-r--r--  src/mongo/db/s/get_shard_version_command.cpp     |   8
-rw-r--r--  src/mongo/db/s/merge_chunks_command.cpp          |   2
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp              | 493
-rw-r--r--  src/mongo/db/s/metadata_manager.h                | 239
-rw-r--r--  src/mongo/db/s/metadata_manager_test.cpp         | 344
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 124
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h   |  24
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp      |  10
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp            |  37
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                |  38
-rw-r--r--  src/mongo/db/s/sharding_state.h                  |  37
-rw-r--r--  src/mongo/s/catalog/type_chunk.cpp               |  22
-rw-r--r--  src/mongo/s/catalog/type_chunk.h                 |  21
-rw-r--r--  src/mongo/s/catalog/type_chunk_test.cpp          |  59
25 files changed, 1255 insertions, 1310 deletions
diff --git a/.gdbinit b/.gdbinit
index 66d56149410..49fa398cf85 100644
--- a/.gdbinit
+++ b/.gdbinit
@@ -1,8 +1,8 @@
# Print the full stack trace on python exceptions to aid debugging
-set python print-stack full
+# set python print-stack full
# Load the mongodb pretty printers
-source buildscripts/gdb/mongo.py
+# source buildscripts/gdb/mongo.py
# Load the mongodb lock analysis
source buildscripts/gdb/mongo_lock.py
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index c247d31a6d1..4dfb73e0140 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -427,12 +427,12 @@ long long Helpers::removeRange(OperationContext* opCtx,
bool docIsOrphan;
// In write lock, so will be the most up-to-date version
- auto metadataNow = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
+ auto css = CollectionShardingState::get(opCtx, nss.ns());
+ auto metadataNow = css->getMetadata();
if (metadataNow) {
ShardKeyPattern kp(metadataNow->getKeyPattern());
BSONObj key = kp.extractShardKeyFromDoc(obj);
- docIsOrphan =
- !metadataNow->keyBelongsToMe(key) && !metadataNow->keyIsPending(key);
+ docIsOrphan = !metadataNow->keyBelongsToMe(key) && !css->keyIsPending(key);
} else {
docIsOrphan = false;
}
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index a91265b6ee3..2bcde771c10 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -228,22 +228,10 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='sharding_metadata_test',
- source=[
- 'collection_metadata_test.cpp',
- 'shard_metadata_util_test.cpp',
- ],
- LIBDEPS=[
- 'metadata',
- '$BUILD_DIR/mongo/s/shard_server_test_fixture',
- ],
-)
-
-env.CppUnitTest(
target='shard_test',
source=[
+ 'shard_metadata_util_test.cpp',
'active_migrations_registry_test.cpp',
- 'metadata_manager_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'sharding_state_test.cpp',
],
@@ -253,14 +241,17 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
+ '$BUILD_DIR/mongo/s/shard_server_test_fixture',
],
)
env.CppUnitTest(
target='collection_sharding_state_test',
source=[
- 'collection_sharding_state_test.cpp',
+ 'collection_metadata_test.cpp',
'collection_range_deleter_test.cpp',
+ 'metadata_manager_test.cpp',
+ 'collection_sharding_state_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/remote_command_targeter_mock',
@@ -273,6 +264,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
],
)
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index d0e58779a3b..bb0d58273f3 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -78,65 +78,59 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
const WriteConcernOptions& secondaryThrottle,
BSONObj* stoppedAtKey,
string* errMsg) {
- BSONObj startingFromKey = startingFromKeyConst;
- ScopedCollectionMetadata metadata;
+ BSONObj startingFromKey = startingFromKeyConst;
+ boost::optional<ChunkRange> targetRange;
{
AutoGetCollection autoColl(opCtx, ns, MODE_IS);
- metadata = CollectionShardingState::get(opCtx, ns.toString())->getMetadata();
- }
+ auto css = CollectionShardingState::get(opCtx, ns.toString());
+ auto metadata = css->getMetadata();
- if (!metadata) {
- warning() << "skipping orphaned data cleanup for " << ns.toString()
+ if (!metadata) {
+ log() << "skipping orphaned data cleanup for " << ns.toString()
<< ", collection is not sharded";
- return CleanupResult_Done;
- }
+ return CleanupResult_Done;
+ }
- BSONObj keyPattern = metadata->getKeyPattern();
- if (!startingFromKey.isEmpty()) {
- if (!metadata->isValidKey(startingFromKey)) {
- *errMsg = stream() << "could not cleanup orphaned data, start key "
- << redact(startingFromKey) << " does not match shard key pattern "
- << keyPattern;
+ BSONObj keyPattern = metadata->getKeyPattern();
+ if (!startingFromKey.isEmpty()) {
+ if (!metadata->isValidKey(startingFromKey)) {
+ *errMsg = stream() << "could not cleanup orphaned data, start key "
+ << redact(startingFromKey)
+ << " does not match shard key pattern " << keyPattern;
- warning() << *errMsg;
- return CleanupResult_Error;
+ log() << *errMsg;
+ return CleanupResult_Error;
+ }
+ } else {
+ startingFromKey = metadata->getMinKey();
}
- } else {
- startingFromKey = metadata->getMinKey();
- }
- KeyRange orphanRange;
- if (!metadata->getNextOrphanRange(startingFromKey, &orphanRange)) {
- LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
- << redact(startingFromKey) << ", no orphan ranges remain";
+ KeyRange orphanRange;
+ if (!metadata->getNextOrphanRange(css->getReceiveMap(), startingFromKey, &orphanRange)) {
+ LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
+ << redact(startingFromKey) << ", no orphan ranges remain";
+
+ return CleanupResult_Done;
+ }
+ orphanRange.ns = ns.ns();
+ *stoppedAtKey = orphanRange.maxKey;
+
+ LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
+ << redact(startingFromKey) << ", removing next orphan range"
+ << " [" << redact(orphanRange.minKey) << "," << redact(orphanRange.maxKey) << ")";
- return CleanupResult_Done;
+ targetRange.emplace(ChunkRange(orphanRange.minKey, orphanRange.maxKey));
+ css->cleanUpRange(*targetRange);
}
- orphanRange.ns = ns.ns();
- *stoppedAtKey = orphanRange.maxKey;
-
- LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
- << redact(startingFromKey) << ", removing next orphan range"
- << " [" << redact(orphanRange.minKey) << "," << redact(orphanRange.maxKey) << ")";
-
- // Metadata snapshot may be stale now, but deleter checks metadata again in write lock
- // before delete.
- RangeDeleterOptions deleterOptions(orphanRange);
- deleterOptions.writeConcern = secondaryThrottle;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.fromMigrate = true;
- // Must wait for cursors since there can be existing cursors with an older
- // CollectionMetadata.
- deleterOptions.waitForOpenCursors = true;
- deleterOptions.removeSaverReason = "cleanup-cmd";
-
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, errMsg)) {
- warning() << redact(*errMsg);
- return CleanupResult_Error;
+ if (targetRange) {
+ auto result = CollectionShardingState::waitForClean(opCtx, ns, *targetRange);
+ if (!result.isOK()) {
+ warning() << redact(result.reason());
+ return CleanupResult_Error;
+ }
}
-
return CleanupResult_Continue;
}
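
The rewritten command above follows a schedule-then-wait pattern: the orphan range is computed and queued for deletion while the collection lock is held, then the caller blocks outside the lock until the range deleter finishes. Below is a minimal, hypothetical sketch of that pattern using names visible in this diff (CollectionShardingState::get, getNextOrphanRange, cleanUpRange, waitForClean); the helper name and the simplified error handling are illustrative only, not part of the commit.

// Sketch only: schedule deletion of one orphan range, then block until it is gone.
// Hypothetical helper; not part of this commit.
Status cleanUpOneOrphanRangeSketch(OperationContext* opCtx, const NamespaceString& nss) {
    boost::optional<ChunkRange> target;
    {
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
        auto css = CollectionShardingState::get(opCtx, nss.toString());
        auto metadata = css->getMetadata();
        if (!metadata) {
            return Status::OK();  // collection is not sharded; nothing to clean
        }
        KeyRange orphanRange;
        if (!metadata->getNextOrphanRange(
                css->getReceiveMap(), metadata->getMinKey(), &orphanRange)) {
            return Status::OK();  // no orphan ranges remain
        }
        target.emplace(orphanRange.minKey, orphanRange.maxKey);
        css->cleanUpRange(*target);  // queue the range with the collection's deleter
    }  // release the collection lock before waiting
    return CollectionShardingState::waitForClean(opCtx, nss, *target);
}
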
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index b18e325adb0..0b4cf09e59e 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -49,7 +49,6 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern,
_collVersion(collectionVersion),
_shardVersion(shardVersion),
_chunksMap(std::move(shardChunksMap)),
- _pendingMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()),
_rangesMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {
invariant(_shardKeyPattern.isValid());
@@ -61,16 +60,24 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern,
invariant(!_shardVersion.isSet());
return;
}
-
invariant(_shardVersion.isSet());
- // Load the chunk information, coallesceing their ranges. The version for this shard would be
+ _rebuildRangesMap();
+}
+
+CollectionMetadata::~CollectionMetadata() = default;
+
+void CollectionMetadata::_rebuildRangesMap() {
+ _rangesMap.clear();
+
+ // Load the chunk information, coalescing their ranges. The version for this shard would be
// the highest version for any of the chunks.
+
BSONObj min, max;
for (const auto& entry : _chunksMap) {
- BSONObj currMin = entry.first;
- BSONObj currMax = entry.second.getMaxKey();
+ BSONObj const& currMin = entry.first;
+ BSONObj const& currMax = entry.second.getMaxKey();
// Coalesce the chunk's bounds in ranges if they are adjacent chunks
if (min.isEmpty()) {
@@ -96,54 +103,9 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern,
_rangesMap.emplace(min, CachedChunkInfo(max, ChunkVersion::IGNORED()));
}
-CollectionMetadata::~CollectionMetadata() = default;
-
-std::unique_ptr<CollectionMetadata> CollectionMetadata::cloneMinusPending(
- const ChunkType& chunk) const {
- invariant(rangeMapContains(_pendingMap, chunk.getMin(), chunk.getMax()));
-
- auto metadata(stdx::make_unique<CollectionMetadata>(
- _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks()));
- metadata->_pendingMap = _pendingMap;
-
- metadata->_pendingMap.erase(chunk.getMin());
-
- return metadata;
-}
-
-std::unique_ptr<CollectionMetadata> CollectionMetadata::clonePlusPending(
- const ChunkType& chunk) const {
- invariant(!rangeMapOverlaps(_chunksMap, chunk.getMin(), chunk.getMax()));
-
- auto metadata(stdx::make_unique<CollectionMetadata>(
- _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks()));
- metadata->_pendingMap = _pendingMap;
-
- // If there are any pending chunks on the interval to be added this is ok, since pending chunks
- // aren't officially tracked yet and something may have changed on servers we do not see yet.
- //
- // We remove any chunks we overlap because the remote request starting a chunk migration is what
- // is authoritative.
-
- if (rangeMapOverlaps(_pendingMap, chunk.getMin(), chunk.getMax())) {
- RangeVector pendingOverlap;
- getRangeMapOverlap(_pendingMap, chunk.getMin(), chunk.getMax(), &pendingOverlap);
-
- warning() << "new pending chunk " << redact(rangeToString(chunk.getMin(), chunk.getMax()))
- << " overlaps existing pending chunks " << redact(overlapToString(pendingOverlap))
- << ", a migration may not have completed";
-
- for (RangeVector::iterator it = pendingOverlap.begin(); it != pendingOverlap.end(); ++it) {
- metadata->_pendingMap.erase(it->first);
- }
- }
-
- // The pending map entry cannot contain a specific chunk version because we don't know what
- // version would be generated for it at commit time. That's why we insert an IGNORED value.
- metadata->_pendingMap.emplace(chunk.getMin(),
- CachedChunkInfo(chunk.getMax(), ChunkVersion::IGNORED()));
-
- return metadata;
+std::unique_ptr<CollectionMetadata> CollectionMetadata::clone() const {
+ return stdx::make_unique<CollectionMetadata>(
+ _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks());
}
bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const {
@@ -158,18 +120,6 @@ bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const {
return rangeContains(it->first, it->second.getMaxKey(), key);
}
-bool CollectionMetadata::keyIsPending(const BSONObj& key) const {
- if (_pendingMap.empty()) {
- return false;
- }
-
- auto it = _pendingMap.upper_bound(key);
- if (it != _pendingMap.begin())
- it--;
-
- return rangeContains(it->first, it->second.getMaxKey(), key);
-}
-
bool CollectionMetadata::getNextChunk(const BSONObj& lookupKey, ChunkType* chunk) const {
RangeMap::const_iterator upperChunkIt = _chunksMap.upper_bound(lookupKey);
RangeMap::const_iterator lowerChunkIt = upperChunkIt;
@@ -238,6 +188,10 @@ Status CollectionMetadata::checkChunkIsValid(const ChunkType& chunk) {
return Status::OK();
}
+bool CollectionMetadata::rangeOverlapsChunk(ChunkRange const& range) {
+ return rangeMapOverlaps(_rangesMap, range.getMin(), range.getMax());
+}
+
void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const {
_collVersion.addToBSON(bb, "collVersion");
_shardVersion.addToBSON(bb, "shardVersion");
@@ -256,97 +210,58 @@ void CollectionMetadata::toBSONChunks(BSONArrayBuilder& bb) const {
}
}
-void CollectionMetadata::toBSONPending(BSONArrayBuilder& bb) const {
- if (_pendingMap.empty())
- return;
-
- for (RangeMap::const_iterator it = _pendingMap.begin(); it != _pendingMap.end(); ++it) {
- BSONArrayBuilder pendingBB(bb.subarrayStart());
- pendingBB.append(it->first);
- pendingBB.append(it->second.getMaxKey());
- pendingBB.done();
- }
-}
-
std::string CollectionMetadata::toStringBasic() const {
return str::stream() << "collection version: " << _collVersion.toString()
<< ", shard version: " << _shardVersion.toString();
}
-bool CollectionMetadata::getNextOrphanRange(const BSONObj& origLookupKey, KeyRange* range) const {
+bool CollectionMetadata::getNextOrphanRange(RangeMap const& receivingChunks,
+ const BSONObj& origLookupKey,
+ KeyRange* range) const {
BSONObj lookupKey = origLookupKey;
BSONObj maxKey = getMaxKey(); // so we don't keep rebuilding
while (lookupKey.woCompare(maxKey) < 0) {
- RangeMap::const_iterator lowerChunkIt = _chunksMap.end();
- RangeMap::const_iterator upperChunkIt = _chunksMap.end();
-
- if (!_chunksMap.empty()) {
- upperChunkIt = _chunksMap.upper_bound(lookupKey);
- lowerChunkIt = upperChunkIt;
- if (upperChunkIt != _chunksMap.begin())
- --lowerChunkIt;
- else
- lowerChunkIt = _chunksMap.end();
- }
-
- // If we overlap, continue after the overlap
- // TODO: Could optimize slightly by finding next non-contiguous chunk
- if (lowerChunkIt != _chunksMap.end() &&
- lowerChunkIt->second.getMaxKey().woCompare(lookupKey) > 0) {
- lookupKey = lowerChunkIt->second.getMaxKey();
- continue;
- }
-
- RangeMap::const_iterator lowerPendingIt = _pendingMap.end();
- RangeMap::const_iterator upperPendingIt = _pendingMap.end();
-
- if (!_pendingMap.empty()) {
- upperPendingIt = _pendingMap.upper_bound(lookupKey);
- lowerPendingIt = upperPendingIt;
- if (upperPendingIt != _pendingMap.begin())
- --lowerPendingIt;
- else
- lowerPendingIt = _pendingMap.end();
- }
-
- // If we overlap, continue after the overlap
- // TODO: Could optimize slightly by finding next non-contiguous chunk
- if (lowerPendingIt != _pendingMap.end() &&
- lowerPendingIt->second.getMaxKey().woCompare(lookupKey) > 0) {
- lookupKey = lowerPendingIt->second.getMaxKey();
- continue;
- }
-
- //
- // We know that the lookup key is not covered by a chunk or pending range, and where the
- // previous chunk and pending chunks are. Now we fill in the bounds as the closest
- // bounds of the surrounding ranges in both maps.
- //
range->keyPattern = _shardKeyPattern.toBSON();
range->minKey = getMinKey();
range->maxKey = maxKey;
- if (lowerChunkIt != _chunksMap.end() &&
- lowerChunkIt->second.getMaxKey().woCompare(range->minKey) > 0) {
- range->minKey = lowerChunkIt->second.getMaxKey();
- }
-
- if (upperChunkIt != _chunksMap.end() && upperChunkIt->first.woCompare(range->maxKey) < 0) {
- range->maxKey = upperChunkIt->first;
- }
-
- if (lowerPendingIt != _pendingMap.end() &&
- lowerPendingIt->second.getMaxKey().woCompare(range->minKey) > 0) {
- range->minKey = lowerPendingIt->second.getMaxKey();
- }
+ auto patchUp = [&](auto const& map) {
+ auto lowerIt = map.end(), upperIt = map.end();
+
+ if (!map.empty()) {
+ upperIt = map.upper_bound(lookupKey);
+ lowerIt = upperIt;
+ if (upperIt != map.begin())
+ --lowerIt;
+ else
+ lowerIt = map.end();
+ }
+
+ // If we overlap, continue after the overlap
+ // TODO: Could optimize slightly by finding next non-contiguous chunk
+ if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(lookupKey) > 0) {
+ lookupKey = lowerIt->second.getMaxKey();
+ return false;
+ }
+
+ // We know that the lookup key is not covered by a chunk or pending range, and where the
+ // previous chunk and pending chunks are. Now we fill in the bounds as the closest
+ // bounds of the surrounding ranges in both maps.
+ if (lowerIt != _chunksMap.end() &&
+ lowerIt->second.getMaxKey().woCompare(range->minKey) > 0) {
+ range->minKey = lowerIt->second.getMaxKey();
+ }
+ if (upperIt != _chunksMap.end() && upperIt->first.woCompare(range->maxKey) < 0) {
+ range->maxKey = upperIt->first;
+ }
+ return true;
+ };
- if (upperPendingIt != _pendingMap.end() &&
- upperPendingIt->first.woCompare(range->maxKey) < 0) {
- range->maxKey = upperPendingIt->first;
+ // side effects on *range are safe even if we go around again.
+ if (patchUp(_chunksMap) && patchUp(receivingChunks)) {
+ return true;
}
-
- return true;
}
return false;
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index ea7392842d6..c4b4ea70057 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/range_arithmetic.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/shard_key_pattern.h"
@@ -64,20 +65,9 @@ public:
~CollectionMetadata();
/**
- * Returns a new metadata's instance based on 'this's state by removing a 'pending' chunk.
- *
- * The shard and collection version of the new metadata are unaffected. The caller owns the
- * new metadata.
- */
- std::unique_ptr<CollectionMetadata> cloneMinusPending(const ChunkType& chunk) const;
-
- /**
- * Returns a new metadata's instance based on 'this's state by adding a 'pending' chunk.
- *
- * The shard and collection version of the new metadata are unaffected. The caller owns the
- * new metadata.
+ * Returns a full copy of *this.
*/
- std::unique_ptr<CollectionMetadata> clonePlusPending(const ChunkType& chunk) const;
+ std::unique_ptr<CollectionMetadata> clone() const;
/**
* Returns true if the document key 'key' is a valid instance of a shard key for this
@@ -93,12 +83,6 @@ public:
bool keyBelongsToMe(const BSONObj& key) const;
/**
- * Returns true if the document key 'key' is or has been migrated to this shard, and may
- * belong to us after a subsequent config reload. Key must be the full shard key.
- */
- bool keyIsPending(const BSONObj& key) const;
-
- /**
* Given a key 'lookupKey' in the shard key range, get the next chunk which overlaps or is
* greater than this key. Returns true if a chunk exists, false otherwise.
*
@@ -117,6 +101,11 @@ public:
Status checkChunkIsValid(const ChunkType& chunk);
/**
+ * Returns true if the argument range overlaps any chunk.
+ */
+ bool rangeOverlapsChunk(ChunkRange const& range);
+
+ /**
* Given a key in the shard key range, get the next range which overlaps or is greater than
* this key.
*
@@ -132,7 +121,9 @@ public:
* @param lookupKey passing a key that does not belong to this metadata is undefined.
* @param orphanRange the output range. Note that the NS is not set.
*/
- bool getNextOrphanRange(const BSONObj& lookupKey, KeyRange* orphanRange) const;
+ bool getNextOrphanRange(RangeMap const& receiveMap,
+ BSONObj const& lookupKey,
+ KeyRange* orphanRange) const;
ChunkVersion getCollVersion() const {
return _collVersion;
@@ -183,6 +174,12 @@ public:
std::string toStringBasic() const;
private:
+ /**
+ * Clears and rebuilds _rangesMap from the contents of _chunksMap.
+ */
+
+ void _rebuildRangesMap();
+
// Shard key pattern for the collection
ShardKeyPattern _shardKeyPattern;
@@ -196,10 +193,7 @@ private:
// Map of chunks tracked by this shard
RangeMap _chunksMap;
- // Map of ranges of chunks that are migrating but have not been confirmed added yet
- RangeMap _pendingMap;
-
- // A second map from a min key into a range or contiguous chunks. The map is redundant
+ // A second map from a min key into a range of contiguous chunks. The map is redundant
// w.r.t. _chunkMap but we expect high chunk contiguity, especially in small
// installations.
RangeMap _rangesMap;
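
With the pending map removed from CollectionMetadata, getNextOrphanRange now takes the caller-supplied map of ranges currently being received, so in-flight migrations are not reported as orphans. A minimal, hypothetical sketch of walking the key space with the new signature (the helper name is illustrative, not part of the commit):

// Sketch only: enumerate orphan ranges, skipping owned chunks and ranges being received.
void listOrphanRangesSketch(const CollectionMetadata& metadata, const RangeMap& receiveMap) {
    KeyRange orphanRange;
    BSONObj lookupKey = metadata.getMinKey();
    while (metadata.getNextOrphanRange(receiveMap, lookupKey, &orphanRange)) {
        log() << "orphan range [" << orphanRange.minKey << ", " << orphanRange.maxKey << ")";
        lookupKey = orphanRange.maxKey;  // resume the scan after this range
    }
}
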
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
index be37e510a9d..4e86274d4ee 100644
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ b/src/mongo/db/s/collection_range_deleter.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/s/collection_range_deleter.h"
#include <algorithm>
+#include <utility>
#include "mongo/db/catalog/collection.h"
#include "mongo/db/client.h"
@@ -41,13 +42,16 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/keypattern.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/log.h"
@@ -57,7 +61,6 @@
namespace mongo {
class ChunkRange;
-class OldClientWriteContext;
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using logger::LogComponent;
@@ -67,62 +70,39 @@ namespace {
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Seconds(60));
-
} // unnamed namespace
-CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {}
-
-void CollectionRangeDeleter::run() {
- Client::initThread(getThreadName());
- ON_BLOCK_EXIT([&] { Client::destroy(); });
- auto opCtx = cc().makeOperationContext().get();
-
- const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
- bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete);
-
- // If there are more ranges to run, we add <this> back onto the task executor to run again.
- if (hasNextRangeToClean) {
- auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor();
- executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); });
- } else {
- delete this;
- }
+CollectionRangeDeleter::~CollectionRangeDeleter() {
+ clear(); // notify anybody sleeping on orphan ranges
}
-bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) {
-
- {
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
- auto* collection = autoColl.getCollection();
- if (!collection) {
- return false;
- }
- auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss);
- dassert(collectionShardingState != nullptr); // every collection gets one
-
- auto& metadataManager = collectionShardingState->_metadataManager;
-
- if (!_rangeInProgress && !metadataManager.hasRangesToClean()) {
- // Nothing left to do
- return false;
- }
-
- if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) {
- // No valid chunk in progress, get a new one
- if (!metadataManager.hasRangesToClean()) {
- return false;
- }
- _rangeInProgress = metadataManager.getNextRangeToClean();
+// This function runs in the range deleter's task executor thread.
+// Call under a collection lock.
+bool CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx,
+ Collection* collection,
+ BSONObj const& keyPattern,
+ stdx::mutex* lock,
+ int maxToDelete) {
+ dassert(collection != nullptr);
+ auto range = [lock, this]() -> boost::optional<ChunkRange> {
+ stdx::lock_guard<stdx::mutex> lk(*lock);
+ if (this->isEmpty()) {
+ return boost::none;
+ } else {
+ return this->_orphans.front().range;
}
+ }();
+ if (!range) {
+ return false;
+ }
- auto scopedCollectionMetadata = collectionShardingState->getMetadata();
- int numDocumentsDeleted =
- _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete);
- if (numDocumentsDeleted <= 0) {
- metadataManager.removeRangeToClean(_rangeInProgress.get());
- _rangeInProgress = boost::none;
- return metadataManager.hasRangesToClean();
+ auto countWith = _doDeletion(opCtx, collection, keyPattern, *range, maxToDelete);
+ if (!countWith.isOK() || countWith.getValue() == 0) {
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(*lock);
+ _pop(countWith.getStatus());
}
+ return true;
}
// wait for replication
@@ -131,59 +111,63 @@ bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxTo
Status status =
waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult);
if (!status.isOK()) {
- warning() << "Error when waiting for write concern after removing chunks in " << _nss
- << " : " << status.reason();
+ warning() << "Error when waiting for write concern after removing chunks in "
+ << collection->ns() << " : " << status.reason();
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(*lock);
+ _pop(status);
+ }
}
-
return true;
}
-int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
- Collection* collection,
- const BSONObj& keyPattern,
- int maxToDelete) {
- invariant(_rangeInProgress);
- invariant(collection);
+// This function runs in the range deleter's task executor thread.
+// Call under a collection lock.
+StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
+ Collection* collection,
+ BSONObj const& keyPattern,
+ ChunkRange const& range,
+ int maxToDelete) {
+ NamespaceString const& nss = collection->ns();
// The IndexChunk has a keyPattern that may apply to more than one index - we need to
// select the index and get the full index keyPattern here.
- const IndexDescriptor* idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
+ auto catalog = collection->getIndexCatalog();
+ const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
if (idx == NULL) {
- warning() << "Unable to find shard key index for " << keyPattern.toString() << " in "
- << _nss;
- return -1;
+ std::string msg = str::stream() << "Unable to find shard key index for "
+ << keyPattern.toString() << " in " << nss.ns();
+ log() << msg;
+ return {ErrorCodes::InternalError, msg};
}
- KeyPattern indexKeyPattern(idx->keyPattern().getOwned());
-
// Extend bounds to match the index we found
- const BSONObj& min =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false));
- const BSONObj& max =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false));
+ KeyPattern indexKeyPattern(idx->keyPattern().getOwned());
+ auto extend = [&](auto& key) {
+ return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false));
+ };
+ const BSONObj& min = extend(range.getMin());
+ const BSONObj& max = extend(range.getMax());
- LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss;
+ LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns();
auto indexName = idx->indexName();
- IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
- if (!desc) {
- warning() << "shard key index with name " << indexName << " on '" << _nss
- << "' was dropped";
- return -1;
+ IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
+ if (!descriptor) {
+ std::string msg = str::stream() << "shard key index with name " << indexName << " on '"
+ << nss.ns() << "' was dropped";
+ log() << msg;
+ return {ErrorCodes::InternalError, msg};
}
int numDeleted = 0;
do {
- auto exec = InternalPlanner::indexScan(opCtx,
- collection,
- desc,
- min,
- max,
- BoundInclusion::kIncludeStartKeyOnly,
- PlanExecutor::YIELD_MANUAL,
- InternalPlanner::FORWARD,
- InternalPlanner::IXSCAN_FETCH);
+ auto bounds = BoundInclusion::kIncludeStartKeyOnly;
+ auto manual = PlanExecutor::YIELD_MANUAL;
+ auto forward = InternalPlanner::FORWARD;
+ auto fetch = InternalPlanner::IXSCAN_FETCH;
+ auto exec = InternalPlanner::indexScan(
+ opCtx, collection, descriptor, min, max, bounds, manual, forward, fetch);
RecordId rloc;
BSONObj obj;
PlanExecutor::ExecState state = exec->getNext(&obj, &rloc);
@@ -193,23 +177,66 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) {
warning(LogComponent::kSharding)
<< PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min
- << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj)
+ << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj)
<< ", stats: " << Explain::getWinningPlanStats(exec.get());
break;
}
invariant(PlanExecutor::ADVANCED == state);
WriteUnitOfWork wuow(opCtx);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- warning() << "stepped down from primary while deleting chunk; orphaning data in "
- << _nss << " in range [" << min << ", " << max << ")";
- break;
- }
- OpDebug* const nullOpDebug = nullptr;
- collection->deleteDocument(opCtx, rloc, nullOpDebug, true);
+ collection->deleteDocument(opCtx, rloc, nullptr, true);
wuow.commit();
} while (++numDeleted < maxToDelete);
return numDeleted;
}
+auto CollectionRangeDeleter::overlaps(ChunkRange const& range) -> DeleteNotification {
+ // start search with newest entries by using reverse iterators
+ auto it = find_if(_orphans.rbegin(), _orphans.rend(), [&](auto& cleanee) {
+ return bool(cleanee.range.overlapWith(range));
+ });
+ return it != _orphans.rend() ? it->notification : DeleteNotification();
+}
+
+void CollectionRangeDeleter::add(ChunkRange const& range) {
+ // We ignore the case of overlapping, or even equal, ranges.
+
+ // Deleting overlapping ranges is similarly quick.
+ _orphans.emplace_back(Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()),
+ std::make_shared<Notification<Status>>()});
+}
+
+void CollectionRangeDeleter::append(BSONObjBuilder* builder) const {
+ BSONArrayBuilder arr(builder->subarrayStart("rangesToClean"));
+ for (auto const& entry : _orphans) {
+ BSONObjBuilder obj;
+ entry.range.append(&obj);
+ arr.append(obj.done());
+ }
+ arr.done();
+}
+
+size_t CollectionRangeDeleter::size() const {
+ return _orphans.size();
+}
+
+bool CollectionRangeDeleter::isEmpty() const {
+ return _orphans.empty();
+}
+
+void CollectionRangeDeleter::clear() {
+ std::for_each(_orphans.begin(), _orphans.end(), [](auto& range) {
+ // Since deletion was not actually tried, we have no failures to report.
+ if (!*(range.notification)) {
+ range.notification->set(Status::OK()); // wake up anything waiting on it
+ }
+ });
+ _orphans.clear();
+}
+
+void CollectionRangeDeleter::_pop(Status result) {
+ _orphans.front().notification->set(result); // wake up waitForClean
+ _orphans.pop_front();
+}
+
} // namespace mongo
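
As the comments above note, cleanUpNextRange is now driven by its caller and rescheduled while it reports more work, instead of owning its own thread as the old run() did. A minimal, hypothetical sketch of such a driver (the helper name and batch size are illustrative; the real code reschedules itself on the range deleter's task executor rather than looping inline):

// Sketch only: invoke the deleter repeatedly until it reports no further progress.
void driveRangeDeleterSketch(OperationContext* opCtx,
                             Collection* collection,
                             const BSONObj& keyPattern,
                             stdx::mutex* lock,
                             CollectionRangeDeleter* deleter) {
    const int kMaxToDeletePerPass = 128;  // illustrative batch size
    while (deleter->cleanUpNextRange(opCtx, collection, keyPattern, lock, kMaxToDeletePerPass)) {
        // Each pass deletes up to kMaxToDeletePerPass documents from the front range
        // and pops the range (signaling its notification) when it is exhausted.
    }
}
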
diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h
index f611215a73d..2aa969e3d28 100644
--- a/src/mongo/db/s/collection_range_deleter.h
+++ b/src/mongo/db/s/collection_range_deleter.h
@@ -29,7 +29,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -41,40 +43,88 @@ class CollectionRangeDeleter {
MONGO_DISALLOW_COPYING(CollectionRangeDeleter);
public:
- CollectionRangeDeleter(NamespaceString nss);
+ /**
+ * Normally, construct with the collection name and ShardingState's dedicated executor.
+ */
+ CollectionRangeDeleter() = default;
+ ~CollectionRangeDeleter();
+
+ using DeleteNotification = std::shared_ptr<Notification<Status>>;
+
+ /**
+ * Adds a new range to be cleaned up by the cleaner thread.
+ */
+ void add(const ChunkRange& range);
+
+ /**
+ * Reports whether the argument range overlaps any of the ranges to clean. If there is overlap,
+ * it returns a notification that will be completed when the currently newest overlapping
+ * range is no longer scheduled. Its value indicates whether it has been successfully removed.
+ * If there is no overlap, the result is nullptr. After a successful removal, the caller
+ * should call again to ensure no other range overlaps the argument.
+ * (See MigrationDestinationManager::waitForClean and MetadataManager::trackCleanup for an
+ * example use.)
+ */
+ DeleteNotification overlaps(ChunkRange const& range);
/**
- * Starts deleting ranges and cleans up this object when it is finished.
+ * Reports the number of ranges remaining to be cleaned up.
*/
- void run();
+ size_t size() const;
+
+ bool isEmpty() const;
+
+ /*
+ * Notify anything waiting on ranges scheduled, before discarding the ranges.
+ */
+ void clear();
+
+ /*
+ * Append a representation of self to the specified builder.
+ */
+ void append(BSONObjBuilder* builder) const;
/**
- * Acquires the collection IX lock and checks whether there are new entries for the collection's
- * rangesToClean structure. If there are, deletes up to maxToDelete entries and yields using
- * the standard query yielding logic.
+ * If any ranges are scheduled to clean, deletes up to maxToDelete documents, notifying watchers
+ * of ranges as they are completed. Uses specified lock to serialize access to *this.
*
- * Returns true if there are more entries in rangesToClean, false if there is no more progress
- * to be made.
+ * Returns true if it should be scheduled to run again, false if there is no more progress to be
+ * made.
*/
- bool cleanupNextRange(OperationContext* opCtx, int maxToDelete);
+ bool cleanUpNextRange(OperationContext*,
+ Collection*,
+ BSONObj const& keyPattern,
+ stdx::mutex* lock,
+ int maxToDelete);
private:
/**
* Performs the deletion of up to maxToDelete entries within the range in progress.
- * This function will invariant if called while _rangeInProgress is not set.
*
- * Returns the number of documents deleted (0 if deletion is finished), or -1 for error.
+ * Returns the number of documents deleted, 0 if done with the range, or bad status if deleting
+ * the range failed.
*/
- int _doDeletion(OperationContext* opCtx,
- Collection* collection,
- const BSONObj& keyPattern,
- int maxToDelete);
+ StatusWith<int> _doDeletion(OperationContext* opCtx,
+ Collection* collection,
+ const BSONObj& keyPattern,
+ ChunkRange const& range,
+ int maxToDelete);
- NamespaceString _nss;
+ /**
+ * Removes the latest-scheduled range from the ranges to be cleaned up, and notifies any
+ * interested callers of this->overlaps(range) with specified status.
+ */
+ void _pop(Status status);
- // Holds a range for which deletion has begun. If empty, then a new range
- // must be requested from rangesToClean
- boost::optional<ChunkRange> _rangeInProgress;
+ /**
+ * Ranges scheduled for deletion. The front of the list will be in active process of deletion.
+ * As each range is completed, its notification is signaled before it is popped.
+ */
+ struct Deletion {
+ ChunkRange const range;
+ DeleteNotification const notification;
+ };
+ std::list<Deletion> _orphans;
};
} // namespace mongo
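
The DeleteNotification returned by overlaps() is what callers block on, and as the comment above says, a successful wait should be followed by another overlaps() check in case an older scheduled range still covers the argument. A minimal, hypothetical sketch of that wait loop (the helper name is illustrative; locking around access to the deleter is elided):

// Sketch only: wait until nothing scheduled in the deleter overlaps 'range'.
Status waitUntilRangeClearSketch(OperationContext* opCtx,
                                 CollectionRangeDeleter& deleter,
                                 const ChunkRange& range) {
    while (auto notification = deleter.overlaps(range)) {
        Status result = notification->get(opCtx);  // blocks until that range is popped
        if (!result.isOK()) {
            return result;  // deleting an overlapping range failed
        }
        // Re-check: another, older scheduled range may still overlap.
    }
    return Status::OK();
}
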
diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp
index 2927379ff1c..9ff50c7eff6 100644
--- a/src/mongo/db/s/collection_range_deleter_test.cpp
+++ b/src/mongo/db/s/collection_range_deleter_test.cpp
@@ -30,55 +30,90 @@
#include "mongo/db/s/collection_range_deleter.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/query.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/s/catalog/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
namespace mongo {
using unittest::assertGet;
-
-const NamespaceString kNamespaceString = NamespaceString("foo", "bar");
+const NamespaceString kNss = NamespaceString("foo", "bar");
const std::string kPattern = "_id";
const BSONObj kKeyPattern = BSON(kPattern << 1);
+const std::string kShardName{"a"};
+const HostAndPort dummyHost("dummy", 123);
+
+// const auto kIxVer = IndexDescriptor::IndexVersion::kV2;
-class CollectionRangeDeleterTest : public ServiceContextMongoDTest {
+class CollectionRangeDeleterTest : public ShardingMongodTestFixture {
protected:
- ServiceContext::UniqueOperationContext _opCtx;
- MetadataManager* _metadataManager;
std::unique_ptr<DBDirectClient> _dbDirectClient;
+ stdx::mutex _dummyLock;
+
+ bool next(CollectionRangeDeleter& rangeDeleter, Collection* collection, int maxToDelete) {
+ return rangeDeleter.cleanUpNextRange(
+ operationContext(), collection, kKeyPattern, &_dummyLock, maxToDelete);
+ }
+ std::shared_ptr<RemoteCommandTargeterMock> configTargeter() {
+ return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
+ }
- OperationContext* operationContext();
private:
void setUp() override;
void tearDown() override;
+
+ std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override {
+ invariant(shardRegistry);
+ return stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+ }
+
+ std::unique_ptr<DistLockManager> makeDistLockManager(
+ std::unique_ptr<DistLockCatalog> distLockCatalog) override {
+ return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog));
+ }
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager));
+ }
};
void CollectionRangeDeleterTest::setUp() {
- ServiceContextMongoDTest::setUp();
- const repl::ReplSettings replSettings = {};
- repl::setGlobalReplicationCoordinator(
- new repl::ReplicationCoordinatorMock(getServiceContext(), replSettings));
- repl::getGlobalReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY);
- _opCtx = getServiceContext()->makeOperationContext(&cc());
- _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext());
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ ShardingMongodTestFixture::setUp();
+ replicationCoordinator()->alwaysAllowWrites(true);
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost));
+
+ // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter())
+ // ->setConnectionStringReturnValue(kConfigConnStr);
+
+ configTargeter()->setFindHostReturnValue(dummyHost);
+ _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext());
+ ASSERT(_dbDirectClient->createCollection(kNss.ns()));
{
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss);
const OID epoch = OID::gen();
-
- AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX);
- auto collectionShardingState =
- CollectionShardingState::get(operationContext(), kNamespaceString);
collectionShardingState->refreshMetadata(
operationContext(),
stdx::make_unique<CollectionMetadata>(
@@ -86,208 +121,149 @@ void CollectionRangeDeleterTest::setUp() {
ChunkVersion(1, 0, epoch),
ChunkVersion(0, 0, epoch),
SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()));
- _metadataManager = collectionShardingState->getMetadataManagerForTest();
}
}
void CollectionRangeDeleterTest::tearDown() {
{
- AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX);
- auto collectionShardingState =
- CollectionShardingState::get(operationContext(), kNamespaceString);
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss);
collectionShardingState->refreshMetadata(operationContext(), nullptr);
}
_dbDirectClient.reset();
- _opCtx.reset();
- ServiceContextMongoDTest::tearDown();
- repl::setGlobalReplicationCoordinator(nullptr);
-}
-
-OperationContext* CollectionRangeDeleterTest::operationContext() {
- invariant(_opCtx);
- return _opCtx.get();
+ ShardingMongodTestFixture::tearDown();
}
namespace {
// Tests the case that there is nothing in the database.
TEST_F(CollectionRangeDeleterTest, EmptyDatabase) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+ CollectionRangeDeleter rangeDeleter;
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collection = autoColl.getCollection();
+ ASSERT_FALSE(next(rangeDeleter, collection, 1));
+}
+
+#if 0
+// Just inserts a record.
+TEST_F(CollectionRangeDeleterTest, NothingToDo) {
+ CollectionRangeDeleter rangeDeleter;
+ const BSONObj insertedDoc = BSON(kPattern << 25);
+ _dbDirectClient->insert(kNss.toString(), insertedDoc);
+ ASSERT_BSONOBJ_EQ(insertedDoc,
+ _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 25)));
}
+#endif
// Tests the case that there is data, but it is not in a range to clean.
TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
+ CollectionRangeDeleter rangeDeleter;
const BSONObj insertedDoc = BSON(kPattern << 25);
-
- _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc);
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collection = autoColl.getCollection();
+ _dbDirectClient->insert(kNss.toString(), insertedDoc);
ASSERT_BSONOBJ_EQ(insertedDoc,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25)));
+ _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 25)));
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
ASSERT_BSONOBJ_EQ(insertedDoc,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25)));
+ _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 25)));
+
+ ASSERT_FALSE(next(rangeDeleter, collection, 1));
}
// Tests the case that there is a single document within a range to clean.
TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
+ CollectionRangeDeleter rangeDeleter;
const BSONObj insertedDoc = BSON(kPattern << 5);
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collection = autoColl.getCollection();
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5));
- ASSERT_BSONOBJ_EQ(insertedDoc,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5)));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 5));
+ ASSERT_BSONOBJ_EQ(insertedDoc, _dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 5)));
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_TRUE(
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5)).isEmpty());
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_TRUE(_dbDirectClient->findOne(kNss.toString(), QUERY(kPattern << 5)).isEmpty());
+ ASSERT_FALSE(next(rangeDeleter, collection, 1));
}
// Tests the case that there are multiple documents within a range to clean.
TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 100));
+ CollectionRangeDeleter rangeDeleter;
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collection = autoColl.getCollection();
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 1));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 2));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 3));
+ ASSERT_EQUALS(3ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+
+ ASSERT_TRUE(next(rangeDeleter, collection, 100));
+ ASSERT_TRUE(next(rangeDeleter, collection, 100));
+ ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5)));
+ ASSERT_FALSE(next(rangeDeleter, collection, 100));
}
// Tests the case that there are multiple documents within a range to clean, and the range deleter
// has a max deletion rate of one document per run.
-TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCallsToCleanOneRange) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_EQUALS(2ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_EQUALS(1ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCalls) {
+ CollectionRangeDeleter rangeDeleter;
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collection = autoColl.getCollection();
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 1));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 2));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 3));
+ ASSERT_EQUALS(3ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_EQUALS(2ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_EQUALS(1ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 5)));
+ ASSERT_FALSE(next(rangeDeleter, collection, 1));
}
// Tests the case that there are two ranges to clean, each containing multiple documents.
TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 6));
- ASSERT_EQUALS(6ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
+ CollectionRangeDeleter rangeDeleter;
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collection = autoColl.getCollection();
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 1));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 2));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 3));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 4));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 5));
+ _dbDirectClient->insert(kNss.toString(), BSON(kPattern << 6));
+ ASSERT_EQUALS(6ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 10)));
const ChunkRange chunkRange1 = ChunkRange(BSON(kPattern << 0), BSON(kPattern << 4));
const ChunkRange chunkRange2 = ChunkRange(BSON(kPattern << 4), BSON(kPattern << 7));
- _metadataManager->addRangeToClean(chunkRange1);
- _metadataManager->addRangeToClean(chunkRange2);
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 4)));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
-}
-
-// Tests the case that there is a range to clean, and halfway through a deletion a chunk
-// within the range is received
-TEST_F(CollectionRangeDeleterTest, MultipleCallstoCleanupNextRangeWithChunkReceive) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
- const BSONObj insertedDoc5 = BSON(kPattern << 5); // not to be deleted
- _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc5);
- ASSERT_EQUALS(5ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 5), BSON(kPattern << 6)));
-
- // insertedDoc5 is no longer eligible for deletion
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(1ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(1ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-
- ASSERT_BSONOBJ_EQ(
- insertedDoc5,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << GT << 0)));
-}
-
-TEST_F(CollectionRangeDeleterTest, CalltoCleanupNextRangeWithChunkReceive) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
- ASSERT_EQUALS(4ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(2ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-
- ASSERT_EQUALS(2ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
+ rangeDeleter.add(chunkRange1);
+ rangeDeleter.add(chunkRange2);
+
+ ASSERT_TRUE(next(rangeDeleter, collection, 100));
+ ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 4)));
+ ASSERT_EQUALS(3ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 10)));
+
+ ASSERT_TRUE(next(rangeDeleter, collection, 100));
+ ASSERT_TRUE(next(rangeDeleter, collection, 100));
+ ASSERT_TRUE(next(rangeDeleter, collection, 1));
+ ASSERT_EQUALS(0ULL, _dbDirectClient->count(kNss.toString(), BSON(kPattern << LT << 10)));
+ ASSERT_FALSE(next(rangeDeleter, collection, 1));
}
} // unnamed namespace
-
} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 24746d7880e..5d362fcde5c 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
@@ -84,7 +85,8 @@ private:
} // unnamed namespace
CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
- : _nss(std::move(nss)), _metadataManager{sc, _nss} {}
+ : _nss(std::move(nss)),
+ _metadataManager{sc, _nss, ShardingState::get(sc)->getRangeDeleterTaskExecutor()} {}
CollectionShardingState::~CollectionShardingState() {
invariant(!_sourceMgr);
@@ -119,14 +121,18 @@ void CollectionShardingState::markNotShardedAtStepdown() {
_metadataManager.refreshActiveMetadata(nullptr);
}
-void CollectionShardingState::beginReceive(const ChunkRange& range) {
- _metadataManager.beginReceive(range);
+bool CollectionShardingState::beginReceive(ChunkRange const& range) {
+ return _metadataManager.beginReceive(range);
}
void CollectionShardingState::forgetReceive(const ChunkRange& range) {
_metadataManager.forgetReceive(range);
}
+void CollectionShardingState::cleanUpRange(ChunkRange const& range) {
+ _metadataManager.cleanUpRange(range);
+}
+
MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() {
return _sourceMgr;
}
@@ -172,6 +178,52 @@ bool CollectionShardingState::collectionIsSharded() {
return true;
}
+bool CollectionShardingState::cleanUpNextRange(OperationContext* opCtx,
+ Collection* collection,
+ int maxToDelete) {
+ auto& keyPattern = getMetadata()->getKeyPattern();
+ return _metadataManager.cleanUpNextRange(opCtx, collection, keyPattern, maxToDelete);
+}
+
+// Call with collection unlocked. Note that the CollectionShardingState object involved might not
+// exist anymore at the time of the call, or indeed anytime outside the AutoGetCollection block, so
+// anything that might alias something in it must be copied first.
+
+Status CollectionShardingState::waitForClean(OperationContext* opCtx,
+ NamespaceString nss,
+ ChunkRange orphanRange) {
+ do {
+ auto stillScheduled = CollectionShardingState::CleanupNotification(nullptr);
+ {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ // First, see if collection was dropped.
+ auto css = CollectionShardingState::get(opCtx, nss);
+ auto metadata = css->getMetadata();
+ if (!metadata) {
+ throw DBException("Collection being migrated was dropped",
+ ErrorCodes::StaleShardVersion);
+ }
+
+ stillScheduled = css->_metadataManager.trackCleanup(orphanRange);
+ if (stillScheduled == nullptr) {
+ return Status::OK(); // done!
+ }
+ } // drop collection lock
+
+ log() << "Waiting to delete " << nss.ns() << " range " << redact(orphanRange.toString());
+ Status result = stillScheduled->get(opCtx);
+ if (!result.isOK()) {
+ return {result.code(),
+ str::stream() << "Failed to delete orphaned collection " << nss.ns()
+ << " range "
+ << orphanRange.toString()
+ << ": "
+ << result.reason()};
+ }
+ } while (true);
+ MONGO_UNREACHABLE;
+}
+
bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* opCtx,
const BSONObj& doc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
@@ -317,8 +369,7 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
if (!info) {
// There is no shard version information on either 'opCtx' or 'client'. This means that
// the operation represented by 'opCtx' is unversioned, and the shard version is always
- // OK
- // for unversioned operations.
+ // OK for unversioned operations.
return true;
}
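
A minimal sketch of how a caller might combine cleanUpRange() and waitForClean() as introduced above. This is hypothetical usage, not part of the patch; it assumes the declarations added here plus an OperationContext and ChunkRange supplied by the caller.

    // Hypothetical helper (not in this patch): delete a donated range and block until done.
    Status deleteDonatedRange(OperationContext* opCtx, const NamespaceString& nss, ChunkRange range) {
        {
            // Schedule the cleanup while holding the collection lock, as cleanUpRange() requires.
            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
            CollectionShardingState::get(opCtx, nss)->cleanUpRange(range);
        }
        // Wait with the collection unlocked; waitForClean() re-acquires the lock as needed.
        return CollectionShardingState::waitForClean(opCtx, nss, std::move(range));
    }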
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5bbc2b9c576..af27a07cd14 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -34,7 +34,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/metadata_manager.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -90,6 +92,27 @@ public:
ScopedCollectionMetadata getMetadata();
/**
+ * Returns ranges being migrated in.
+ */
+ RangeMap const& getReceiveMap() const {
+ return _metadataManager.getReceiveMap();
+ }
+
+ /**
+ * Returns true if the specified key is in a range being received.
+ */
+ bool keyIsPending(const BSONObj& key) const {
+ return _metadataManager.keyIsPending(key);
+ }
+
+ /**
+ * BSON output of the pending metadata into a BSONArray
+ */
+ void toBSONPending(BSONArrayBuilder& bb) const {
+ _metadataManager.toBSONPending(bb);
+ }
+
+ /**
* Updates the metadata based on changes received from the config server and also resolves the
* pending receives map in case some of these pending receives have completed or have been
* abandoned. If newMetadata is null, unshard the collection.
@@ -105,20 +128,24 @@ public:
void markNotShardedAtStepdown();
/**
- * Modifies the collection's sharding state to indicate that it is beginning to receive the
- * given ChunkRange.
+     * If no running queries can depend on documents in the range, schedules them for immediate
+     * cleanup, adds the range to the pending list, and returns true. Otherwise, returns false.
*/
- void beginReceive(const ChunkRange& range);
+ bool beginReceive(ChunkRange const& range);
/*
- * Modifies the collection's sharding state to indicate that the previous pending migration
- * failed. If the range was not previously pending, this function will crash the server.
- *
- * This function is the mirror image of beginReceive.
+ * Removes the range from the list of pending ranges, and schedules any documents in the range
+ * for immediate cleanup.
*/
void forgetReceive(const ChunkRange& range);
/**
+ * Schedules documents in the range for cleanup after any running queries that may depend on
+ * them have terminated.
+ */
+ void cleanUpRange(ChunkRange const& range);
+
+ /**
* Returns the active migration source manager, if one is available.
*/
MigrationSourceManager* getMigrationSourceManager();
@@ -154,6 +181,28 @@ public:
*/
bool collectionIsSharded();
+ /**
+     * Waits for deletion of any documents within the range, returning when deletion is complete.
+ * Throws if the collection is dropped while it sleeps. Call this with the collection unlocked.
+ */
+ static Status waitForClean(OperationContext*, NamespaceString, ChunkRange);
+
+ using CleanupNotification = MetadataManager::CleanupNotification;
+ /**
+ * Reports whether any part of the argument range is still scheduled for deletion. If not,
+ * returns nullptr. Otherwise, returns a notification n such that n->get(opCtx) will wake when
+ * deletion of a range (possibly the one of interest) is completed. This should be called
+ * again after each wakeup until it returns nullptr, because there can be more than one range
+ * scheduled for deletion that overlaps its argument.
+ */
+ CleanupNotification trackCleanup(ChunkRange const& range);
+
+ /**
+     * Called from the range deletion executor, immediately deletes the next scheduled range of
+ * documents. Returns true if it should be scheduled to be called again.
+ */
+ bool cleanUpNextRange(OperationContext* opCtx, Collection* collection, int maxToDelete);
+
// Replication subsystem hooks. If this collection is serving as a source for migration, these
// methods inform it of any changes to its contents.
@@ -167,11 +216,6 @@ public:
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
- MetadataManager* getMetadataManagerForTest() {
- return &_metadataManager;
- }
-
-
private:
/**
* Checks whether the shard version of the operation matches that of the collection.
@@ -191,7 +235,7 @@ private:
ChunkVersion* expectedShardVersion,
ChunkVersion* actualShardVersion);
- // Namespace to which this state belongs.
+ // Namespace this state belongs to.
const NamespaceString _nss;
// Contains all the metadata associated with this collection.
@@ -203,8 +247,6 @@ private:
//
// NOTE: The value is not owned by this class.
MigrationSourceManager* _sourceMgr{nullptr};
-
- friend class CollectionRangeDeleter;
};
} // namespace mongo
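
The trackCleanup() comment above prescribes a re-check loop: after each wakeup, ask again until it returns nullptr, because more than one scheduled range may overlap the one of interest. A bare sketch of that loop follows (hypothetical caller; locking elided for brevity, see CollectionShardingState::waitForClean() in the .cpp for the locked version):

    // Wait until nothing overlapping 'range' is still scheduled for deletion.
    while (auto notification = CollectionShardingState::get(opCtx, nss)->trackCleanup(range)) {
        Status status = notification->get(opCtx);  // sleeps until some overlapping range completes
        if (!status.isOK()) {
            return status;  // a deletion failed; report it to the caller
        }
    }
    return Status::OK();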
diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp
index 86796a4ef50..3fafe5db13d 100644
--- a/src/mongo/db/s/get_shard_version_command.cpp
+++ b/src/mongo/db/s/get_shard_version_command.cpp
@@ -131,9 +131,11 @@ public:
metadata->toBSONChunks(chunksArr);
chunksArr.doneFast();
- BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
- metadata->toBSONPending(pendingArr);
- pendingArr.doneFast();
+ if (css) {
+ BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
+ css->toBSONPending(pendingArr);
+ pendingArr.doneFast();
+ }
}
metadataBuilder.doneFast();
}
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 98975244838..f03e7948374 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -133,7 +133,7 @@ Status mergeChunks(OperationContext* opCtx,
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
metadata = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
- if (!metadata) {
+ if (!metadata || metadata->getKeyPattern().isEmpty()) {
std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
<< " is not sharded";
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index faa062e9476..c04cbf0dab6 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -33,27 +33,38 @@
#include "mongo/db/s/metadata_manager.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/bson/util/builder.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_range_deleter.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
namespace mongo {
-using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+using TaskExecutor = executor::TaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
-MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss)
+MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss, TaskExecutor* executor)
: _nss(std::move(nss)),
_serviceContext(sc),
_activeMetadataTracker(stdx::make_unique<CollectionMetadataTracker>(nullptr)),
_receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()),
- _rangesToClean(
- SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<RangeToCleanDescriptor>()) {}
+ _notification(std::make_shared<Notification<Status>>()),
+ _executor(executor),
+ _rangesToClean() {}
MetadataManager::~MetadataManager() {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- invariant(!_activeMetadataTracker || _activeMetadataTracker->usageCounter == 0);
+ // wake everybody up to see us die
+ if (!*_notification) {
+ _notification->set(Status::OK());
+ }
+ _rangesToClean.clear();
}
ScopedCollectionMetadata MetadataManager::getActiveMetadata() {
@@ -61,10 +72,14 @@ ScopedCollectionMetadata MetadataManager::getActiveMetadata() {
if (!_activeMetadataTracker) {
return ScopedCollectionMetadata();
}
-
return ScopedCollectionMetadata(this, _activeMetadataTracker.get());
}
+size_t MetadataManager::numberOfMetadataSnapshots() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
+ return _metadataInUse.size();
+}
+
void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> remoteMetadata) {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
@@ -73,7 +88,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
// collection sharding information regardless of whether the node is sharded or not.
if (!remoteMetadata && !_activeMetadataTracker->metadata) {
invariant(_receivingChunks.empty());
- invariant(_rangesToClean.empty());
+ invariant(_rangesToClean.isEmpty());
return;
}
@@ -99,22 +114,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
<< remoteMetadata->toStringBasic();
invariant(_receivingChunks.empty());
- invariant(_rangesToClean.empty());
-
- _setActiveMetadata_inlock(std::move(remoteMetadata));
- return;
- }
-
- // If the metadata being installed has a different epoch from ours, this means the collection
- // was dropped and recreated, so we must entirely reset the metadata state
- if (_activeMetadataTracker->metadata->getCollVersion().epoch() !=
- remoteMetadata->getCollVersion().epoch()) {
- log() << "Overwriting metadata for collection " << _nss.ns() << " from "
- << _activeMetadataTracker->metadata->toStringBasic() << " to "
- << remoteMetadata->toStringBasic() << " due to epoch change";
-
- _receivingChunks.clear();
- _rangesToClean.clear();
+ invariant(_rangesToClean.isEmpty());
_setActiveMetadata_inlock(std::move(remoteMetadata));
return;
@@ -132,150 +132,66 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
<< _activeMetadataTracker->metadata->toStringBasic() << " to "
<< remoteMetadata->toStringBasic();
- // Resolve any receiving chunks, which might have completed by now
+ // Resolve any receiving chunks, which might have completed by now.
+ // Should be no more than one.
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) {
- const BSONObj min = it->first;
- const BSONObj max = it->second.getMaxKey();
-
- // Our pending range overlaps at least one chunk
- if (rangeMapContains(remoteMetadata->getChunks(), min, max)) {
- // The remote metadata contains a chunk we were earlier in the process of receiving, so
- // we deem it successfully received.
- LOG(2) << "Verified chunk " << redact(ChunkRange(min, max).toString())
- << " for collection " << _nss.ns() << " has been migrated to this shard earlier";
+ BSONObj const& min = it->first;
+ BSONObj const& max = it->second.getMaxKey();
- _receivingChunks.erase(it++);
- continue;
- } else if (!rangeMapOverlaps(remoteMetadata->getChunks(), min, max)) {
+ if (!remoteMetadata->rangeOverlapsChunk(ChunkRange(min, max))) {
++it;
continue;
}
+ // The remote metadata contains a chunk we were earlier in the process of receiving, so
+ // we deem it successfully received.
+ LOG(2) << "Verified chunk " << redact(ChunkRange(min, max).toString()) << " for collection "
+ << _nss.ns() << " has been migrated to this shard earlier";
- // Partial overlap indicates that the earlier migration has failed, but the chunk being
- // migrated underwent some splits and other migrations and ended up here again. In this
- // case, we will request full reload of the metadata. Currently this cannot happen, because
- // all migrations are with the explicit knowledge of the recipient shard. However, we leave
- // the option open so that chunk splits can do empty chunk move without having to notify the
- // recipient.
- RangeVector overlappedChunks;
- getRangeMapOverlap(remoteMetadata->getChunks(), min, max, &overlappedChunks);
-
- for (const auto& overlapChunkMin : overlappedChunks) {
- auto itRecv = _receivingChunks.find(overlapChunkMin.first);
- invariant(itRecv != _receivingChunks.end());
-
- const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey());
-
- _receivingChunks.erase(itRecv);
-
- // Make sure any potentially partially copied chunks are scheduled to be cleaned up
- _addRangeToClean_inlock(receivingRange);
- }
-
- // Need to reset the iterator
+ _receivingChunks.erase(it);
it = _receivingChunks.begin();
}
- // For compatibility with the current range deleter, which is driven entirely by the contents of
- // the CollectionMetadata update the pending chunks
- for (const auto& receivingChunk : _receivingChunks) {
- ChunkType chunk;
- chunk.setMin(receivingChunk.first);
- chunk.setMax(receivingChunk.second.getMaxKey());
- remoteMetadata = remoteMetadata->clonePlusPending(chunk);
- }
-
_setActiveMetadata_inlock(std::move(remoteMetadata));
}
-void MetadataManager::beginReceive(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-
- // Collection is not known to be sharded if the active metadata tracker is null
- invariant(_activeMetadataTracker);
-
- // If range is contained within pending chunks, this means a previous migration must have failed
- // and we need to clean all overlaps
- RangeVector overlappedChunks;
- getRangeMapOverlap(_receivingChunks, range.getMin(), range.getMax(), &overlappedChunks);
-
- for (const auto& overlapChunkMin : overlappedChunks) {
- auto itRecv = _receivingChunks.find(overlapChunkMin.first);
- invariant(itRecv != _receivingChunks.end());
-
- const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey());
-
- _receivingChunks.erase(itRecv);
-
- // Make sure any potentially partially copied chunks are scheduled to be cleaned up
- _addRangeToClean_inlock(receivingRange);
- }
-
- // Need to ensure that the background range deleter task won't delete the range we are about to
- // receive
- _removeRangeToClean_inlock(range, Status::OK());
- _receivingChunks.insert(
- std::make_pair(range.getMin().getOwned(),
- CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
-
- // For compatibility with the current range deleter, update the pending chunks on the collection
- // metadata to include the chunk being received
- ChunkType chunk;
- chunk.setMin(range.getMin());
- chunk.setMax(range.getMax());
- _setActiveMetadata_inlock(_activeMetadataTracker->metadata->clonePlusPending(chunk));
-}
-
-void MetadataManager::forgetReceive(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-
- {
- auto it = _receivingChunks.find(range.getMin());
- invariant(it != _receivingChunks.end());
-
- // Verify entire ChunkRange is identical, not just the min key.
- invariant(
- SimpleBSONObjComparator::kInstance.evaluate(it->second.getMaxKey() == range.getMax()));
-
- _receivingChunks.erase(it);
- }
-
- // This is potentially a partially received data, which needs to be cleaned up
- _addRangeToClean_inlock(range);
-
- // For compatibility with the current range deleter, update the pending chunks on the collection
- // metadata to exclude the chunk being received, which was added in beginReceive
- ChunkType chunk;
- chunk.setMin(range.getMin());
- chunk.setMax(range.getMax());
- _setActiveMetadata_inlock(_activeMetadataTracker->metadata->cloneMinusPending(chunk));
-}
-
-RangeMap MetadataManager::getCopyOfReceivingChunks() {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return _receivingChunks;
-}
-
void MetadataManager::_setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata) {
- if (_activeMetadataTracker->usageCounter > 0) {
- _metadataInUse.push_front(std::move(_activeMetadataTracker));
+ if (_activeMetadataTracker->usageCounter != 0 || !_metadataInUse.empty()) {
+ _metadataInUse.push_back(std::move(_activeMetadataTracker));
}
-
_activeMetadataTracker = stdx::make_unique<CollectionMetadataTracker>(std::move(newMetadata));
}
-void MetadataManager::_removeMetadata_inlock(CollectionMetadataTracker* metadataTracker) {
- invariant(metadataTracker->usageCounter == 0);
-
- auto i = _metadataInUse.begin();
- const auto e = _metadataInUse.end();
- while (i != e) {
- if (metadataTracker == i->get()) {
- _metadataInUse.erase(i);
- return;
+// This is called only from ScopedCollectionMetadata members, unlocked.
+void MetadataManager::_decrementTrackerUsage(ScopedCollectionMetadata const& scoped) {
+ if (scoped._tracker != nullptr) {
+ stdx::lock_guard<stdx::mutex> lock(_managerLock);
+
+ invariant(scoped._tracker->usageCounter != 0);
+ if (--scoped._tracker->usageCounter == 0) {
+
+ // We don't care which usageCounter went to zero. We just expire all that are older
+ // than the oldest tracker still in use by queries. (Some start out at zero, some go to
+ // zero but can't be expired yet.)
+
+ bool notify = false;
+ while (!_metadataInUse.empty() && _metadataInUse.front()->usageCounter == 0) {
+ auto& tracker = _metadataInUse.front();
+ if (tracker->orphans) {
+ notify = true;
+ _pushRangeToClean(*tracker->orphans);
+ }
+ _metadataInUse.pop_front(); // Discard the tracker and its metadata.
+ }
+ if (_metadataInUse.empty() && _activeMetadataTracker->usageCounter == 0 &&
+ _activeMetadataTracker->orphans) {
+ _pushRangeToClean(*_activeMetadataTracker->orphans);
+ _activeMetadataTracker->orphans = boost::none;
+ notify = true;
+ }
+ if (notify) {
+ _notifyInUse(); // wake up waitForClean because we changed inUse
+ }
}
-
- ++i;
}
}
@@ -285,17 +201,18 @@ MetadataManager::CollectionMetadataTracker::CollectionMetadataTracker(
ScopedCollectionMetadata::ScopedCollectionMetadata() = default;
-// called in lock
+// called locked
ScopedCollectionMetadata::ScopedCollectionMetadata(
MetadataManager* manager, MetadataManager::CollectionMetadataTracker* tracker)
: _manager(manager), _tracker(tracker) {
- _tracker->usageCounter++;
+ ++_tracker->usageCounter;
}
+// do not call locked
ScopedCollectionMetadata::~ScopedCollectionMetadata() {
- if (!_tracker)
- return;
- _decrementUsageCounter();
+ if (_manager) {
+ _manager->_decrementTrackerUsage(*this);
+ }
}
CollectionMetadata* ScopedCollectionMetadata::operator->() const {
@@ -306,126 +223,44 @@ CollectionMetadata* ScopedCollectionMetadata::getMetadata() const {
return _tracker->metadata.get();
}
+// ScopedCollectionMetadata members
+
+// do not call locked
ScopedCollectionMetadata::ScopedCollectionMetadata(ScopedCollectionMetadata&& other) {
- *this = std::move(other);
+ *this = std::move(other); // Rely on _tracker being zero-initialized already.
}
+// do not call locked
ScopedCollectionMetadata& ScopedCollectionMetadata::operator=(ScopedCollectionMetadata&& other) {
if (this != &other) {
- // If "this" was previously initialized, make sure we perform the same logic as in the
- // destructor to decrement _tracker->usageCounter for the CollectionMetadata "this" had a
- // reference to before replacing _tracker with other._tracker.
- if (_tracker) {
- _decrementUsageCounter();
+ if (_manager) {
+ _manager->_decrementTrackerUsage(*this);
}
-
_manager = other._manager;
_tracker = other._tracker;
other._manager = nullptr;
other._tracker = nullptr;
}
-
return *this;
}
-void ScopedCollectionMetadata::_decrementUsageCounter() {
- invariant(_manager);
- invariant(_tracker);
- stdx::lock_guard<stdx::mutex> scopedLock(_manager->_managerLock);
- invariant(_tracker->usageCounter > 0);
- if (--_tracker->usageCounter == 0) {
- _manager->_removeMetadata_inlock(_tracker);
- }
-}
-
ScopedCollectionMetadata::operator bool() const {
return _tracker && _tracker->metadata.get();
}
-RangeMap MetadataManager::getCopyOfRangesToClean() {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return _getCopyOfRangesToClean_inlock();
-}
-
-RangeMap MetadataManager::_getCopyOfRangesToClean_inlock() {
- RangeMap ranges = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>();
- for (auto it = _rangesToClean.begin(); it != _rangesToClean.end(); ++it) {
- ranges.insert(std::make_pair(
- it->first, CachedChunkInfo(it->second.getMax(), ChunkVersion::IGNORED())));
- }
- return ranges;
-}
-
-std::shared_ptr<Notification<Status>> MetadataManager::addRangeToClean(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return _addRangeToClean_inlock(range);
-}
-
-std::shared_ptr<Notification<Status>> MetadataManager::_addRangeToClean_inlock(
- const ChunkRange& range) {
- // This first invariant currently makes an unnecessary copy, to reuse the
- // rangeMapOverlaps helper function.
- invariant(!rangeMapOverlaps(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax()));
- invariant(!rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax()));
-
- RangeToCleanDescriptor descriptor(range.getMax().getOwned());
- _rangesToClean.insert(std::make_pair(range.getMin().getOwned(), descriptor));
-
- // If _rangesToClean was previously empty, we need to start the collection range deleter
- if (_rangesToClean.size() == 1UL) {
- ShardingState::get(_serviceContext)->scheduleCleanup(_nss);
- }
-
- return descriptor.getNotification();
-}
-
-void MetadataManager::removeRangeToClean(const ChunkRange& range, Status deletionStatus) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- _removeRangeToClean_inlock(range, deletionStatus);
-}
-
-void MetadataManager::_removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus) {
- auto it = _rangesToClean.upper_bound(range.getMin());
- // We want our iterator to point at the greatest value
- // that is still less than or equal to range.
- if (it != _rangesToClean.begin()) {
- --it;
- }
-
- for (; it != _rangesToClean.end() &&
- SimpleBSONObjComparator::kInstance.evaluate(it->first < range.getMax());) {
- if (SimpleBSONObjComparator::kInstance.evaluate(it->second.getMax() <= range.getMin())) {
- ++it;
- continue;
- }
-
- // There's overlap between *it and range so we remove *it
- // and then replace with new ranges.
- BSONObj oldMin = it->first;
- BSONObj oldMax = it->second.getMax();
- it->second.complete(deletionStatus);
- _rangesToClean.erase(it++);
- if (SimpleBSONObjComparator::kInstance.evaluate(oldMin < range.getMin())) {
- _addRangeToClean_inlock(ChunkRange(oldMin, range.getMin()));
- }
-
- if (SimpleBSONObjComparator::kInstance.evaluate(oldMax > range.getMax())) {
- _addRangeToClean_inlock(ChunkRange(range.getMax(), oldMax));
- }
+void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
+ for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) {
+ BSONArrayBuilder pendingBB(bb.subarrayStart());
+ pendingBB.append(it->first);
+ pendingBB.append(it->second.getMaxKey());
+ pendingBB.done();
}
}
void MetadataManager::append(BSONObjBuilder* builder) {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- BSONArrayBuilder rtcArr(builder->subarrayStart("rangesToClean"));
- for (const auto& entry : _rangesToClean) {
- BSONObjBuilder obj;
- ChunkRange r = ChunkRange(entry.first, entry.second.getMax());
- r.append(&obj);
- rtcArr.append(obj.done());
- }
- rtcArr.done();
+ _rangesToClean.append(builder);
BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
for (const auto& entry : _receivingChunks) {
@@ -436,6 +271,7 @@ void MetadataManager::append(BSONObjBuilder* builder) {
}
pcArr.done();
+
BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges"));
for (const auto& entry : _activeMetadataTracker->metadata->getChunks()) {
BSONObjBuilder obj;
@@ -446,23 +282,158 @@ void MetadataManager::append(BSONObjBuilder* builder) {
amrArr.done();
}
-bool MetadataManager::hasRangesToClean() {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return !_rangesToClean.empty();
+void MetadataManager::_scheduleCleanup(executor::TaskExecutor* executor, NamespaceString nss) {
+ executor->scheduleWork([executor, nss](auto&) {
+ const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
+ Client::initThreadIfNotAlready("Collection Range Deleter");
+ auto UniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ auto opCtx = UniqueOpCtx.get();
+ bool again = false;
+ {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ auto* collection = autoColl.getCollection();
+ if (!collection) {
+ return; // collection was dropped
+ }
+ auto* css = CollectionShardingState::get(opCtx, nss);
+ again = css->cleanUpNextRange(opCtx, collection, maxToDelete);
+ }
+ if (again) {
+ _scheduleCleanup(executor, nss);
+ }
+ });
+}
+
+// The call to css->cleanUpNextRange() in _scheduleCleanup(), above, ends up back here; this just
+// forwards the request to the range deleter.
+bool MetadataManager::cleanUpNextRange(OperationContext* opCtx,
+ Collection* collection,
+ BSONObj const& keyPattern,
+ int maxToDelete) {
+ return _rangesToClean.cleanUpNextRange(
+ opCtx, collection, keyPattern, &_managerLock, maxToDelete);
}
-bool MetadataManager::isInRangesToClean(const ChunkRange& range) {
+// call locked
+void MetadataManager::_pushRangeToClean(ChunkRange const& range) {
+ _rangesToClean.add(range);
+ if (_rangesToClean.size() == 1) {
+ _scheduleCleanup(_executor, _nss);
+ }
+}
+
+void MetadataManager::_addToReceiving(ChunkRange const& range) {
+ _receivingChunks.insert(
+ std::make_pair(range.getMin().getOwned(),
+ CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
+}
+
+bool MetadataManager::beginReceive(ChunkRange const& range) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+
+ invariant(bool(_activeMetadataTracker));
+ CollectionMetadataTracker* tracker = _activeMetadataTracker.get();
+ if (_overlapsInUseChunk(range) ||
+ (tracker->usageCounter != 0 && tracker->metadata->rangeOverlapsChunk(range))) {
+ // a potentially long-running query might depend on documents in the range, give up.
+ return false;
+ }
+ _addToReceiving(range);
+ _pushRangeToClean(range);
+ return true;
+}
+
+void MetadataManager::_removeFromReceiving(ChunkRange const& range) {
+ auto it = _receivingChunks.find(range.getMin());
+ invariant(it != _receivingChunks.end());
+ _receivingChunks.erase(it);
+}
+
+void MetadataManager::forgetReceive(const ChunkRange& range) {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- // For convenience, this line makes an unnecessary copy, to reuse the
- // rangeMapContains helper function.
- return rangeMapContains(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax());
+ // This is potentially a partially received chunk, which needs to be cleaned up. We know none
+ // of these documents are in use, so they can go straight to the deletion queue.
+ _removeFromReceiving(range);
+ _pushRangeToClean(range);
+}
+
+void MetadataManager::cleanUpRange(ChunkRange const& range) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ invariant(bool(_activeMetadataTracker));
+ CollectionMetadataTracker* tracker = _activeMetadataTracker.get();
+
+ if ((tracker->usageCounter == 0 || !tracker->metadata->rangeOverlapsChunk(range)) &&
+ !_overlapsInUseChunk(range)) {
+ // No running queries can depend on it, so queue it for deletion immediately.
+ _pushRangeToClean(range);
+ } else {
+ invariant(!tracker->orphans);
+ tracker->orphans.emplace(range.getMin().getOwned(), range.getMax().getOwned());
+ }
+}
+
+bool MetadataManager::keyIsPending(const BSONObj& key) const {
+ if (_receivingChunks.empty()) {
+ return false;
+ }
+ auto it = _receivingChunks.upper_bound(key);
+ if (it != _receivingChunks.begin())
+ it--;
+ return rangeContains(it->first, it->second.getMaxKey(), key);
}
-ChunkRange MetadataManager::getNextRangeToClean() {
+
+size_t MetadataManager::numberOfRangesToCleanStillInUse() {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- invariant(!_rangesToClean.empty());
- auto it = _rangesToClean.begin();
- return ChunkRange(it->first, it->second.getMax());
+ size_t count = _activeMetadataTracker->orphans ? 1 : 0;
+ count += std::count_if(_metadataInUse.begin(), _metadataInUse.end(), [](auto& tracker) {
+ return bool(tracker->orphans);
+ });
+ return count;
+}
+
+size_t MetadataManager::numberOfRangesToClean() {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ return _rangesToClean.size();
+}
+
+MetadataManager::CleanupNotification MetadataManager::trackCleanup(ChunkRange const& range) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ if (_overlapsInUseCleanups(range))
+ return _notification;
+ return _rangesToClean.overlaps(range);
+}
+
+// call locked
+bool MetadataManager::_overlapsInUseChunk(ChunkRange const& range) {
+ if (_activeMetadataTracker->usageCounter != 0 &&
+ _activeMetadataTracker->metadata->rangeOverlapsChunk(range)) {
+ return true;
+ }
+ for (auto& tracker : _metadataInUse) {
+ if (tracker->usageCounter != 0 && tracker->metadata->rangeOverlapsChunk(range)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// call locked
+bool MetadataManager::_overlapsInUseCleanups(ChunkRange const& range) {
+ if (_activeMetadataTracker->orphans && _activeMetadataTracker->orphans->overlapWith(range)) {
+ return true;
+ }
+ for (auto& tracker : _metadataInUse) {
+ if (tracker->orphans && bool(tracker->orphans->overlapWith(range))) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// call locked
+void MetadataManager::_notifyInUse() {
+ _notification->set(Status::OK()); // wake up waitForClean
+ _notification = std::make_shared<Notification<Status>>();
}
} // namespace mongo
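
The _notification member and _notifyInUse() above implement a "fulfill and replace" wakeup: waiters keep a copy of the current shared notification, and each change to the in-use set fires it and installs a fresh one, so later waiters block on the new instance. The following is a self-contained illustration of that idea using only the standard library (deliberately not MongoDB's Notification<Status> type):

    #include <future>
    #include <mutex>

    // Minimal stand-in for the _notification / _notifyInUse() pattern.
    class ChangeSignal {
    public:
        ChangeSignal() {
            _reset();
        }

        // A waiter grabs the current future and blocks on it; it becomes ready at the next notify().
        std::shared_future<void> onNextChange() {
            std::lock_guard<std::mutex> lk(_mutex);
            return _future;
        }

        // Wake everyone holding the current future, then install a fresh one for later waiters.
        void notify() {
            std::lock_guard<std::mutex> lk(_mutex);
            _promise.set_value();
            _reset();
        }

    private:
        void _reset() {
            _promise = std::promise<void>();
            _future = _promise.get_future().share();
        }

        std::mutex _mutex;
        std::promise<void> _promise;
        std::shared_future<void> _future;
    };

    // Usage sketch: while (stillScheduled(range)) { changeSignal.onNextChange().wait(); }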
diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h
index 5c2e7f2e64a..2311433aa49 100644
--- a/src/mongo/db/s/metadata_manager.h
+++ b/src/mongo/db/s/metadata_manager.h
@@ -28,19 +28,21 @@
#pragma once
-#include <list>
-#include <memory>
-
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/service_context.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/stdx/memory.h"
+#include <list>
+
namespace mongo {
class ScopedCollectionMetadata;
@@ -49,7 +51,7 @@ class MetadataManager {
MONGO_DISALLOW_COPYING(MetadataManager);
public:
- MetadataManager(ServiceContext* sc, NamespaceString nss);
+ MetadataManager(ServiceContext*, NamespaceString nss, executor::TaskExecutor* rangeDeleter);
~MetadataManager();
/**
@@ -62,144 +64,170 @@ public:
ScopedCollectionMetadata getActiveMetadata();
/**
- * Uses the contents of the specified metadata as a way to purge any pending chunks.
+ * Returns the number of CollectionMetadata objects being maintained on behalf of running
+ * queries. The actual number may vary after it returns, so this is really only useful for unit
+ * tests.
*/
- void refreshActiveMetadata(std::unique_ptr<CollectionMetadata> newMetadata);
+ size_t numberOfMetadataSnapshots();
/**
- * Puts the specified range on the list of chunks, which are being received so that the range
- * deleter process will not clean the partially migrated data.
+ * Uses the contents of the specified metadata as a way to purge any pending chunks.
*/
- void beginReceive(const ChunkRange& range);
+ void refreshActiveMetadata(std::unique_ptr<CollectionMetadata> newMetadata);
+
+ void toBSONPending(BSONArrayBuilder& bb) const;
/**
- * Removes a range from the list of chunks, which are being received. Used externally to
- * indicate that a chunk migration failed.
+ * Appends information on all the chunk ranges in rangesToClean to builder.
*/
- void forgetReceive(const ChunkRange& range);
+ void append(BSONObjBuilder* builder);
/**
- * Gets copy of the set of chunk ranges which are being received for this collection. This
- * method is intended for testing purposes only and should not be used in any production code.
+ * Returns a map to the set of chunks being migrated in.
*/
- RangeMap getCopyOfReceivingChunks();
+ RangeMap const& getReceiveMap() const {
+ return _receivingChunks;
+ }
/**
- * Adds a new range to be cleaned up.
- * The newly introduced range must not overlap with the existing ranges.
- */
- std::shared_ptr<Notification<Status>> addRangeToClean(const ChunkRange& range);
+     * If no running queries can depend on documents in the range, schedules any such documents for
+     * immediate cleanup, records the range as pending, and returns true. Otherwise, returns false.
+ */
+ bool beginReceive(ChunkRange const& range);
/**
- * Calls removeRangeToClean with Status::OK.
+ * Removes the range from the pending list, and schedules any documents in the range for
+ * immediate cleanup. Assumes no active queries can see any local documents in the range.
*/
- void removeRangeToClean(const ChunkRange& range) {
- removeRangeToClean(range, Status::OK());
- }
+ void forgetReceive(const ChunkRange& range);
/**
- * Removes the specified range from the ranges to be cleaned up.
- * The specified deletionStatus will be returned to callers waiting
- * on whether the deletion succeeded or failed.
+ * Initiates cleanup of the orphaned documents as if a chunk has been migrated out. If any
+ * documents in the range might still be in use by running queries, queues cleanup to begin
+ * after they have all terminated. Otherwise, schedules documents for immediate cleanup.
+ *
+ * Must be called with the collection locked for writing. To monitor completion, use
+ * trackCleanup or CollectionShardingState::waitForClean.
*/
- void removeRangeToClean(const ChunkRange& range, Status deletionStatus);
+ void cleanUpRange(ChunkRange const& range);
/**
- * Gets copy of the set of chunk ranges which are scheduled for cleanup.
- * Converts RangeToCleanMap to RangeMap.
+ * Returns true if the specified key is in a range being received.
*/
- RangeMap getCopyOfRangesToClean();
+ bool keyIsPending(const BSONObj& key) const;
/**
- * Appends information on all the chunk ranges in rangesToClean to builder.
+ * Returns the number of ranges scheduled to be cleaned, exclusive of such ranges that might
+ * still be in use by running queries. Outside of test drivers, the actual number may vary
+ * after it returns, so this is really only useful for unit tests.
*/
- void append(BSONObjBuilder* builder);
+ size_t numberOfRangesToClean();
/**
- * Returns true if _rangesToClean is not empty.
+ * Returns the number of ranges scheduled to be cleaned once all queries that could depend on
+ * them have terminated. The actual number may vary after it returns, so this is really only
+ * useful for unit tests.
*/
- bool hasRangesToClean();
+ size_t numberOfRangesToCleanStillInUse();
+ using CleanupNotification = CollectionRangeDeleter::DeleteNotification;
/**
- * Returns true if the exact range is in _rangesToClean.
+ * Reports whether the argument range is still scheduled for deletion. If not, returns nullptr.
+ * Otherwise, returns a notification n such that n->get(opCtx) will wake when deletion of a
+ * range (possibly the one of interest) is completed.
*/
- bool isInRangesToClean(const ChunkRange& range);
+ CleanupNotification trackCleanup(ChunkRange const& orphans);
/**
- * Gets and returns, but does not remove, a single ChunkRange from _rangesToClean.
- * Should not be called if _rangesToClean is empty: it will hit an invariant.
+     * Called from the range deletion executor, actually deletes a scheduled range of documents
+ * immediately. Returns true if it should be scheduled to be called again.
*/
- ChunkRange getNextRangeToClean();
+ bool cleanUpNextRange(OperationContext* opCtx,
+ Collection* collection,
+ BSONObj const& keyPattern,
+ int maxToDelete);
private:
- friend class ScopedCollectionMetadata;
-
struct CollectionMetadataTracker {
- public:
/**
- * Creates a new CollectionMetadataTracker, with the usageCounter initialized to zero.
+ * Creates a new CollectionMetadataTracker with the usageCounter initialized to zero.
*/
CollectionMetadataTracker(std::unique_ptr<CollectionMetadata> m);
+ private:
std::unique_ptr<CollectionMetadata> metadata;
-
uint32_t usageCounter{0};
+ boost::optional<ChunkRange> orphans{boost::none};
+
+ friend class ScopedCollectionMetadata; // for access to usageCounter:
+ friend class MetadataManager;
};
- // Class for the value of the _rangesToClean map. Used because callers of addRangeToClean
- // sometimes need to wait until a range is deleted. Thus, complete(Status) is called
- // when the range is deleted from _rangesToClean in removeRangeToClean(), letting callers
- // of addRangeToClean know if the deletion succeeded or failed.
- class RangeToCleanDescriptor {
- public:
- /**
- * Initializes a RangeToCleanDescriptor with an empty notification.
- */
- RangeToCleanDescriptor(BSONObj max)
- : _max(max.getOwned()), _notification(std::make_shared<Notification<Status>>()) {}
+ /**
+     * Atomically decrements scoped.tracker->usageCounter under our own mutex. At zero, retires any
+ * metadata that has fallen out of use, and pushes any orphan ranges found in them to the list
+ * of ranges actively being cleaned up. Calling with a null scoped.tracker is a no-op.
+ */
+ void _decrementTrackerUsage(ScopedCollectionMetadata const& scoped);
- /**
- * Gets the maximum value of the range to be deleted.
- */
- const BSONObj& getMax() const {
- return _max;
- }
+ /**
+ * Pushes current set of chunks, if any, to _metadataInUse, replaces it with newMetadata.
+ */
+ void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);
- // See comment on _notification.
- std::shared_ptr<Notification<Status>> getNotification() {
- return _notification;
- }
+ /**
+ * Returns true if the specified range overlaps any chunk that might be currently in use by a
+ * running query.
+ *
+     * Must be called locked.
+ */
- /**
- * Sets the status on _notification. This will tell threads
- * waiting on the value of status that the deletion succeeded or failed.
- */
- void complete(Status status) {
- _notification->set(status);
- }
+ bool _overlapsInUseChunk(ChunkRange const& range);
- private:
- // The maximum value of the range to be deleted.
- BSONObj _max;
+ /**
+ * Returns true if any range (possibly) still in use, but scheduled for cleanup, overlaps
+ * the argument range.
+ *
+ * Must be called locked.
+ */
+ bool _overlapsInUseCleanups(ChunkRange const& range);
- // This _notification will be set with a value indicating whether the deletion
- // succeeded or failed.
- std::shared_ptr<Notification<Status>> _notification;
- };
+ /**
+     * Deletes ranges in the background until done, normally using a task executor attached to the
+ * ShardingState.
+ *
+ * Each time it completes cleaning up a range, it wakes up clients waiting on completion of
+ * that range, which may then verify their range has no more deletions scheduled, and proceed.
+ */
+ static void _scheduleCleanup(executor::TaskExecutor*, NamespaceString nss);
/**
- * Removes the CollectionMetadata stored in the tracker from the _metadataInUse
- * list (if it's there).
+     * Adds the range to the list of ranges scheduled for immediate deletion, and schedules a
+     * background task to perform the work.
+ *
+ * Must be called locked.
*/
- void _removeMetadata_inlock(CollectionMetadataTracker* metadataTracker);
+ void _pushRangeToClean(ChunkRange const& range);
- std::shared_ptr<Notification<Status>> _addRangeToClean_inlock(const ChunkRange& range);
+ /**
+     * Adds a range to the receiving map, so getNextOrphanRange will skip ranges migrating in.
+ */
+ void _addToReceiving(ChunkRange const& range);
- void _removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus);
+ /**
+ * Removes a range from the receiving map after a migration failure. range.minKey() must
+ * exactly match an element of _receivingChunks.
+ */
+ void _removeFromReceiving(ChunkRange const& range);
- RangeMap _getCopyOfRangesToClean_inlock();
+ /**
+     * Wakes up any clients waiting on a range to leave _metadataInUse.
+ *
+ * Must be called locked.
+ */
+ void _notifyInUse();
- void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);
+ // data members
const NamespaceString _nss;
@@ -209,20 +237,28 @@ private:
// Mutex to protect the state below
stdx::mutex _managerLock;
- // Holds the collection metadata, which is currently active
+ // The currently active collection metadata
std::unique_ptr<CollectionMetadataTracker> _activeMetadataTracker;
- // Holds collection metadata instances, which have previously been active, but are still in use
- // by still active server operations or cursors
+ // Previously active collection metadata instances still in use by active server operations or
+ // cursors
std::list<std::unique_ptr<CollectionMetadataTracker>> _metadataInUse;
// Chunk ranges which are currently assumed to be transferred to the shard. Indexed by the min
// key of the range.
RangeMap _receivingChunks;
- // Set of ranges to be deleted. Indexed by the min key of the range.
- typedef BSONObjIndexedMap<RangeToCleanDescriptor> RangeToCleanMap;
- RangeToCleanMap _rangesToClean;
+ // Clients can sleep on copies of _notification while waiting for their orphan ranges to fall
+ // out of use.
+ std::shared_ptr<Notification<Status>> _notification;
+
+ // The background task that deletes documents from orphaned chunk ranges.
+ executor::TaskExecutor* _executor;
+
+ // Ranges being deleted, or scheduled to be deleted, by a background task
+ CollectionRangeDeleter _rangesToClean;
+
+ friend class ScopedCollectionMetadata; // Its op=() and dtor call _decrementTrackerUsage().
};
class ScopedCollectionMetadata {
@@ -235,13 +271,15 @@ public:
*/
ScopedCollectionMetadata();
+ // Must be called with owning MetadataManager unlocked
~ScopedCollectionMetadata();
+    // Must be called with owning MetadataManager unlocked
ScopedCollectionMetadata(ScopedCollectionMetadata&& other);
ScopedCollectionMetadata& operator=(ScopedCollectionMetadata&& other);
/**
- * Dereferencing the ScopedCollectionMetadata will dereference the internal CollectionMetadata.
+ * Dereferencing the ScopedCollectionMetadata dereferences the internal CollectionMetadata.
*/
CollectionMetadata* operator->() const;
CollectionMetadata* getMetadata() const;
@@ -252,22 +290,19 @@ public:
operator bool() const;
private:
- friend ScopedCollectionMetadata MetadataManager::getActiveMetadata();
-
/**
- * Increments the counter in the CollectionMetadataTracker.
+ * Increments the refcount in the specified tracker.
+ *
+ * Must be called with specified *manager locked.
*/
ScopedCollectionMetadata(MetadataManager* manager,
MetadataManager::CollectionMetadataTracker* tracker);
- /**
- * Decrements the usageCounter and conditionally makes a call to _removeMetadata on
- * the tracker if the count has reached zero.
- */
- void _decrementUsageCounter();
-
MetadataManager* _manager{nullptr};
MetadataManager::CollectionMetadataTracker* _tracker{nullptr};
+
+ friend ScopedCollectionMetadata MetadataManager::getActiveMetadata(); // uses our private ctor
+ friend void MetadataManager::_decrementTrackerUsage(ScopedCollectionMetadata const&);
};
} // namespace mongo
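
ScopedCollectionMetadata is the RAII handle that keeps one metadata snapshot alive for the duration of an operation. A hypothetical read path (assuming the declarations above; 'manager' is an existing MetadataManager and 'shardKey' a stand-in for a key extracted from the query) would hold it like this, relying on the destructor, which runs with the manager unlocked, to retire superseded snapshots and queue their orphan ranges:

    {
        // Pins the active snapshot: bumps the tracker's usageCounter under the manager lock.
        ScopedCollectionMetadata metadata = manager.getActiveMetadata();
        if (!metadata) {
            return;  // collection is not sharded
        }
        if (metadata->keyBelongsToMe(shardKey)) {
            // ... answer the query against this consistent view of chunk ownership ...
        }
    }  // ~ScopedCollectionMetadata: decrements usageCounter; if an older snapshot just fell out of
       // use, its recorded orphan range is pushed to the range deleter and waiters are notified.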
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index d4416cbfbea..20cdb8e052d 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -31,29 +31,74 @@
#include "mongo/db/s/metadata_manager.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/catalog/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
+
+#include <boost/optional.hpp>
+
namespace mongo {
namespace {
using unittest::assertGet;
-class MetadataManagerTest : public ServiceContextMongoDTest {
+const NamespaceString kNss("TestDB", "TestColl");
+const std::string kPattern = "X";
+const BSONObj kShardKeyPattern{BSON(kPattern << 1)};
+const std::string kShardName{"a"};
+const HostAndPort dummyHost("dummy", 123);
+
+class MetadataManagerTest : public ShardingMongodTestFixture {
+public:
+ std::shared_ptr<RemoteCommandTargeterMock> configTargeter() {
+ return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
+ }
+
protected:
void setUp() override {
- ServiceContextMongoDTest::setUp();
- ShardingState::get(getServiceContext())
- ->setScheduleCleanupFunctionForTest([](const NamespaceString& nss) {});
+ ShardingMongodTestFixture::setUp();
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost));
+
+ configTargeter()->setFindHostReturnValue(dummyHost);
+ }
+
+ std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override {
+ invariant(shardRegistry);
+ return stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+ }
+
+ std::unique_ptr<DistLockManager> makeDistLockManager(
+ std::unique_ptr<DistLockCatalog> distLockCatalog) override {
+ return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog));
+ }
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager));
}
static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
@@ -92,10 +137,23 @@ protected:
return stdx::make_unique<CollectionMetadata>(
metadata.getKeyPattern(), chunkVersion, chunkVersion, std::move(chunksMap));
}
+
+ CollectionMetadata* addChunk(MetadataManager* manager) {
+ ScopedCollectionMetadata scopedMetadata1 = manager->getActiveMetadata();
+
+ ChunkVersion newVersion = scopedMetadata1->getCollVersion();
+ newVersion.incMajor();
+ std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk(
+ *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 20), newVersion);
+ auto cm2Ptr = cm2.get();
+
+ manager->refreshActiveMetadata(std::move(cm2));
+ return cm2Ptr;
+ }
};
TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
std::unique_ptr<CollectionMetadata> cm = makeEmptyMetadata();
auto cmPtr = cm.get();
@@ -107,197 +165,74 @@ TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {
TEST_F(MetadataManagerTest, ResetActiveMetadata) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
-
- ScopedCollectionMetadata scopedMetadata1 = manager.getActiveMetadata();
-
- ChunkVersion newVersion = scopedMetadata1->getCollVersion();
- newVersion.incMajor();
- std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk(
- *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 10), newVersion);
- auto cm2Ptr = cm2.get();
-
- manager.refreshActiveMetadata(std::move(cm2));
+ auto cm2Ptr = addChunk(&manager);
ScopedCollectionMetadata scopedMetadata2 = manager.getActiveMetadata();
-
ASSERT_EQ(cm2Ptr, scopedMetadata2.getMetadata());
};
-TEST_F(MetadataManagerTest, AddAndRemoveRangesToClean) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
-
- manager.addRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-
- manager.addRangeToClean(cr1);
- manager.addRangeToClean(cr2);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(cr2.getMin());
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ASSERT_EQ(remainingChunk.toString(), cr2.toString());
- manager.removeRangeToClean(cr2);
-}
-
-// Tests that a removal in the middle of an existing ChunkRange results in
-// two correct chunk ranges.
-TEST_F(MetadataManagerTest, RemoveRangeInMiddleOfRange) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
+// In the following tests, the ranges-to-clean list is not drained by the background deleter thread
+// because the collection involved has no CollectionShardingState, so the task just returns without
+// doing anything.
- manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
+TEST_F(MetadataManagerTest, CleanUpForMigrateIn) {
+ MetadataManager manager(getServiceContext(), kNss, executor());
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(BSON("key" << 0));
- ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 4));
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- it++;
- expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-// Tests removals that overlap with just one ChunkRange.
-TEST_F(MetadataManagerTest, RemoveRangeWithSingleRangeOverlap) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-
- manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 5)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(BSON("key" << 5));
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ChunkRange expectedChunk = ChunkRange(BSON("key" << 5), BSON("key" << 10));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- ranges = manager.getCopyOfRangesToClean();
- it = ranges.find(BSON("key" << 6));
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 9), BSON("key" << 13)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- ranges = manager.getCopyOfRangesToClean();
- it = ranges.find(BSON("key" << 6));
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 9));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
+ ChunkRange range2(BSON("key" << 10), BSON("key" << 20));
+ manager.cleanUpForMigrateIn(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
+ manager.cleanUpForMigrateIn(ChunkRange(BSON("key" << 10), BSON("key" << 20)));
+ ASSERT_EQ(manager.numberOfRangesToClean(), 2UL);
+ ASSERT_EQ(manager.numberOfRangesToCleanStillInUse(), 0UL);
}
-// Tests removals that overlap with more than one ChunkRange.
-TEST_F(MetadataManagerTest, RemoveRangeWithMultipleRangeOverlaps) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
- ChunkRange cr3 = ChunkRange(BSON("key" << 20), BSON("key" << 30));
-
- manager.addRangeToClean(cr1);
- manager.addRangeToClean(cr2);
- manager.addRangeToClean(cr3);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 3UL);
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 8), BSON("key" << 22)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(BSON("key" << 0));
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 8));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
- it++;
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- expectedChunk = ChunkRange(BSON("key" << 22), BSON("key" << 30));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 30)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, AddAndRemoveRangeNotificationsBlockAndYield) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, AddRangeNotificationsBlockAndYield) {
+ MetadataManager manager(getServiceContext(), kNss, executor());
ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(cr1, Status::OK());
- ASSERT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, RemoveRangeToCleanCorrectlySetsBadStatus) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
-
- ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(cr1, Status(ErrorCodes::InternalError, "test error"));
- ASSERT_NOT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, RemovingSubrangeStillSetsNotificationStatus) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
-
- ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 3), BSON("key" << 7)));
- ASSERT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-
- notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 7), BSON("key" << 15)));
- ASSERT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
+ manager.cleanUpAfterMigrateOut(cr1);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+ auto notification = manager.trackCleanup(cr1);
+ ASSERT(notification != nullptr && !bool(*notification));
+ notification->set(Status::OK());
+ ASSERT(bool(*notification));
+ ASSERT_OK(notification->get(operationContext()));
}
TEST_F(MetadataManagerTest, NotificationBlocksUntilDeletion) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
-
ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- auto opCtx = cc().makeOperationContext();
- // Once the new range deleter is set up, this might fail if the range deleter
- // deleted cr1 before we got here...
- ASSERT_FALSE(notification->waitFor(opCtx.get(), Milliseconds(0)));
-
- manager.removeRangeToClean(cr1);
- ASSERT_TRUE(notification->waitFor(opCtx.get(), Milliseconds(0)));
- ASSERT_OK(notification->get());
+ MetadataManager manager(getServiceContext(), kNss, executor());
+ manager.refreshActiveMetadata(makeEmptyMetadata());
+ auto notif = manager.trackCleanup(cr1);
+ ASSERT(notif.get() == nullptr);
+ {
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
+
+ auto scm = manager.getActiveMetadata(); // and increment scm's refcount
+ ASSERT(bool(scm));
+ addChunk(&manager); // push new metadata
+
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL); // not yet...
+
+ manager.cleanUpAfterMigrateOut(cr1);
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+
+ notif = manager.trackCleanup(cr1); // will wake when scm goes away
+ } // scm destroyed, refcount of tracker goes to zero
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+ ASSERT(bool(notif)); // woke
+ notif = manager.trackCleanup(cr1); // now tracking the range in _rangesToClean
+ ASSERT(notif.get() != nullptr);
}
-
TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
-
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
@@ -305,21 +240,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
}
+
TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
-
const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
- manager.beginReceive(cr2);
-
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
{
@@ -328,7 +258,7 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
}
@@ -338,22 +268,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), cr2.getMin(), cr2.getMax(), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 2UL);
}
}
TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
-
const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
- manager.beginReceive(cr2);
-
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
@@ -361,35 +285,22 @@ TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending)
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), BSON("key" << 50), BSON("key" << 60), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
}
TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
-
const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
- manager.beginReceive(cr2);
-
const ChunkRange crOverlap(BSON("key" << 5), BSON("key" << 35));
- manager.beginReceive(crOverlap);
- const auto copyOfPending = manager.getCopyOfReceivingChunks();
-
- ASSERT_EQ(copyOfPending.size(), 1UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
-
- const auto it = copyOfPending.find(BSON("key" << 5));
- ASSERT(it != copyOfPending.end());
- ASSERT_BSONOBJ_EQ(it->second.getMaxKey(), BSON("key" << 35));
}
TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
{
@@ -418,28 +329,15 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
// Tests membership functions for _rangesToClean
TEST_F(MetadataManagerTest, RangesToCleanMembership) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
- ASSERT(!manager.hasRangesToClean());
-
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- manager.addRangeToClean(cr1);
-
- ASSERT(manager.hasRangesToClean());
- ASSERT(manager.isInRangesToClean(cr1));
-}
-
-// Tests that getNextRangeToClean successfully pulls a stored ChunkRange
-TEST_F(MetadataManagerTest, GetNextRangeToClean) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- manager.addRangeToClean(cr1);
+ manager.cleanUpAfterMigrateOut(cr1);
- ChunkRange cr2 = manager.getNextRangeToClean();
- ASSERT_EQ(cr1.toString(), cr2.toString());
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
}
} // namespace
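
The rewritten tests above revolve around the notification object returned by trackCleanup(): it converts to false until the range deleter records a final status, and get() blocks until that happens. Below is a minimal standalone sketch of that one-shot notification pattern; SimpleNotification and the pared-down Status are illustrative stand-ins, not the mongo types.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

struct Status {
    bool ok;
    std::string reason;
};

class SimpleNotification {
public:
    // True once set() has been called; never blocks.
    explicit operator bool() {
        std::lock_guard<std::mutex> lk(_m);
        return _value.has_value();
    }
    // Record a final status and wake every waiter.
    void set(Status s) {
        {
            std::lock_guard<std::mutex> lk(_m);
            _value = std::move(s);
        }
        _cv.notify_all();
    }
    // Block until set() has been called, then return the recorded status.
    Status get() {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait(lk, [&] { return _value.has_value(); });
        return *_value;
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    std::optional<Status> _value;
};

int main() {
    SimpleNotification done;
    std::thread deleter([&] { done.set(Status{true, ""}); });
    // A caller can poll with bool(done) or block in get(), as the tests do.
    Status s = done.get();
    deleter.join();
    return s.ok ? 0 : 1;
}
```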
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 2f9355dcdd4..55543cca712 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
@@ -59,6 +60,7 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -421,11 +423,7 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
}
if (getState() != DONE) {
- // Unprotect the range if needed/possible on unsuccessful TO migration
- Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch);
- if (!status.isOK()) {
- warning() << "Failed to remove pending range" << redact(causedBy(status));
- }
+ _forgetReceive(opCtx.get(), _nss, min, max);
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -602,27 +600,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
{
- // 2. Synchronously delete any data which might have been left orphaned in range
- // being moved
-
- RangeDeleterOptions deleterOptions(
- KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
-
- // No need to wait since all existing cursors will filter out this range when returning
- // the results
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "preCleanup";
-
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) {
- warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg);
- setState(FAIL);
- return;
- }
+ // 2. Synchronously delete any data which might have been left orphaned in the range
+ // being moved, and wait for completion
- Status status = _notePending(opCtx, _nss, min, max, epoch);
+ Status status = _beginReceive(opCtx, _nss, min, max);
if (!status.isOK()) {
_errmsg = status.reason();
setState(FAIL);
@@ -990,80 +971,43 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
return true;
}
-Status MigrationDestinationManager::_notePending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
-
- auto css = CollectionShardingState::get(opCtx, nss);
- auto metadata = css->getMetadata();
-
- // This can currently happen because drops aren't synchronized with in-migrations. The idea
- // for checking this here is that in the future we shouldn't have this problem.
- if (!metadata || metadata->getCollVersion().epoch() != epoch) {
- return {ErrorCodes::StaleShardVersion,
- str::stream() << "could not note chunk [" << min << "," << max << ")"
- << " as pending because the epoch for "
- << nss.ns()
- << " has changed from "
- << epoch
- << " to "
- << (metadata ? metadata->getCollVersion().epoch()
- : ChunkVersion::UNSHARDED().epoch())};
- }
-
- css->beginReceive(ChunkRange(min, max));
-
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- invariant(!_chunkMarkedPending);
- _chunkMarkedPending = true;
-
- return Status::OK();
-}
-
-Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch) {
+Status MigrationDestinationManager::_beginReceive(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& min,
+ const BSONObj& max) {
+ ChunkRange footprint(min, max);
{
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_chunkMarkedPending) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
+ if (autoColl.getCollection() == nullptr) {
return Status::OK();
}
-
- _chunkMarkedPending = false;
+ auto css = CollectionShardingState::get(opCtx, nss);
+
+ // Start clearing any leftovers that would be in the new chunk.
+ if (!css->beginReceive(footprint)) {
+ // TODO: assign a stable code
+ return {ErrorCodes::Error(17377), // ErrorCodes::RangeMayBeInUse,
+ str::stream() << "Collection " << nss.ns() << " range [" << redact(min) << ", "
+ << redact(max)
+ << ") migration aborted; documents in range may still"
+ " be in use on the destination shard."};
+ }
}
- AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
+ return CollectionShardingState::waitForClean(opCtx, nss, footprint);
+}
+void MigrationDestinationManager::_forgetReceive(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& min,
+ const BSONObj& max) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
+ if (autoColl.getCollection() == nullptr) {
+ return;
+ }
auto css = CollectionShardingState::get(opCtx, nss);
auto metadata = css->getMetadata();
-
- // This can currently happen because drops aren't synchronized with in-migrations. The idea
- // for checking this here is that in the future we shouldn't have this problem.
- if (!metadata || metadata->getCollVersion().epoch() != epoch) {
- return {ErrorCodes::StaleShardVersion,
- str::stream() << "no need to forget pending chunk "
- << "["
- << min
- << ","
- << max
- << ")"
- << " because the epoch for "
- << nss.ns()
- << " has changed from "
- << epoch
- << " to "
- << (metadata ? metadata->getCollVersion().epoch()
- : ChunkVersion::UNSHARDED().epoch())};
- }
-
css->forgetReceive(ChunkRange(min, max));
-
- return Status::OK();
}
} // namespace mongo
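
The reshaped _beginReceive above registers the incoming range while the collection lock is held, then drops that lock before blocking in CollectionShardingState::waitForClean. The sketch below shows the same register-under-the-lock, wait-outside-the-lock ordering with toy types (RangeRegistry, integer range ids); it illustrates the shape, not the MongoDB implementation.

```cpp
#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>

struct RangeRegistry {
    std::mutex m;
    std::condition_variable cv;
    std::set<int> pendingRanges;  // toy stand-in for ranges awaiting deletion

    bool beginReceive(int rangeId) {  // schedule cleanup of leftovers
        std::lock_guard<std::mutex> lk(m);
        return pendingRanges.insert(rangeId).second;
    }
    void markClean(int rangeId) {  // called by the deleter when it finishes
        { std::lock_guard<std::mutex> lk(m); pendingRanges.erase(rangeId); }
        cv.notify_all();
    }
    void waitForClean(int rangeId) {  // blocks until the range is gone
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [&] { return pendingRanges.count(rangeId) == 0; });
    }
};

int main() {
    RangeRegistry reg;
    std::mutex collectionLock;
    {
        std::lock_guard<std::mutex> coll(collectionLock);  // analogue of AutoGetCollection
        if (!reg.beginReceive(1)) return 1;                // range may still be in use; abort
    }
    std::thread deleter([&] { reg.markClean(1); });        // background deleter finishes
    reg.waitForClean(1);                                   // wait after the lock scope closes
    deleter.join();
}
```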
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index c42ce1cdcb2..7763566a0c6 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -154,15 +154,11 @@ private:
*
* Overlapping pending ranges will be removed, so it is only safe to use this when you know
* your metadata view is definitive, such as at the start of a migration.
- *
- * TODO: Because migrations may currently be active when a collection drops, an epoch is
- * necessary to ensure the pending metadata change is still applicable.
*/
- Status _notePending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch);
+ Status _beginReceive(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& min,
+ const BSONObj& max);
/**
* Stops tracking a chunk range between 'min' and 'max' that previously was having data
@@ -170,15 +166,11 @@ private:
*
* To avoid removing pending ranges of other operations, ensure that this is only used when
* a migration is still active.
- *
- * TODO: Because migrations may currently be active when a collection drops, an epoch is
- * necessary to ensure the pending metadata change is still applicable.
*/
- Status _forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch);
+ void _forgetReceive(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& min,
+ const BSONObj& max);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration by checking
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 374b859ec15..f3d25b18d48 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -375,7 +375,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
if (refreshStatus.isOK()) {
AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
- auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata();
+ auto css = CollectionShardingState::get(opCtx, getNss());
+ auto refreshedMetadata = css->getMetadata();
if (!refreshedMetadata) {
return {ErrorCodes::NamespaceNotSharded,
@@ -391,6 +392,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
<< migrationCommitStatus.reason()};
}
+ // Schedule clearing out orphaned documents when they are no longer in active use.
+ auto orphans = ChunkRange(_args.getMinKey(), _args.getMaxKey());
+ css->cleanUpRange(orphans);
+
// Migration succeeded
log() << "Migration succeeded and updated collection version to "
<< refreshedMetadata->getCollVersion();
@@ -405,7 +410,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
// We don't know whether migration succeeded or failed
return {migrationCommitStatus.code(),
- str::stream() << "Failed to refresh metadata after migration commit due to "
+ str::stream() << "Orphaned range not cleaned up. Failed to refresh metadata after"
+ " migration commit due to "
<< refreshStatus.toString()};
}
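
On the donor side, orphan cleanup is now only scheduled (css->cleanUpRange) once both the commit and the follow-up metadata refresh have succeeded; when the refresh fails, the range is deliberately left alone, as the amended error message notes. A small illustration of that ordering, using std::async and made-up names (CommitResult, finishMigration) as stand-ins for the background range deleter:

```cpp
#include <functional>
#include <future>
#include <iostream>

struct CommitResult {
    bool committed;
    bool refreshed;
};

std::future<void> finishMigration(const CommitResult& r, std::function<void()> deleteOrphans) {
    if (!r.committed || !r.refreshed) {
        // Leave the orphaned range alone; a later refresh/cleanup pass must handle it.
        return {};
    }
    // Success path: hand the deletion off to run in the background.
    return std::async(std::launch::async, std::move(deleteOrphans));
}

int main() {
    auto fut = finishMigration({true, true},
                               [] { std::cout << "deleting orphaned range\n"; });
    if (fut.valid()) fut.wait();
}
```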
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 08ea3723920..fa646974985 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -36,9 +36,11 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/range_deleter_service.h"
#include "mongo/db/s/chunk_move_write_concern_options.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/sharding_state.h"
@@ -226,40 +228,13 @@ private:
moveTimingHelper.done(6);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6);
}
-
- // Schedule the range deleter
- RangeDeleterOptions deleterOptions(KeyRange(moveChunkRequest.getNss().ns(),
- moveChunkRequest.getMinKey().getOwned(),
- moveChunkRequest.getMaxKey().getOwned(),
- shardKeyPattern));
- deleterOptions.writeConcern = writeConcernForRangeDeleter;
- deleterOptions.waitForOpenCursors = true;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "post-cleanup";
-
+ auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey());
if (moveChunkRequest.getWaitForDelete()) {
- log() << "doing delete inline for cleanup of chunk data";
-
- string errMsg;
-
- // This is an immediate delete, and as a consequence, there could be more
- // deletes happening simultaneously than there are deleter worker threads.
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, &errMsg)) {
- log() << "Error occured while performing cleanup: " << redact(errMsg);
- }
+ CollectionShardingState::waitForClean(opCtx, moveChunkRequest.getNss(), range);
} else {
- log() << "forking for cleanup of chunk data";
-
- string errMsg;
- if (!getDeleter()->queueDelete(opCtx,
- deleterOptions,
- NULL, // Don't want to be notified
- &errMsg)) {
- log() << "could not queue migration cleanup: " << redact(errMsg);
- }
+ log() << "Leaving cleanup of " << moveChunkRequest.getNss().ns() << " range ("
+ << range.getMin() << ", " << range.getMax() << "] to complete in background";
}
-
moveTimingHelper.done(7);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep7);
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 2646af582dd..df9a1c9da61 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -140,8 +140,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(MongoDLocalShardingInfo, ("SetGlobalEnviron
ShardingState::ShardingState()
: _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
_initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
- _globalInit(&initializeGlobalShardingStateForMongod),
- _scheduleWorkFn([](NamespaceString nss) {}) {}
+ _globalInit(&initializeGlobalShardingStateForMongod) {}
ShardingState::~ShardingState() = default;
@@ -237,14 +236,6 @@ void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
_globalInit = func;
}
-void ShardingState::setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn) {
- _scheduleWorkFn = fn;
-}
-
-void ShardingState::scheduleCleanup(const NamespaceString& nss) {
- _scheduleWorkFn(nss);
-}
-
Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& expectedVersion) {
@@ -362,7 +353,6 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
_shardName = shardIdentity.getShardName();
_clusterId = shardIdentity.getClusterId();
- _initializeRangeDeleterTaskExecutor();
return status;
} catch (const DBException& ex) {
@@ -620,16 +610,24 @@ Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
return Status::OK();
}
-void ShardingState::_initializeRangeDeleterTaskExecutor() {
- invariant(!_rangeDeleterTaskExecutor);
- auto net =
- executor::makeNetworkInterface("NetworkInterfaceCollectionRangeDeleter-TaskExecutor");
- auto netPtr = net.get();
- _rangeDeleterTaskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>(
- stdx::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
+executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
+ stdx::lock_guard<stdx::mutex> lk(_rangeDeleterExecutor.lock);
+ if (_rangeDeleterExecutor.motor.get() == nullptr) {
+ static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor";
+ auto net = executor::makeNetworkInterface(kExecName);
+ auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
+ _rangeDeleterExecutor.motor =
+ stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _rangeDeleterExecutor.motor->startup();
+ }
+ return _rangeDeleterExecutor.motor.get();
}
-executor::ThreadPoolTaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
- return _rangeDeleterTaskExecutor.get();
+ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() {
+ if (motor) {
+ motor->shutdown();
+ motor->join();
+ }
}
+
} // namespace mongo
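
getRangeDeleterTaskExecutor() now constructs the shared executor lazily under a mutex, starts it on first use, and relies on the RangeDeleterExecutor destructor to shut it down and join it. A self-contained sketch of that lazily-initialized, RAII-shutdown executor, with hypothetical TinyExecutor/LazyExecutorHolder types standing in for the executor classes used here:

```cpp
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

class TinyExecutor {
public:
    void startup() { _worker = std::thread([this] { _run(); }); }
    void schedule(std::function<void()> task) {
        { std::lock_guard<std::mutex> lk(_m); _tasks.push(std::move(task)); }
        _cv.notify_one();
    }
    void shutdown() {
        { std::lock_guard<std::mutex> lk(_m); _done = true; }
        _cv.notify_all();
    }
    void join() { if (_worker.joinable()) _worker.join(); }

private:
    void _run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(_m);
                _cv.wait(lk, [&] { return _done || !_tasks.empty(); });
                if (_tasks.empty()) return;  // drained and shut down
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();
        }
    }
    std::mutex _m;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _tasks;
    bool _done = false;
    std::thread _worker;
};

struct LazyExecutorHolder {
    std::mutex lock;
    std::unique_ptr<TinyExecutor> motor;

    TinyExecutor* get() {
        std::lock_guard<std::mutex> lk(lock);
        if (!motor) {  // first caller builds and starts the executor
            motor = std::make_unique<TinyExecutor>();
            motor->startup();
        }
        return motor.get();
    }
    ~LazyExecutorHolder() {  // mirrors RangeDeleterExecutor's destructor
        if (motor) {
            motor->shutdown();
            motor->join();
        }
    }
};

int main() {
    LazyExecutorHolder holder;
    holder.get()->schedule([] { /* e.g. delete one batch of an orphaned range */ });
    holder.get()->schedule([] { /* executor is reused, not rebuilt */ });
}  // holder's destructor shuts the executor down and joins the worker
```

Deferring startup to the first caller matches the removal of the eager _initializeRangeDeleterTaskExecutor() call from initializeFromShardIdentity in the hunk above.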
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index e5a74e081a9..c54fce68538 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -73,10 +73,6 @@ public:
using GlobalInitFunc =
stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;
- // Signature for the callback function used by the MetadataManager to inform the
- // sharding subsystem that there is range cleanup work to be done.
- using RangeDeleterCleanupNotificationFunc = stdx::function<void(const NamespaceString&)>;
-
ShardingState();
~ShardingState();
@@ -242,18 +238,6 @@ public:
void scheduleCleanup(const NamespaceString& nss);
/**
- * Returns a pointer to the collection range deleter task executor.
- */
- executor::ThreadPoolTaskExecutor* getRangeDeleterTaskExecutor();
-
- /**
- * Sets the function used by scheduleWorkOnRangeDeleterTaskExecutor to
- * schedule work. Used for mocking the executor for testing. See the ShardingState
- * for the default implementation of _scheduleWorkFn.
- */
- void setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn);
-
- /**
* If started with --shardsvr, initializes sharding awareness from the shardIdentity document
* on disk, if there is one.
* If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the
@@ -266,6 +250,11 @@ public:
*/
StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx);
+ /**
+ * Return the task executor to be shared by the range deleters for all collections.
+ */
+ executor::TaskExecutor* getRangeDeleterTaskExecutor();
+
private:
// Map from a namespace into the sharding state for each collection we have
typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>>
@@ -304,9 +293,6 @@ private:
*/
ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss);
- // Initializes a TaskExecutor for cleaning up orphaned ranges
- void _initializeRangeDeleterTaskExecutor();
-
// Manages the state of the migration recipient shard
MigrationDestinationManager _migrationDestManager;
@@ -339,12 +325,13 @@ private:
// Function for initializing the external sharding state components not owned here.
GlobalInitFunc _globalInit;
- // Function for scheduling work on the _rangeDeleterTaskExecutor.
- // Used in call to scheduleCleanup(NamespaceString).
- RangeDeleterCleanupNotificationFunc _scheduleWorkFn;
-
- // Task executor for the collection range deleter.
- std::unique_ptr<executor::ThreadPoolTaskExecutor> _rangeDeleterTaskExecutor;
+ // Task executor shared by the collection range deleters.
+ struct RangeDeleterExecutor {
+ stdx::mutex lock{};
+ std::unique_ptr<executor::TaskExecutor> motor{nullptr};
+ ~RangeDeleterExecutor();
+ };
+ RangeDeleterExecutor _rangeDeleterExecutor;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index a114f86a681..eb286b7a631 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -133,6 +133,28 @@ bool ChunkRange::operator!=(const ChunkRange& other) const {
return !(*this == other);
}
+bool ChunkRange::covers(ChunkRange const& other) const {
+ auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; };
+ return le(_minKey, other._minKey) && le(other._maxKey, _maxKey);
+}
+
+boost::optional<ChunkRange> ChunkRange::overlapWith(ChunkRange const& other) const {
+ auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; };
+ if (le(other._maxKey, _minKey) || le(_maxKey, other._minKey)) {
+ return boost::none;
+ }
+ return ChunkRange(le(_minKey, other._minKey) ? other._minKey : _minKey,
+ le(_maxKey, other._maxKey) ? _maxKey : other._maxKey);
+}
+
+ChunkRange ChunkRange::unionWith(ChunkRange const& other) const {
+ auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; };
+ return ChunkRange(le(_minKey, other._minKey) ? _minKey : other._minKey,
+ le(_maxKey, other._maxKey) ? other._maxKey : _maxKey);
+}
+
+// ChunkType
+
ChunkType::ChunkType() = default;
ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId)
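
The new ChunkRange helpers treat ranges as half-open intervals [min, max): covers() is containment, overlapWith() returns none when ranges merely touch at an endpoint, and unionWith() spans any gap between the operands. The same algebra on plain ints, purely as an illustration (the real methods compare BSON keys with woCompare):

```cpp
#include <algorithm>
#include <cassert>
#include <optional>

struct Range { int min, max; };  // half-open: [min, max)

bool covers(const Range& a, const Range& b) {
    return a.min <= b.min && b.max <= a.max;
}

std::optional<Range> overlapWith(const Range& a, const Range& b) {
    if (b.max <= a.min || a.max <= b.min)
        return std::nullopt;  // disjoint, or merely touching at an endpoint
    return Range{std::max(a.min, b.min), std::min(a.max, b.max)};
}

Range unionWith(const Range& a, const Range& b) {  // spans any gap between them
    return Range{std::min(a.min, b.min), std::max(a.max, b.max)};
}

int main() {
    assert(covers({5, 10}, {6, 9}));
    assert(!overlapWith({5, 10}, {10, 15}));  // shared endpoint only: no overlap
    Range u = unionWith({5, 10}, {11, 15});
    assert(u.min == 5 && u.max == 15);        // the gap between 10 and 11 is included
}
```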
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 9827b5a3a79..b1bf1611880 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -83,9 +83,24 @@ public:
bool operator==(const ChunkRange& other) const;
bool operator!=(const ChunkRange& other) const;
-private:
- BSONObj _minKey;
- BSONObj _maxKey;
+ /**
+ * Returns true iff the union of *this and the argument range is the same as *this.
+ */
+ bool covers(ChunkRange const& other) const;
+
+ /**
+ * Returns the range of overlap between *this and other, if any.
+ */
+ boost::optional<ChunkRange> overlapWith(ChunkRange const& other) const;
+
+ /**
+ * Returns a range that includes *this and other. If the ranges do not overlap, it includes
+ * all the space between, as well.
+ */
+ ChunkRange unionWith(ChunkRange const& other) const;
+
+ const BSONObj _minKey;
+ const BSONObj _maxKey;
};
/**
diff --git a/src/mongo/s/catalog/type_chunk_test.cpp b/src/mongo/s/catalog/type_chunk_test.cpp
index 4589e4d4c38..eabce7ed879 100644
--- a/src/mongo/s/catalog/type_chunk_test.cpp
+++ b/src/mongo/s/catalog/type_chunk_test.cpp
@@ -245,6 +245,65 @@ TEST(ChunkRange, BasicBSONParsing) {
ASSERT_BSONOBJ_EQ(BSON("x" << 10), chunkRange.getMax());
}
+TEST(ChunkRange, Covers) {
+ auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 7))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 15))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 10))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 10))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 9))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 9))));
+}
+
+TEST(ChunkRange, Overlap) {
+ auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 4))));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 11), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 7), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 9)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 9))));
+ ASSERT(ChunkRange(BSON("x" << 9), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 9), BSON("x" << 15))));
+}
+
+TEST(ChunkRange, Union) {
+ auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 4))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 11), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 14)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 14))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 9))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 9), BSON("x" << 15))));
+}
+
TEST(ChunkRange, MinGreaterThanMaxShouldError) {
auto parseStatus =
ChunkRange::fromBSON(BSON("min" << BSON("x" << 10) << "max" << BSON("x" << 0)));