Diffstat (limited to 'src/mongo/db/s/collection_range_deleter.cpp')
-rw-r--r--  src/mongo/db/s/collection_range_deleter.cpp | 258
1 file changed, 165 insertions(+), 93 deletions(-)
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
index 0a267cd263d..8ffbf1f0d21 100644
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ b/src/mongo/db/s/collection_range_deleter.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/s/collection_range_deleter.h"
#include <algorithm>
+#include <utility>
#include "mongo/db/catalog/collection.h"
#include "mongo/db/client.h"
@@ -41,13 +42,16 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/keypattern.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/log.h"
@@ -57,7 +61,6 @@
namespace mongo {
class ChunkRange;
-class OldClientWriteContext;
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using logger::LogComponent;
@@ -67,123 +70,144 @@ namespace {
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Seconds(60));
-
} // unnamed namespace
-CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {}
-
-void CollectionRangeDeleter::run() {
- Client::initThread(getThreadName());
- ON_BLOCK_EXIT([&] { Client::destroy(); });
- auto opCtx = cc().makeOperationContext().get();
-
- const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
- bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete);
-
- // If there are more ranges to run, we add <this> back onto the task executor to run again.
- if (hasNextRangeToClean) {
- auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor();
- executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); });
- } else {
- delete this;
- }
+CollectionRangeDeleter::~CollectionRangeDeleter() {
+ // notify anybody still sleeping on orphan ranges
+ clear(Status{ErrorCodes::InterruptedDueToReplStateChange,
+ "Collection sharding metadata destroyed"});
}
-bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) {
-
+bool CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx,
+ NamespaceString const& nss,
+ int maxToDelete,
+ CollectionRangeDeleter* rangeDeleterForTestOnly) {
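+    // 'wrote' and 'range' are declared ahead of the locked scopes below so
+    // their values remain available after the locks are released.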
+ StatusWith<int> wrote = 0;
+ auto range = boost::optional<ChunkRange>(boost::none);
{
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
auto* collection = autoColl.getCollection();
if (!collection) {
- return false;
+ return false; // collection was dropped
}
- auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss);
- dassert(collectionShardingState != nullptr); // every collection gets one
- auto& metadataManager = collectionShardingState->_metadataManager;
+ auto* css = CollectionShardingState::get(opCtx, nss);
+ {
+ auto scopedCollectionMetadata = css->getMetadata();
+ if (!scopedCollectionMetadata) {
+ return false; // collection was unsharded
+ }
- if (!_rangeInProgress && !metadataManager.hasRangesToClean()) {
- // Nothing left to do
- return false;
- }
+ // We don't actually know if this is the same collection that we were originally
+ // scheduled to do deletions on, or another one with the same name. But it doesn't
+ // matter: if it has deletions scheduled, now is as good a time as any to do them.
+ auto self = rangeDeleterForTestOnly ? rangeDeleterForTestOnly
+ : &css->_metadataManager._rangesToClean;
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock);
+ if (self->isEmpty())
+ return false;
+
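+            // Copy owned bounds out of the front entry so the range remains
+            // valid after the manager lock is dropped.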
+ const auto& frontRange = self->_orphans.front().range;
+ range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned());
+ }
- if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) {
- // No valid chunk in progress, get a new one
- if (!metadataManager.hasRangesToClean()) {
- return false;
+ try {
+ auto keyPattern = scopedCollectionMetadata->getKeyPattern();
+
+ wrote = self->_doDeletion(opCtx, collection, keyPattern, *range, maxToDelete);
+ } catch (const DBException& e) {
+ wrote = e.toStatus();
+ warning() << e.what();
}
- _rangeInProgress = metadataManager.getNextRangeToClean();
- }
- auto scopedCollectionMetadata = collectionShardingState->getMetadata();
- int numDocumentsDeleted =
- _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete);
- if (numDocumentsDeleted <= 0) {
- metadataManager.removeRangeToClean(_rangeInProgress.get());
- _rangeInProgress = boost::none;
- return metadataManager.hasRangesToClean();
- }
+ if (!wrote.isOK() || wrote.getValue() == 0) {
+ stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock);
+ self->_pop(wrote.getStatus());
+ return true;
+ }
+ } // drop scopedCollectionMetadata
+ } // drop autoColl
+
+ invariant(range);
+ invariantOK(wrote.getStatus());
+ invariant(wrote.getValue() > 0);
+
+ log() << "Deleted " << wrote.getValue() << " documents in " << nss.ns() << " range " << *range;
+
+ // Wait for replication outside the lock
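+    // waitForWriteConcern blocks until this client's last opTime (the
+    // deletions above) has replicated to a majority of the replica set.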
+ const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ WriteConcernResult unusedWCResult;
+ Status status = Status::OK();
+ try {
+ status = waitForWriteConcern(opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult);
+ } catch (const DBException& e) {
+ status = e.toStatus();
}
- // wait for replication
- WriteConcernResult wcResult;
- auto currentClientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- Status status =
- waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult);
if (!status.isOK()) {
- warning() << "Error when waiting for write concern after removing chunks in " << _nss
- << " : " << status.reason();
+ warning() << "Error when waiting for write concern after removing " << nss << " range "
+ << *range << " : " << status.reason();
}
return true;
}
-int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
- Collection* collection,
- const BSONObj& keyPattern,
- int maxToDelete) {
- invariant(_rangeInProgress);
- invariant(collection);
+StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
+ Collection* collection,
+ BSONObj const& keyPattern,
+ ChunkRange const& range,
+ int maxToDelete) {
+ invariant(collection != nullptr);
+ invariant(!isEmpty());
+
+ auto const& nss = collection->ns();
// The IndexChunk has a keyPattern that may apply to more than one index - we need to
// select the index and get the full index keyPattern here.
- const IndexDescriptor* idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
+ auto catalog = collection->getIndexCatalog();
+ const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
if (idx == NULL) {
- warning() << "Unable to find shard key index for " << keyPattern.toString() << " in "
- << _nss;
- return -1;
+ std::string msg = str::stream() << "Unable to find shard key index for "
+ << keyPattern.toString() << " in " << nss.ns();
+ log() << msg;
+ return {ErrorCodes::InternalError, msg};
}
- KeyPattern indexKeyPattern(idx->keyPattern().getOwned());
-
// Extend bounds to match the index we found
- const BSONObj& min =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false));
- const BSONObj& max =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false));
+ KeyPattern indexKeyPattern(idx->keyPattern().getOwned());
+ auto extend = [&](auto& key) {
+ return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false));
+ };
+ const BSONObj& min = extend(range.getMin());
+ const BSONObj& max = extend(range.getMax());
- LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss;
+ LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns();
auto indexName = idx->indexName();
- IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
- if (!desc) {
- warning() << "shard key index with name " << indexName << " on '" << _nss
- << "' was dropped";
- return -1;
+ IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
+ if (!descriptor) {
+ std::string msg = str::stream() << "shard key index with name " << indexName << " on '"
+ << nss.ns() << "' was dropped";
+ log() << msg;
+ return {ErrorCodes::InternalError, msg};
+ }
+
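+    // When the moveParanoia server parameter is set, archive each deleted
+    // document to disk before removing it.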
+ boost::optional<Helpers::RemoveSaver> saver;
+ if (serverGlobalParams.moveParanoia) {
+ saver.emplace("moveChunk", nss.ns(), "cleaning");
}
int numDeleted = 0;
do {
- auto exec = InternalPlanner::indexScan(opCtx,
- collection,
- desc,
- min,
- max,
- BoundInclusion::kIncludeStartKeyOnly,
- PlanExecutor::NO_YIELD,
- InternalPlanner::FORWARD,
- InternalPlanner::IXSCAN_FETCH);
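+        // Build a fresh single-pass index scan each iteration; YIELD_MANUAL
+        // means the executor never yields its locks on its own.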
+ auto halfOpen = BoundInclusion::kIncludeStartKeyOnly;
+ auto manual = PlanExecutor::YIELD_MANUAL;
+ auto forward = InternalPlanner::FORWARD;
+ auto fetch = InternalPlanner::IXSCAN_FETCH;
+
+ auto exec = InternalPlanner::indexScan(
+ opCtx, collection, descriptor, min, max, halfOpen, manual, forward, fetch);
+
RecordId rloc;
BSONObj obj;
PlanExecutor::ExecState state = exec->getNext(&obj, &rloc);
@@ -193,23 +217,71 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) {
warning(LogComponent::kSharding)
<< PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min
- << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj)
+ << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj)
<< ", stats: " << Explain::getWinningPlanStats(exec.get());
break;
}
-
invariant(PlanExecutor::ADVANCED == state);
- WriteUnitOfWork wuow(opCtx);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- warning() << "stepped down from primary while deleting chunk; orphaning data in "
- << _nss << " in range [" << min << ", " << max << ")";
- break;
+ {
+ WriteUnitOfWork wuow(opCtx);
+ if (saver) {
+ saver->goingToDelete(obj);
+ }
+ collection->deleteDocument(opCtx, rloc, nullptr, true);
+
+ wuow.commit();
}
- OpDebug* const nullOpDebug = nullptr;
- collection->deleteDocument(opCtx, rloc, nullOpDebug, true);
- wuow.commit();
} while (++numDeleted < maxToDelete);
+
return numDeleted;
}
+auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const -> DeleteNotification {
+ // start search with newest entries by using reverse iterators
+ auto it = find_if(_orphans.rbegin(), _orphans.rend(), [&](auto& cleanee) {
+ return bool(cleanee.range.overlapWith(range));
+ });
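+    // A default-constructed DeleteNotification is an empty shared_ptr,
+    // signalling that no scheduled deletion overlaps the range.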
+ return it != _orphans.rend() ? it->notification : DeleteNotification();
+}
+
+void CollectionRangeDeleter::add(ChunkRange const& range) {
+ // We ignore the case of overlapping, or even equal, ranges.
+ // Deleting overlapping ranges is quick.
+ _orphans.emplace_back(Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()),
+ std::make_shared<Notification<Status>>()});
+}
+
+void CollectionRangeDeleter::append(BSONObjBuilder* builder) const {
+ BSONArrayBuilder arr(builder->subarrayStart("rangesToClean"));
+ for (auto const& entry : _orphans) {
+ BSONObjBuilder obj;
+ entry.range.append(&obj);
+ arr.append(obj.done());
+ }
+ arr.done();
+}
+
+size_t CollectionRangeDeleter::size() const {
+ return _orphans.size();
+}
+
+bool CollectionRangeDeleter::isEmpty() const {
+ return _orphans.empty();
+}
+
+void CollectionRangeDeleter::clear(Status status) {
+ for (auto& range : _orphans) {
+ if (*(range.notification)) {
+ continue; // was triggered in the test driver
+ }
+ range.notification->set(status); // wake up anything still waiting
+ }
+ _orphans.clear();
+}
+
+void CollectionRangeDeleter::_pop(Status result) {
+ _orphans.front().notification->set(result); // wake up waitForClean
+ _orphans.pop_front();
+}
+
} // namespace mongo
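
For reference, here is a minimal sketch (not part of this commit) of how a caller might consume the DeleteNotification returned by overlaps(). The helper name is hypothetical; it assumes Notification&lt;Status&gt;::get(OperationContext*) blocks until set() is called, as _pop() and clear() do above, and it elides the locking that guards _orphans:

    // Hypothetical helper: block until no scheduled deletion overlaps 'range'.
    Status waitForRangeToClear(OperationContext* opCtx,
                               CollectionRangeDeleter const& deleter,
                               ChunkRange const& range) {
        // Each pass picks up the newest overlapping entry, if any.
        while (auto notification = deleter.overlaps(range)) {
            // Blocks until _pop() or clear() publishes the outcome.
            Status result = notification->get(opCtx);
            if (!result.isOK()) {
                return result;  // deletion failed or was interrupted
            }
        }
        return Status::OK();  // nothing overlapping remains scheduled
    }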