commit 5f9e9f4291d6d0b345844d55517cedeb1d190d65
Author:    Kaloian Manassiev <kaloianm@mongodb.com>       2017-11-06 15:08:31 +0200
Committer: Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2017-11-09 02:19:41 -0500
tree   44ad5c4f8672310c6b6427b54ab7f765a58963f1
parent d0ecca650c1f506db0cb6a4e58f0d0f112304b28
SERVER-31848 Cleanup CollectionRangeDeleter
 src/mongo/db/s/collection_range_deleter.cpp      | 336
 src/mongo/db/s/collection_range_deleter.h        |  26
 src/mongo/db/s/collection_range_deleter_test.cpp | 110
 src/mongo/db/s/collection_sharding_state.cpp     |  32
 src/mongo/db/s/collection_sharding_state.h       |   7
 src/mongo/db/s/metadata_manager.cpp              |  24
 src/mongo/db/s/metadata_manager_test.cpp         |  12
 src/mongo/db/s/migration_destination_manager.cpp |  24
 src/mongo/db/s/migration_destination_manager.h   |   6
 src/mongo/db/s/migration_source_manager.cpp      |   6
 10 files changed, 307 insertions(+), 276 deletions(-)
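For readers skimming the patch below, the reshaped checkOverlap helper (moved into the anonymous namespace at the top of collection_range_deleter.cpp) is the clearest example of the cleanup style: early returns instead of a ternary over a local Notif alias, with the newest scheduled deletion found first via reverse iterators. The following is a minimal standalone sketch of that pattern only; it uses std::optional, a toy ChunkRange, and a shared_ptr<string> stand-in for DeleteNotification, none of which are the actual MongoDB types.

```cpp
#include <algorithm>
#include <list>
#include <memory>
#include <optional>
#include <string>

// Simplified stand-ins for ChunkRange and CollectionRangeDeleter::Deletion.
struct ChunkRange {
    int min, max;
    bool overlapWith(const ChunkRange& other) const {
        return min < other.max && other.min < max;
    }
};

struct Deletion {
    ChunkRange range;
    std::shared_ptr<std::string> notification;  // stand-in for DeleteNotification
};

// Search newest-first (reverse iterators) and return the notification of the first
// scheduled deletion whose range overlaps the query range, if any.
std::optional<std::shared_ptr<std::string>> checkOverlap(const std::list<Deletion>& deletions,
                                                         const ChunkRange& range) {
    auto it = std::find_if(deletions.rbegin(), deletions.rend(), [&](const Deletion& cleanee) {
        return cleanee.range.overlapWith(range);
    });

    if (it != deletions.rend())
        return it->notification;

    return std::nullopt;
}

int main() {
    std::list<Deletion> scheduled{{{0, 10}, std::make_shared<std::string>("old")},
                                  {{20, 30}, std::make_shared<std::string>("new")}};
    const auto hit = checkOverlap(scheduled, ChunkRange{25, 40});
    return hit ? 0 : 1;  // overlaps the newest entry, so a notification is returned
}
```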
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index 2e17d047d5d..8a7054c914a 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -61,137 +61,168 @@ #include "mongo/util/scopeguard.h" namespace mongo { - -class ChunkRange; - -using CallbackArgs = executor::TaskExecutor::CallbackArgs; -using logger::LogComponent; - namespace { +using Deletion = CollectionRangeDeleter::Deletion; +using DeleteNotification = CollectionRangeDeleter::DeleteNotification; + const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(60)); -} // unnamed namespace + +boost::optional<DeleteNotification> checkOverlap(std::list<Deletion> const& deletions, + ChunkRange const& range) { + // Start search with newest entries by using reverse iterators + auto it = find_if(deletions.rbegin(), deletions.rend(), [&](auto& cleanee) { + return bool(cleanee.range.overlapWith(range)); + }); + + if (it != deletions.rend()) + return it->notification; + + return boost::none; +} + +} // namespace + +CollectionRangeDeleter::CollectionRangeDeleter() = default; CollectionRangeDeleter::~CollectionRangeDeleter() { - // notify anybody still sleeping on orphan ranges - clear(Status{ErrorCodes::InterruptedDueToReplStateChange, - "Collection sharding metadata discarded"}); + // Notify anybody still sleeping on orphan ranges + clear({ErrorCodes::InterruptedDueToReplStateChange, "Collection sharding metadata discarded"}); } -auto CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx, - NamespaceString const& nss, - OID const& epoch, - int maxToDelete, - CollectionRangeDeleter* forTestOnly) - -> boost::optional<Date_t> { +boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange( + OperationContext* opCtx, + NamespaceString const& nss, + OID const& epoch, + int maxToDelete, + CollectionRangeDeleter* forTestOnly) { StatusWith<int> wrote = 0; auto range = boost::optional<ChunkRange>(boost::none); auto notification = DeleteNotification(); + { AutoGetCollection autoColl(opCtx, nss, MODE_IX); - auto* collection = autoColl.getCollection(); - auto* css = CollectionShardingState::get(opCtx, nss); - { - auto scopedCollectionMetadata = css->getMetadata(); - if (!forTestOnly && (!collection || !scopedCollectionMetadata)) { - if (!collection) { - log() << "Abandoning any range deletions left over from dropped " << nss.ns(); - } else { - log() << "Abandoning any range deletions left over from previously sharded" - << nss.ns(); - } - stdx::lock_guard<stdx::mutex> lk(css->_metadataManager->_managerLock); - css->_metadataManager->_clearAllCleanups(lk); - return boost::none; + + auto* const collection = autoColl.getCollection(); + auto* const css = CollectionShardingState::get(opCtx, nss); + auto* const self = forTestOnly ? 
forTestOnly : &css->_metadataManager->_rangesToClean; + + auto scopedCollectionMetadata = css->getMetadata(); + + if (!forTestOnly && (!collection || !scopedCollectionMetadata)) { + if (!collection) { + LOG(0) << "Abandoning any range deletions left over from dropped " << nss.ns(); + } else { + LOG(0) << "Abandoning any range deletions left over from previously sharded" + << nss.ns(); } - if (!forTestOnly && scopedCollectionMetadata->getCollVersion().epoch() != epoch) { - LOG(1) << "Range deletion task for " << nss.ns() << " epoch " << epoch << " woke;" - << " (current is " << scopedCollectionMetadata->getCollVersion() << ")"; + + stdx::lock_guard<stdx::mutex> lk(css->_metadataManager->_managerLock); + css->_metadataManager->_clearAllCleanups(lk); + return boost::none; + } + + if (!forTestOnly && scopedCollectionMetadata->getCollVersion().epoch() != epoch) { + LOG(1) << "Range deletion task for " << nss.ns() << " epoch " << epoch << " woke;" + << " (current is " << scopedCollectionMetadata->getCollVersion() << ")"; + return boost::none; + } + + bool writeOpLog = false; + + { + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager->_managerLock); + if (self->isEmpty()) { + LOG(1) << "No further range deletions scheduled on " << nss.ns(); return boost::none; } - auto self = forTestOnly ? forTestOnly : &css->_metadataManager->_rangesToClean; - bool writeOpLog = false; - { - stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager->_managerLock); - if (self->isEmpty()) { - LOG(1) << "No further range deletions scheduled on " << nss.ns(); - return boost::none; - } - auto& o = self->_orphans; - if (o.empty()) { - // We have delayed deletions; see if any are ready. - auto& df = self->_delayedOrphans.front(); - if (df.whenToDelete > Date_t::now()) { - log() << "Deferring deletion of " << nss.ns() << " range " - << redact(df.range.toString()) << " until " << df.whenToDelete; - return df.whenToDelete; - } - // Move a single range from _delayedOrphans to _orphans: - o.splice(o.end(), self->_delayedOrphans, self->_delayedOrphans.begin()); - LOG(1) << "Proceeding with deferred deletion of " << nss.ns() << " range " - << redact(o.front().range.toString()); - writeOpLog = true; - } - invariant(!o.empty()); - const auto& frontRange = o.front().range; - range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned()); - notification = o.front().notification; - } - invariant(range); - - if (writeOpLog) { - // clang-format off - // Secondaries will watch for this update, and kill any queries that may depend on - // documents in the range -- excepting any queries with a read-concern option - // 'ignoreChunkMigration' - try { - auto& serverConfigurationNss = NamespaceString::kServerConfigurationNamespace; - auto epoch = scopedCollectionMetadata->getCollVersion().epoch(); - AutoGetCollection autoAdmin(opCtx, serverConfigurationNss, MODE_IX); - - Helpers::upsert(opCtx, serverConfigurationNss.ns(), - BSON("_id" << "startRangeDeletion" << "ns" << nss.ns() << "epoch" << epoch - << "min" << range->getMin() << "max" << range->getMax())); - - } catch (DBException const& e) { - stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager->_managerLock); - css->_metadataManager->_clearAllCleanups( - scopedLock, - {e.code(), - str::stream() << "cannot push startRangeDeletion record to Op Log," - " abandoning scheduled range deletions: " << e.what()}); - return boost::none; + + auto& orphans = self->_orphans; + if (orphans.empty()) { + // We have delayed deletions; see if any are ready. 
+ auto& df = self->_delayedOrphans.front(); + if (df.whenToDelete > Date_t::now()) { + LOG(0) << "Deferring deletion of " << nss.ns() << " range " + << redact(df.range.toString()) << " until " << df.whenToDelete; + return df.whenToDelete; } - // clang-format on + + // Move a single range from _delayedOrphans to _orphans + orphans.splice(orphans.end(), self->_delayedOrphans, self->_delayedOrphans.begin()); + LOG(1) << "Proceeding with deferred deletion of " << nss.ns() << " range " + << redact(orphans.front().range.toString()); + + writeOpLog = true; } - try { - auto keyPattern = scopedCollectionMetadata->getKeyPattern(); + invariant(!orphans.empty()); + const auto& frontRange = orphans.front().range; + range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned()); + notification = orphans.front().notification; + } + + invariant(range); - wrote = self->_doDeletion(opCtx, collection, keyPattern, *range, maxToDelete); + if (writeOpLog) { + // Secondaries will watch for this update, and kill any queries that may depend on + // documents in the range -- excepting any queries with a read-concern option + // 'ignoreChunkMigration' + try { + AutoGetCollection autoAdmin( + opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IX); + + Helpers::upsert(opCtx, + NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" + << "startRangeDeletion" + << "ns" + << nss.ns() + << "epoch" + << epoch + << "min" + << range->getMin() + << "max" + << range->getMax())); } catch (const DBException& e) { - wrote = e.toStatus(); - warning() << e.what(); - } - if (!wrote.isOK() || wrote.getValue() == 0) { - if (wrote.isOK()) { - log() << "No documents remain to delete in " << nss << " range " - << redact(range->toString()); - } stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager->_managerLock); - self->_pop(wrote.getStatus()); - if (!self->_orphans.empty()) { - LOG(1) << "Deleting " << nss.ns() << " range " - << redact(self->_orphans.front().range.toString()) << " next."; - } - return Date_t{}; + css->_metadataManager->_clearAllCleanups( + scopedLock, + {e.code(), + str::stream() << "cannot push startRangeDeletion record to Op Log," + " abandoning scheduled range deletions: " + << e.what()}); + return boost::none; + } + } + + try { + const auto keyPattern = scopedCollectionMetadata->getKeyPattern(); + wrote = self->_doDeletion(opCtx, collection, keyPattern, *range, maxToDelete); + } catch (const DBException& e) { + wrote = e.toStatus(); + warning() << e.what(); + } + + if (!wrote.isOK() || wrote.getValue() == 0) { + if (wrote.isOK()) { + LOG(0) << "No documents remain to delete in " << nss << " range " + << redact(range->toString()); } - } // drop scopedCollectionMetadata - } // drop autoColl + + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager->_managerLock); + self->_pop(wrote.getStatus()); + if (!self->_orphans.empty()) { + LOG(1) << "Deleting " << nss.ns() << " range " + << redact(self->_orphans.front().range.toString()) << " next."; + } + + return Date_t{}; + } + } // drop autoColl invariant(range); invariantOK(wrote.getStatus()); @@ -201,33 +232,37 @@ auto CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx, const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); // Wait for replication outside the lock - WriteConcernResult unusedWCResult; - Status status = Status::OK(); - try { - status = waitForWriteConcern(opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult); - } catch (const DBException& e) { - 
status = e.toStatus(); - } + const auto status = [&] { + try { + WriteConcernResult unusedWCResult; + return waitForWriteConcern(opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult); + } catch (const DBException& e) { + return e.toStatus(); + } + }(); + if (!status.isOK()) { - log() << "Error when waiting for write concern after removing " << nss << " range " - << redact(range->toString()) << " : " << redact(status.reason()); + LOG(0) << "Error when waiting for write concern after removing " << nss << " range " + << redact(range->toString()) << " : " << redact(status.reason()); AutoGetCollection autoColl(opCtx, nss, MODE_IX); - auto* css = CollectionShardingState::get(opCtx, nss); + auto* const css = CollectionShardingState::get(opCtx, nss); + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager->_managerLock); - auto* self = &css->_metadataManager->_rangesToClean; - // if range were already popped (e.g. by dropping nss during the waitForWriteConcern above) + auto* const self = &css->_metadataManager->_rangesToClean; + + // If range were already popped (e.g. by dropping nss during the waitForWriteConcern above) // its notification would have been triggered, so this check suffices to ensure that it is - // safe to pop the range here. + // safe to pop the range here if (!notification.ready()) { invariant(!self->isEmpty() && self->_orphans.front().notification == notification); - log() << "Abandoning deletion of latest range in " << nss.ns() << " after " - << wrote.getValue() << " local deletions because of replication failure"; + LOG(0) << "Abandoning deletion of latest range in " << nss.ns() << " after " + << wrote.getValue() << " local deletions because of replication failure"; self->_pop(status); } } else { - log() << "Deleted " << wrote.getValue() << " documents in " << nss.ns() << " range " - << redact(range->toString()); + LOG(0) << "Deleted " << wrote.getValue() << " documents in " << nss.ns() << " range " + << redact(range->toString()); } notification.abandon(); @@ -248,29 +283,30 @@ StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, // select the index and get the full index keyPattern here. 
auto catalog = collection->getIndexCatalog(); const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false); - if (idx == NULL) { + if (!idx) { std::string msg = str::stream() << "Unable to find shard key index for " << keyPattern.toString() << " in " << nss.ns(); - log() << msg; + LOG(0) << msg; return {ErrorCodes::InternalError, msg}; } // Extend bounds to match the index we found - KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); - auto extend = [&](auto& key) { + const KeyPattern indexKeyPattern(idx->keyPattern()); + const auto extend = [&](const auto& key) { return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false)); }; - const BSONObj& min = extend(range.getMin()); - const BSONObj& max = extend(range.getMax()); + + const auto min = extend(range.getMin()); + const auto max = extend(range.getMax()); LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns(); - auto indexName = idx->indexName(); + const auto indexName = idx->indexName(); IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); if (!descriptor) { std::string msg = str::stream() << "shard key index with name " << indexName << " on '" << nss.ns() << "' was dropped"; - log() << msg; + LOG(0) << msg; return {ErrorCodes::InternalError, msg}; } @@ -296,10 +332,10 @@ StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, break; } if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) { - warning(LogComponent::kSharding) - << PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min - << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj) - << ", stats: " << Explain::getWinningPlanStats(exec.get()); + warning() << PlanExecutor::statestr(state) << " - cursor error while trying to delete " + << redact(min) << " to " << redact(max) << " in " << nss << ": " + << WorkingSetCommon::toStatusString(obj) + << ", stats: " << Explain::getWinningPlanStats(exec.get()); break; } invariant(PlanExecutor::ADVANCED == state); @@ -307,7 +343,7 @@ StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, writeConflictRetry(opCtx, "delete range", nss.ns(), [&] { WriteUnitOfWork wuow(opCtx); if (saver) { - saver->goingToDelete(obj).transitional_ignore(); + uassertStatusOK(saver->goingToDelete(obj)); } collection->deleteDocument(opCtx, kUninitializedStmtId, rloc, nullptr, true); wuow.commit(); @@ -317,23 +353,6 @@ StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, return numDeleted; } -namespace { - -using Deletion = CollectionRangeDeleter::Deletion; -using DeleteNotification = CollectionRangeDeleter::DeleteNotification; -using Notif = boost::optional<DeleteNotification>; - -auto checkOverlap(std::list<Deletion> const& deletions, ChunkRange const& range) - -> boost::optional<DeleteNotification> { - // start search with newest entries by using reverse iterators - auto it = find_if(deletions.rbegin(), deletions.rend(), [&](auto& cleanee) { - return bool(cleanee.range.overlapWith(range)); - }); - return (it != deletions.rend()) ? 
Notif{it->notification} : Notif{boost::none}; -} - -} // namespace - auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const -> boost::optional<DeleteNotification> { auto result = checkOverlap(_orphans, range); @@ -343,11 +362,12 @@ auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const return checkOverlap(_delayedOrphans, range); } -auto CollectionRangeDeleter::add(std::list<Deletion> ranges) -> boost::optional<Date_t> { - // We ignore the case of overlapping, or even equal, ranges. - // Deleting overlapping ranges is quick. - bool wasScheduledImmediate = !_orphans.empty(); - bool wasScheduledLater = !_delayedOrphans.empty(); +boost::optional<Date_t> CollectionRangeDeleter::add(std::list<Deletion> ranges) { + // We ignore the case of overlapping, or even equal, ranges. Deleting overlapping ranges is + // quick. + const bool wasScheduledImmediate = !_orphans.empty(); + const bool wasScheduledLater = !_delayedOrphans.empty(); + while (!ranges.empty()) { if (ranges.front().whenToDelete != Date_t{}) { _delayedOrphans.splice(_delayedOrphans.end(), ranges, ranges.begin()); @@ -355,6 +375,7 @@ auto CollectionRangeDeleter::add(std::list<Deletion> ranges) -> boost::optional< _orphans.splice(_orphans.end(), ranges, ranges.begin()); } } + if (wasScheduledImmediate) { return boost::none; // already scheduled } else if (!_orphans.empty()) { @@ -364,6 +385,7 @@ auto CollectionRangeDeleter::add(std::list<Deletion> ranges) -> boost::optional< } else if (!_delayedOrphans.empty()) { return _delayedOrphans.front().whenToDelete; } + return boost::none; } diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h index 2f3373d3f99..32bd61e2133 100644 --- a/src/mongo/db/s/collection_range_deleter.h +++ b/src/mongo/db/s/collection_range_deleter.h @@ -27,6 +27,8 @@ */ #pragma once +#include <list> + #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" #include "mongo/executor/task_executor.h" @@ -74,8 +76,10 @@ public: notification->set(status); } - // Sleeps waiting for notification, and returns notify's argument. - // On interruption, throws; calling waitStatus afterward returns failed status. + /** + * Sleeps waiting for notification, and returns notify's argument. On interruption, throws; + * calling waitStatus afterward returns failed status. + */ Status waitStatus(OperationContext* opCtx); bool ready() const { @@ -95,7 +99,6 @@ public: std::shared_ptr<Notification<Status>> notification; }; - struct Deletion { Deletion(ChunkRange r, Date_t when) : range(std::move(r)), whenToDelete(when) {} @@ -104,7 +107,7 @@ public: DeleteNotification notification{}; }; - CollectionRangeDeleter() = default; + CollectionRangeDeleter(); ~CollectionRangeDeleter(); // @@ -118,7 +121,7 @@ public: * Returns the time to begin deletions, if needed, or boost::none if deletions are already * scheduled. */ - auto add(std::list<Deletion> ranges) -> boost::optional<Date_t>; + boost::optional<Date_t> add(std::list<Deletion> ranges); /** * Reports whether the argument range overlaps any of the ranges to clean. If there is overlap, @@ -141,7 +144,7 @@ public: * Notifies with the specified status anything waiting on ranges scheduled, and then discards * the ranges and notifications. Is called in the destructor. */ - void clear(Status); + void clear(Status status); /* * Append a representation of self to the specified builder. 
@@ -159,12 +162,11 @@ public: * Argument 'forTestOnly' is used in unit tests that exercise the CollectionRangeDeleter class, * so that they do not need to set up CollectionShardingState and MetadataManager objects. */ - static auto cleanUpNextRange(OperationContext*, - NamespaceString const& nss, - OID const& epoch, - int maxToDelete, - CollectionRangeDeleter* forTestOnly = nullptr) - -> boost::optional<Date_t>; + static boost::optional<Date_t> cleanUpNextRange(OperationContext*, + NamespaceString const& nss, + OID const& epoch, + int maxToDelete, + CollectionRangeDeleter* forTestOnly = nullptr); private: /** diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp index 30135befdbc..8939860c9be 100644 --- a/src/mongo/db/s/collection_range_deleter_test.cpp +++ b/src/mongo/db/s/collection_range_deleter_test.cpp @@ -28,8 +28,6 @@ #include "mongo/platform/basic.h" -#include "mongo/db/s/collection_range_deleter.h" - #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/query.h" @@ -41,6 +39,7 @@ #include "mongo/db/keypattern.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/collection_range_deleter.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context_d_test_fixture.h" @@ -51,9 +50,12 @@ #include "mongo/unittest/unittest.h" namespace mongo { +namespace { using unittest::assertGet; +using Deletion = CollectionRangeDeleter::Deletion; + const NamespaceString kNss = NamespaceString("foo", "bar"); const std::string kPattern = "_id"; const BSONObj kKeyPattern = BSON(kPattern << 1); @@ -62,79 +64,73 @@ const HostAndPort dummyHost("dummy", 123); const NamespaceString kAdminSysVer = NamespaceString("admin", "system.version"); class CollectionRangeDeleterTest : public ShardingMongodTestFixture { -public: - using Deletion = CollectionRangeDeleter::Deletion; - protected: + void setUp() override { + _epoch = OID::gen(); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + ShardingMongodTestFixture::setUp(); + replicationCoordinator()->alwaysAllowWrites(true); + initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost)) + .transitional_ignore(); + + // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) + // ->setConnectionStringReturnValue(kConfigConnStr); + + configTargeter()->setFindHostReturnValue(dummyHost); + + DBDirectClient(operationContext()).createCollection(kNss.ns()); + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); + const KeyPattern skPattern(kKeyPattern); + auto cm = ChunkManager::makeNew( + kNss, + UUID::gen(), + kKeyPattern, + nullptr, + false, + epoch(), + {ChunkType(kNss, + ChunkRange{skPattern.globalMin(), skPattern.globalMax()}, + ChunkVersion(1, 0, epoch()), + ShardId("otherShard"))}); + collectionShardingState->refreshMetadata( + operationContext(), + stdx::make_unique<CollectionMetadata>(cm, ShardId("thisShard"))); + } + } + + void tearDown() override { + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); + collectionShardingState->refreshMetadata(operationContext(), nullptr); + } + + ShardingMongodTestFixture::tearDown(); + } + boost::optional<Date_t> 
next(CollectionRangeDeleter& rangeDeleter, int maxToDelete) { return CollectionRangeDeleter::cleanUpNextRange( operationContext(), kNss, epoch(), maxToDelete, &rangeDeleter); } - std::shared_ptr<RemoteCommandTargeterMock> configTargeter() { + std::shared_ptr<RemoteCommandTargeterMock> configTargeter() const { return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); } - OID const& epoch() { + OID const& epoch() const { return _epoch; } - virtual std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override { + std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override { return stdx::make_unique<BalancerConfiguration>(); } private: - void setUp() override; - void tearDown() override; - OID _epoch; }; -void CollectionRangeDeleterTest::setUp() { - _epoch = OID::gen(); - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - ShardingMongodTestFixture::setUp(); - replicationCoordinator()->alwaysAllowWrites(true); - initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost)) - .transitional_ignore(); - - // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) - // ->setConnectionStringReturnValue(kConfigConnStr); - - configTargeter()->setFindHostReturnValue(dummyHost); - - DBDirectClient(operationContext()).createCollection(kNss.ns()); - { - AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); - auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); - const KeyPattern skPattern(kKeyPattern); - auto cm = ChunkManager::makeNew( - kNss, - UUID::gen(), - kKeyPattern, - nullptr, - false, - epoch(), - {ChunkType(kNss, - ChunkRange{skPattern.globalMin(), skPattern.globalMax()}, - ChunkVersion(1, 0, epoch()), - ShardId("otherShard"))}); - collectionShardingState->refreshMetadata( - operationContext(), stdx::make_unique<CollectionMetadata>(cm, ShardId("thisShard"))); - } -} - -void CollectionRangeDeleterTest::tearDown() { - { - AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); - auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); - collectionShardingState->refreshMetadata(operationContext(), nullptr); - } - ShardingMongodTestFixture::tearDown(); -} - -namespace { - // Tests the case that there is nothing in the database. TEST_F(CollectionRangeDeleterTest, EmptyDatabase) { CollectionRangeDeleter rangeDeleter; @@ -404,5 +400,5 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { ASSERT_FALSE(next(rangeDeleter, 1)); } -} // unnamed namespace +} // namespace } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 29ef4caf2b8..7407ed6cf53 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -176,7 +176,7 @@ void CollectionShardingState::forgetReceive(const ChunkRange& range) { } auto CollectionShardingState::cleanUpRange(ChunkRange const& range, CleanWhen when) - -> MetadataManager::CleanupNotification { + -> CleanupNotification { Date_t time = (when == kNow) ? Date_t{} : Date_t::now() + stdx::chrono::seconds{orphanCleanupDelaySecs.load()}; return _metadataManager->cleanUpRange(range, time); @@ -237,41 +237,45 @@ bool CollectionShardingState::collectionIsSharded() { // exist anymore at the time of the call, or indeed anytime outside the AutoGetCollection block, so // anything that might alias something in it must be copied first. 
-/* static */ Status CollectionShardingState::waitForClean(OperationContext* opCtx, - NamespaceString nss, + const NamespaceString& nss, OID const& epoch, ChunkRange orphanRange) { - do { - auto stillScheduled = boost::optional<CleanupNotification>(); + while (true) { + boost::optional<CleanupNotification> stillScheduled; + { AutoGetCollection autoColl(opCtx, nss, MODE_IX); - // First, see if collection was dropped. auto css = CollectionShardingState::get(opCtx, nss); + { + // First, see if collection was dropped, but do it in a separate scope in order to + // not hold reference on it, which would make it appear in use auto metadata = css->_metadataManager->getActiveMetadata(css->_metadataManager); if (!metadata || metadata->getCollVersion().epoch() != epoch) { return {ErrorCodes::StaleShardVersion, "Collection being migrated was dropped"}; } - } // drop metadata + } + stillScheduled = css->trackOrphanedDataCleanup(orphanRange); if (!stillScheduled) { log() << "Finished deleting " << nss.ns() << " range " << redact(orphanRange.toString()); return Status::OK(); } - } // drop collection lock + } log() << "Waiting for deletion of " << nss.ns() << " range " << orphanRange; + Status result = stillScheduled->waitStatus(opCtx); if (!result.isOK()) { - return Status{result.code(), - str::stream() << "Failed to delete orphaned " << nss.ns() << " range " - << orphanRange.toString() - << ": " - << result.reason()}; + return {result.code(), + str::stream() << "Failed to delete orphaned " << nss.ns() << " range " + << orphanRange.toString() + << " due to " + << result.reason()}; } - } while (true); + } MONGO_UNREACHABLE; } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 3ce7e8004bd..ecc3e5f1df2 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -199,9 +199,12 @@ public: /** * Tracks deletion of any documents within the range, returning when deletion is complete. - * Throws if the collection is dropped while it sleeps. Call this with the collection unlocked. + * Throws if the collection is dropped while it sleeps. */ - static Status waitForClean(OperationContext*, NamespaceString, OID const& epoch, ChunkRange); + static Status waitForClean(OperationContext* opCtx, + const NamespaceString& nss, + OID const& epoch, + ChunkRange orphanRange); /** * Reports whether any range still scheduled for deletion overlaps the argument range. 
If so, diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index ae20639db51..3ad381e04c4 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -419,8 +419,7 @@ void scheduleCleanup(executor::TaskExecutor* executor, auto MetadataManager::_pushRangeToClean(WithLock lock, ChunkRange const& range, Date_t when) -> CleanupNotification { std::list<Deletion> ranges; - auto ownedRange = ChunkRange{range.getMin().getOwned(), range.getMax().getOwned()}; - ranges.emplace_back(Deletion{std::move(ownedRange), when}); + ranges.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), when); auto& notifn = ranges.back().notification; _pushListToClean(lock, std::move(ranges)); return notifn; @@ -449,9 +448,12 @@ auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotificati return Status{ErrorCodes::RangeOverlapConflict, "Documents in target range may still be in use on the destination shard."}; } + _addToReceiving(scopedLock, range); + log() << "Scheduling deletion of any documents in " << _nss.ns() << " range " << redact(range.toString()) << " before migrating in a chunk covering the range"; + return _pushRangeToClean(scopedLock, range, Date_t{}); } @@ -481,8 +483,9 @@ auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete) stdx::unique_lock<stdx::mutex> scopedLock(_managerLock); invariant(!_metadata.empty()); - auto* activeMetadata = _metadata.back().get(); - auto* overlapMetadata = _newestOverlappingMetadata(scopedLock, range); + auto* const activeMetadata = _metadata.back().get(); + auto* const overlapMetadata = _newestOverlappingMetadata(scopedLock, range); + if (overlapMetadata == activeMetadata) { return Status{ErrorCodes::RangeOverlapConflict, str::stream() << "Requested deletion range overlaps a live shard chunk"}; @@ -494,20 +497,21 @@ auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete) " migrated in"}; } - if (overlapMetadata == nullptr) { + if (!overlapMetadata) { // No running queries can depend on it, so queue it for deletion immediately. - auto whenStr = (whenToDelete == Date_t{}) ? "immediate"_sd : "deferred"_sd; + const auto whenStr = (whenToDelete == Date_t{}) ? "immediate"_sd : "deferred"_sd; log() << "Scheduling " << whenStr << " deletion of " << _nss.ns() << " range " << redact(range.toString()); return _pushRangeToClean(scopedLock, range, whenToDelete); } - log() << "Scheduling deletion of " << _nss.ns() << " range " << redact(range.toString()) - << " after all possibly-dependent queries finish"; + log() << "Deletion of " << _nss.ns() << " range " << redact(range.toString()) + << " will be scheduled after all possibly dependent queries finish"; + // Put it on the oldest metadata permissible; the current one might live a long time. 
auto& orphans = overlapMetadata->_tracker.orphans; - ChunkRange ownedRange{range.getMin().getOwned(), range.getMax().getOwned()}; - orphans.emplace_back(Deletion{std::move(ownedRange), whenToDelete}); + orphans.emplace_back( + Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), whenToDelete}); return orphans.back().notification; } diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index 50f2aed8274..41f4f4bc9bd 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -28,8 +28,6 @@ #include "mongo/platform/basic.h" -#include "mongo/db/s/metadata_manager.h" - #include <boost/optional.hpp> #include "mongo/bson/bsonobjbuilder.h" @@ -42,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" @@ -70,11 +69,6 @@ const std::string kOtherShard{"otherShard"}; const HostAndPort dummyHost("dummy", 123); class MetadataManagerTest : public ShardingMongodTestFixture { -public: - std::shared_ptr<RemoteCommandTargeterMock> configTargeter() { - return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); - } - protected: void setUp() override { ShardingMongodTestFixture::setUp(); @@ -87,6 +81,10 @@ protected: _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor()); } + std::shared_ptr<RemoteCommandTargeterMock> configTargeter() const { + return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); + } + static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() { const OID epoch = OID::gen(); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 797d8416b02..9efdc9c0c4c 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -646,15 +646,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 2. 
Synchronously delete any data which might have been left orphaned in the range // being moved, and wait for completion - auto footprint = ChunkRange(min, max); + const ChunkRange footprint(min, max); auto notification = _notePending(opCtx, _nss, epoch, footprint); - // wait for the range deletion to report back + // Wait for the range deletion to report back if (!notification.waitStatus(opCtx).isOK()) { setStateFail(notification.waitStatus(opCtx).reason()); return; } - // wait for any other, overlapping queued deletions to drain + // Wait for any other, overlapping queued deletions to drain auto status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint); if (!status.isOK()) { setStateFail(status.reason()); @@ -1019,27 +1019,27 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, return true; } -auto MigrationDestinationManager::_notePending(OperationContext* opCtx, - NamespaceString const& nss, - OID const& epoch, - ChunkRange const& range) - -> CollectionShardingState::CleanupNotification { +CollectionShardingState::CleanupNotification MigrationDestinationManager::_notePending( + OperationContext* opCtx, + NamespaceString const& nss, + OID const& epoch, + ChunkRange const& range) { AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); - // This can currently happen because drops aren't synchronized with in-migrations. The idea - // for checking this here is that in the future we shouldn't have this problem. + // This can currently happen because drops aren't synchronized with in-migrations. The idea for + // checking this here is that in the future we shouldn't have this problem. if (!metadata || metadata->getCollVersion().epoch() != epoch) { return Status{ErrorCodes::StaleShardVersion, - str::stream() << "not noting chunk " << redact(range.toString()) + str::stream() << "not noting chunk " << range.toString() << " as pending because the epoch of " << nss.ns() << " changed"}; } - // start clearing any leftovers that would be in the new chunk + // Start clearing any leftovers that would be in the new chunk auto notification = css->beginReceive(range); if (notification.ready() && !notification.waitStatus(opCtx).isOK()) { return Status{notification.waitStatus(opCtx).code(), diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 49c6f1e5089..cc59483d699 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -164,8 +164,10 @@ private: * it schedules deletion of any documents in the range, so that process must be seen to be * complete before migrating any new documents in. 
*/ - auto _notePending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&) - -> CollectionShardingState::CleanupNotification; + CollectionShardingState::CleanupNotification _notePending(OperationContext*, + NamespaceString const&, + OID const&, + ChunkRange const&); /** * Stops tracking a chunk range between 'min' and 'max' that previously was having data diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index f7c3c59f679..5ff7cdf49ad 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -510,11 +510,11 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC ShardingCatalogClient::kMajorityWriteConcern) .transitional_ignore(); - // Wait for the metadata update to be persisted before attempting to delete orphaned - // documents so that metadata changes propagate to secondaries first. + // Wait for the metadata update to be persisted before attempting to delete orphaned documents + // so that metadata changes propagate to secondaries first CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, getNss()); - const auto range = ChunkRange(_args.getMinKey(), _args.getMaxKey()); + const ChunkRange range(_args.getMinKey(), _args.getMaxKey()); auto notification = [&] { auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingState::kNow |
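Another small pattern the patch introduces is folding a try/catch into a single const status through an immediately-invoked lambda (see the waitForWriteConcern call in cleanUpNextRange). Below is a rough standalone illustration of that idiom under simplified assumptions: Status, waitForWriteConcern, and the failure mode here are toy stand-ins, not the real MongoDB write-concern APIs.

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

// Toy stand-in for mongo::Status.
struct Status {
    bool ok;
    std::string reason;
};

// Stand-in for a call that reports failure by throwing.
Status waitForWriteConcern(bool simulateFailure) {
    if (simulateFailure)
        throw std::runtime_error("replication timed out");
    return {true, ""};
}

int main() {
    // Immediately-invoked lambda: the catch block becomes a return value, so
    // 'status' can be const and success/failure share one code path below.
    const auto status = [&] {
        try {
            return waitForWriteConcern(true);
        } catch (const std::exception& e) {
            return Status{false, e.what()};
        }
    }();

    if (!status.ok)
        std::cout << "Error when waiting for write concern: " << status.reason << '\n';
    return status.ok ? 0 : 1;
}
```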