diff options
Diffstat (limited to 'src/mongo/db/s/collection_range_deleter.cpp')
-rw-r--r-- | src/mongo/db/s/collection_range_deleter.cpp | 258 |
1 files changed, 165 insertions, 93 deletions
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index 0a267cd263d..8ffbf1f0d21 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -33,6 +33,7 @@ #include "mongo/db/s/collection_range_deleter.h" #include <algorithm> +#include <utility> #include "mongo/db/catalog/collection.h" #include "mongo/db/client.h" @@ -41,13 +42,16 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/keypattern.h" +#include "mongo/db/operation_context.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/service_context.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor.h" #include "mongo/util/log.h" @@ -57,7 +61,6 @@ namespace mongo { class ChunkRange; -class OldClientWriteContext; using CallbackArgs = executor::TaskExecutor::CallbackArgs; using logger::LogComponent; @@ -67,123 +70,144 @@ namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(60)); - } // unnamed namespace -CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {} - -void CollectionRangeDeleter::run() { - Client::initThread(getThreadName()); - ON_BLOCK_EXIT([&] { Client::destroy(); }); - auto opCtx = cc().makeOperationContext().get(); - - const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1); - bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete); - - // If there are more ranges to run, we add <this> back onto the task executor to run again. - if (hasNextRangeToClean) { - auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor(); - executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); }); - } else { - delete this; - } +CollectionRangeDeleter::~CollectionRangeDeleter() { + // notify anybody still sleeping on orphan ranges + clear(Status{ErrorCodes::InterruptedDueToReplStateChange, + "Collection sharding metadata destroyed"}); } -bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) { - +bool CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx, + NamespaceString const& nss, + int maxToDelete, + CollectionRangeDeleter* rangeDeleterForTestOnly) { + StatusWith<int> wrote = 0; + auto range = boost::optional<ChunkRange>(boost::none); { - AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + AutoGetCollection autoColl(opCtx, nss, MODE_IX); auto* collection = autoColl.getCollection(); if (!collection) { - return false; + return false; // collection was dropped } - auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss); - dassert(collectionShardingState != nullptr); // every collection gets one - auto& metadataManager = collectionShardingState->_metadataManager; + auto* css = CollectionShardingState::get(opCtx, nss); + { + auto scopedCollectionMetadata = css->getMetadata(); + if (!scopedCollectionMetadata) { + return false; // collection was unsharded + } - if (!_rangeInProgress && !metadataManager.hasRangesToClean()) { - // Nothing left to do - return false; - } + // We don't actually know if this is the same collection that we were originally + // scheduled to do deletions on, or another one with the same name. But it doesn't + // matter: if it has deletions scheduled, now is as good a time as any to do them. + auto self = rangeDeleterForTestOnly ? rangeDeleterForTestOnly + : &css->_metadataManager._rangesToClean; + { + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock); + if (self->isEmpty()) + return false; + + const auto& frontRange = self->_orphans.front().range; + range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned()); + } - if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) { - // No valid chunk in progress, get a new one - if (!metadataManager.hasRangesToClean()) { - return false; + try { + auto keyPattern = scopedCollectionMetadata->getKeyPattern(); + + wrote = self->_doDeletion(opCtx, collection, keyPattern, *range, maxToDelete); + } catch (const DBException& e) { + wrote = e.toStatus(); + warning() << e.what(); } - _rangeInProgress = metadataManager.getNextRangeToClean(); - } - auto scopedCollectionMetadata = collectionShardingState->getMetadata(); - int numDocumentsDeleted = - _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete); - if (numDocumentsDeleted <= 0) { - metadataManager.removeRangeToClean(_rangeInProgress.get()); - _rangeInProgress = boost::none; - return metadataManager.hasRangesToClean(); - } + if (!wrote.isOK() || wrote.getValue() == 0) { + stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock); + self->_pop(wrote.getStatus()); + return true; + } + } // drop scopedCollectionMetadata + } // drop autoColl + + invariant(range); + invariantOK(wrote.getStatus()); + invariant(wrote.getValue() > 0); + + log() << "Deleted " << wrote.getValue() << " documents in " << nss.ns() << " range " << *range; + + // Wait for replication outside the lock + const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + WriteConcernResult unusedWCResult; + Status status = Status::OK(); + try { + status = waitForWriteConcern(opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult); + } catch (const DBException& e) { + status = e.toStatus(); } - // wait for replication - WriteConcernResult wcResult; - auto currentClientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - Status status = - waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult); if (!status.isOK()) { - warning() << "Error when waiting for write concern after removing chunks in " << _nss - << " : " << status.reason(); + warning() << "Error when waiting for write concern after removing " << nss << " range " + << *range << " : " << status.reason(); } return true; } -int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, - Collection* collection, - const BSONObj& keyPattern, - int maxToDelete) { - invariant(_rangeInProgress); - invariant(collection); +StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, + Collection* collection, + BSONObj const& keyPattern, + ChunkRange const& range, + int maxToDelete) { + invariant(collection != nullptr); + invariant(!isEmpty()); + + auto const& nss = collection->ns(); // The IndexChunk has a keyPattern that may apply to more than one index - we need to // select the index and get the full index keyPattern here. - const IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false); + auto catalog = collection->getIndexCatalog(); + const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false); if (idx == NULL) { - warning() << "Unable to find shard key index for " << keyPattern.toString() << " in " - << _nss; - return -1; + std::string msg = str::stream() << "Unable to find shard key index for " + << keyPattern.toString() << " in " << nss.ns(); + log() << msg; + return {ErrorCodes::InternalError, msg}; } - KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); - // Extend bounds to match the index we found - const BSONObj& min = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false)); - const BSONObj& max = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false)); + KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); + auto extend = [&](auto& key) { + return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false)); + }; + const BSONObj& min = extend(range.getMin()); + const BSONObj& max = extend(range.getMax()); - LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss; + LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns(); auto indexName = idx->indexName(); - IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); - if (!desc) { - warning() << "shard key index with name " << indexName << " on '" << _nss - << "' was dropped"; - return -1; + IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); + if (!descriptor) { + std::string msg = str::stream() << "shard key index with name " << indexName << " on '" + << nss.ns() << "' was dropped"; + log() << msg; + return {ErrorCodes::InternalError, msg}; + } + + boost::optional<Helpers::RemoveSaver> saver; + if (serverGlobalParams.moveParanoia) { + saver.emplace("moveChunk", nss.ns(), "cleaning"); } int numDeleted = 0; do { - auto exec = InternalPlanner::indexScan(opCtx, - collection, - desc, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::NO_YIELD, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH); + auto halfOpen = BoundInclusion::kIncludeStartKeyOnly; + auto manual = PlanExecutor::YIELD_MANUAL; + auto forward = InternalPlanner::FORWARD; + auto fetch = InternalPlanner::IXSCAN_FETCH; + + auto exec = InternalPlanner::indexScan( + opCtx, collection, descriptor, min, max, halfOpen, manual, forward, fetch); + RecordId rloc; BSONObj obj; PlanExecutor::ExecState state = exec->getNext(&obj, &rloc); @@ -193,23 +217,71 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) { warning(LogComponent::kSharding) << PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min - << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj) + << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj) << ", stats: " << Explain::getWinningPlanStats(exec.get()); break; } - invariant(PlanExecutor::ADVANCED == state); - WriteUnitOfWork wuow(opCtx); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - warning() << "stepped down from primary while deleting chunk; orphaning data in " - << _nss << " in range [" << min << ", " << max << ")"; - break; + { + WriteUnitOfWork wuow(opCtx); + if (saver) { + saver->goingToDelete(obj); + } + collection->deleteDocument(opCtx, rloc, nullptr, true); + + wuow.commit(); } - OpDebug* const nullOpDebug = nullptr; - collection->deleteDocument(opCtx, rloc, nullOpDebug, true); - wuow.commit(); } while (++numDeleted < maxToDelete); + return numDeleted; } +auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const -> DeleteNotification { + // start search with newest entries by using reverse iterators + auto it = find_if(_orphans.rbegin(), _orphans.rend(), [&](auto& cleanee) { + return bool(cleanee.range.overlapWith(range)); + }); + return it != _orphans.rend() ? it->notification : DeleteNotification(); +} + +void CollectionRangeDeleter::add(ChunkRange const& range) { + // We ignore the case of overlapping, or even equal, ranges. + // Deleting overlapping ranges is quick. + _orphans.emplace_back(Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), + std::make_shared<Notification<Status>>()}); +} + +void CollectionRangeDeleter::append(BSONObjBuilder* builder) const { + BSONArrayBuilder arr(builder->subarrayStart("rangesToClean")); + for (auto const& entry : _orphans) { + BSONObjBuilder obj; + entry.range.append(&obj); + arr.append(obj.done()); + } + arr.done(); +} + +size_t CollectionRangeDeleter::size() const { + return _orphans.size(); +} + +bool CollectionRangeDeleter::isEmpty() const { + return _orphans.empty(); +} + +void CollectionRangeDeleter::clear(Status status) { + for (auto& range : _orphans) { + if (*(range.notification)) { + continue; // was triggered in the test driver + } + range.notification->set(status); // wake up anything still waiting + } + _orphans.clear(); +} + +void CollectionRangeDeleter::_pop(Status result) { + _orphans.front().notification->set(result); // wake up waitForClean + _orphans.pop_front(); +} + } // namespace mongo |