/**
 *    Copyright (C) 2016 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/db/s/collection_range_deleter.h" #include #include "mongo/db/catalog/collection.h" #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/keypattern.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" namespace mongo { class ChunkRange; class OldClientWriteContext; using CallbackArgs = executor::TaskExecutor::CallbackArgs; using logger::LogComponent; namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(60)); } // unnamed namespace CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {} void CollectionRangeDeleter::run() { Client::initThread(getThreadName().c_str()); ON_BLOCK_EXIT([&] { Client::destroy(); }); auto opCtx = cc().makeOperationContext().get(); const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1); bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete); // If there are more ranges to run, we add back onto the task executor to run again. 
if (hasNextRangeToClean) { auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor(); executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); }); } else { delete this; } } bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) { { AutoGetCollection autoColl(opCtx, _nss, MODE_IX); auto* collection = autoColl.getCollection(); if (!collection) { return false; } auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss); dassert(collectionShardingState != nullptr); // every collection gets one auto& metadataManager = collectionShardingState->_metadataManager; if (!_rangeInProgress && !metadataManager.hasRangesToClean()) { // Nothing left to do return false; } if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) { // No valid chunk in progress, get a new one if (!metadataManager.hasRangesToClean()) { return false; } _rangeInProgress = metadataManager.getNextRangeToClean(); } auto scopedCollectionMetadata = collectionShardingState->getMetadata(); int numDocumentsDeleted = _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete); if (numDocumentsDeleted <= 0) { metadataManager.removeRangeToClean(_rangeInProgress.get()); _rangeInProgress = boost::none; return metadataManager.hasRangesToClean(); } } // wait for replication WriteConcernResult wcResult; auto currentClientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); Status status = waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult); if (!status.isOK()) { warning() << "Error when waiting for write concern after removing chunks in " << _nss << " : " << status.reason(); } return true; } int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, Collection* collection, const BSONObj& keyPattern, int maxToDelete) { invariant(_rangeInProgress); invariant(collection); // The IndexChunk has a keyPattern that may apply to more than one 
index - we need to // select the index and get the full index keyPattern here. const IndexDescriptor* idx = collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false); if (idx == NULL) { warning() << "Unable to find shard key index for " << keyPattern.toString() << " in " << _nss; return -1; } KeyPattern indexKeyPattern(idx->keyPattern().getOwned()); // Extend bounds to match the index we found const BSONObj& min = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false)); const BSONObj& max = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false)); LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss; auto indexName = idx->indexName(); IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); if (!desc) { warning() << "shard key index with name " << indexName << " on '" << _nss << "' was dropped"; return -1; } int numDeleted = 0; do { auto exec = InternalPlanner::indexScan(opCtx, collection, desc, min, max, BoundInclusion::kIncludeStartKeyOnly, PlanExecutor::YIELD_MANUAL, InternalPlanner::FORWARD, InternalPlanner::IXSCAN_FETCH); RecordId rloc; BSONObj obj; PlanExecutor::ExecState state = exec->getNext(&obj, &rloc); if (state == PlanExecutor::IS_EOF) { break; } if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) { warning(LogComponent::kSharding) << PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj) << ", stats: " << Explain::getWinningPlanStats(exec.get()); break; } invariant(PlanExecutor::ADVANCED == state); WriteUnitOfWork wuow(opCtx); if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { warning() << "stepped down from primary while deleting chunk; orphaning data in " << _nss << " in range [" << min << ", " << max << ")"; break; } OpDebug* const nullOpDebug = nullptr; 
collection->deleteDocument(opCtx, rloc, nullOpDebug, true); wuow.commit(); } while (++numDeleted < maxToDelete); return numDeleted; } } // namespace mongo