author     Kshitij Gupta <kshitij.gupta@mongodb.com>   2021-09-21 21:45:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-09-21 22:32:57 +0000
commit     6f7a69e8d5aef3d924a639c4f8625bc259334ffb (patch)
tree       bff545636896605edac400fc8668f6bbcb0e1bb2
parent     4a43e817011e7cde84c2dce87619d81921f19c9a (diff)
download   mongo-6f7a69e8d5aef3d924a639c4f8625bc259334ffb.tar.gz
SERVER-59329: Make sure that withTemporaryOperationContext throws an error if the node is no longer a primary
-rw-r--r--  src/mongo/db/s/range_deletion_util.cpp  164
1 file changed, 92 insertions(+), 72 deletions(-)
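
In essence, the change threads the target NamespaceString through withTemporaryOperationContext so that, before invoking the callable, the helper verifies under the global lock that the node is still a replica set primary able to accept writes for that namespace. A condensed sketch of the added guard (simplified from the hunk below; the surrounding MongoDB headers and declarations are assumed):

    // Fail with PrimarySteppedDown instead of running the callable on a
    // node that can no longer accept writes for `nss`.
    auto replCoord = repl::ReplicationCoordinator::get(opCtx);
    Lock::GlobalLock lock(opCtx, MODE_IX);
    uassert(ErrorCodes::PrimarySteppedDown,
            str::stream() << "Not primary while running range deletion task for collection "
                          << nss,
            replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
                replCoord->canAcceptWritesFor(opCtx, nss));
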
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index 5ad24b06cef..4a0e7dc968e 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -242,7 +242,7 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx,
template <typename Callable>
-auto withTemporaryOperationContext(Callable&& callable) {
+auto withTemporaryOperationContext(Callable&& callable, const NamespaceString& nss) {
ThreadClient tc(migrationutil::kRangeDeletionThreadName, getGlobalServiceContext());
{
stdx::lock_guard<Client> lk(*tc.get());
@@ -255,6 +255,16 @@ auto withTemporaryOperationContext(Callable&& callable) {
opCtx->setAlwaysInterruptAtStepDownOrUp();
invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp());
+ {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ Lock::GlobalLock lock(opCtx, MODE_IX);
+ uassert(ErrorCodes::PrimarySteppedDown,
+ str::stream() << "Not primary while running range deletion task for collection "
+ << nss,
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
+ replCoord->canAcceptWritesFor(opCtx, nss));
+ }
+
return callable(opCtx);
}
@@ -297,48 +307,51 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx
int numDocsToRemovePerBatch,
Milliseconds delayBetweenBatches) {
return AsyncTry([=] {
- return withTemporaryOperationContext([=](OperationContext* opCtx) {
- LOGV2_DEBUG(5346200,
- 1,
- "Starting batch deletion",
- "namespace"_attr = nss,
- "range"_attr = redact(range.toString()),
- "numDocsToRemovePerBatch"_attr = numDocsToRemovePerBatch,
- "delayBetweenBatches"_attr = delayBetweenBatches);
-
- if (migrationId) {
- ensureRangeDeletionTaskStillExists(opCtx, *migrationId);
- }
-
- AutoGetCollection collection(opCtx, nss, MODE_IX);
-
- // Ensure the collection exists and has not been dropped or dropped and
- // recreated.
- uassert(
- ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist,
- "Collection has been dropped since enqueuing this range "
- "deletion task. No need to delete documents.",
- !collectionUuidHasChanged(nss, collection.getCollection(), collectionUuid));
-
- auto numDeleted = uassertStatusOK(deleteNextBatch(opCtx,
- collection.getCollection(),
- keyPattern,
- range,
- numDocsToRemovePerBatch));
-
- LOGV2_DEBUG(
- 23769,
- 1,
- "Deleted {numDeleted} documents in pass in namespace {namespace} with "
- "UUID {collectionUUID} for range {range}",
- "Deleted documents in pass",
- "numDeleted"_attr = numDeleted,
- "namespace"_attr = nss.ns(),
- "collectionUUID"_attr = collectionUuid,
- "range"_attr = range.toString());
-
- return numDeleted;
- });
+ return withTemporaryOperationContext(
+ [=](OperationContext* opCtx) {
+ LOGV2_DEBUG(5346200,
+ 1,
+ "Starting batch deletion",
+ "namespace"_attr = nss,
+ "range"_attr = redact(range.toString()),
+ "numDocsToRemovePerBatch"_attr = numDocsToRemovePerBatch,
+ "delayBetweenBatches"_attr = delayBetweenBatches);
+
+ if (migrationId) {
+ ensureRangeDeletionTaskStillExists(opCtx, *migrationId);
+ }
+
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ // Ensure the collection exists and has not been dropped or dropped and
+ // recreated.
+ uassert(
+ ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist,
+ "Collection has been dropped since enqueuing this range "
+ "deletion task. No need to delete documents.",
+ !collectionUuidHasChanged(
+ nss, collection.getCollection(), collectionUuid));
+
+ auto numDeleted = uassertStatusOK(deleteNextBatch(opCtx,
+ collection.getCollection(),
+ keyPattern,
+ range,
+ numDocsToRemovePerBatch));
+
+ LOGV2_DEBUG(
+ 23769,
+ 1,
+ "Deleted {numDeleted} documents in pass in namespace {namespace} with "
+ "UUID {collectionUUID} for range {range}",
+ "Deleted documents in pass",
+ "numDeleted"_attr = numDeleted,
+ "namespace"_attr = nss.ns(),
+ "collectionUUID"_attr = collectionUuid,
+ "range"_attr = range.toString());
+
+ return numDeleted;
+ },
+ nss);
})
.until([](StatusWith<int> swNumDeleted) {
// Continue iterating until there are no more documents to delete, retrying on
@@ -365,23 +378,28 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx
void notifySecondariesThatDeletionIsOccurring(const NamespaceString& nss,
const UUID& collectionUuid,
const ChunkRange& range) {
- withTemporaryOperationContext([&](OperationContext* opCtx) {
- AutoGetCollection autoAdmin(opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IX);
- Helpers::upsert(opCtx,
- NamespaceString::kServerConfigurationNamespace.ns(),
- BSON("_id"
- << "startRangeDeletion"
- << "ns" << nss.ns() << "uuid" << collectionUuid << "min"
- << range.getMin() << "max" << range.getMax()));
- });
+ withTemporaryOperationContext(
+ [&](OperationContext* opCtx) {
+ AutoGetCollection autoAdmin(
+ opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IX);
+ Helpers::upsert(opCtx,
+ NamespaceString::kServerConfigurationNamespace.ns(),
+ BSON("_id"
+ << "startRangeDeletion"
+ << "ns" << nss.ns() << "uuid" << collectionUuid << "min"
+ << range.getMin() << "max" << range.getMax()));
+ },
+ nss);
}
void removePersistentRangeDeletionTask(const NamespaceString& nss, UUID migrationId) {
- withTemporaryOperationContext([&](OperationContext* opCtx) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
+ withTemporaryOperationContext(
+ [&](OperationContext* opCtx) {
+ PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId));
- });
+ store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId));
+ },
+ nss);
}
ExecutorFuture<void> waitForDeletionsToMajorityReplicate(
@@ -389,23 +407,25 @@ ExecutorFuture<void> waitForDeletionsToMajorityReplicate(
const NamespaceString& nss,
const UUID& collectionUuid,
const ChunkRange& range) {
- return withTemporaryOperationContext([=](OperationContext* opCtx) {
- repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
- auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ return withTemporaryOperationContext(
+ [=](OperationContext* opCtx) {
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
+ auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- LOGV2_DEBUG(5346202,
- 1,
- "Waiting for majority replication of local deletions",
- "namespace"_attr = nss.ns(),
- "collectionUUID"_attr = collectionUuid,
- "range"_attr = redact(range.toString()),
- "clientOpTime"_attr = clientOpTime);
-
- // Asynchronously wait for majority write concern.
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(clientOpTime, CancellationToken::uncancelable())
- .thenRunOn(executor);
- });
+ LOGV2_DEBUG(5346202,
+ 1,
+ "Waiting for majority replication of local deletions",
+ "namespace"_attr = nss.ns(),
+ "collectionUUID"_attr = collectionUuid,
+ "range"_attr = redact(range.toString()),
+ "clientOpTime"_attr = clientOpTime);
+
+ // Asynchronously wait for majority write concern.
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(clientOpTime, CancellationToken::uncancelable())
+ .thenRunOn(executor);
+ },
+ nss);
}
std::vector<RangeDeletionTask> getPersistentRangeDeletionTasks(OperationContext* opCtx,
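
With this change, a step-down surfaces to callers of withTemporaryOperationContext as a DBException carrying ErrorCodes::PrimarySteppedDown. A hypothetical caller (not part of this diff) could observe it like so:

    // Hypothetical illustration: the guard makes the helper throw rather
    // than silently run the callable after the node loses primaryship.
    try {
        withTemporaryOperationContext(
            [&](OperationContext* opCtx) { /* range-deletion work */ },
            nss);
    } catch (const ExceptionFor<ErrorCodes::PrimarySteppedDown>& ex) {
        // In deleteRangeInBatches above, the same error instead propagates
        // as an error Status through the AsyncTry/until retry loop.
    }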