summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2021-04-08 19:44:08 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-09 17:03:39 +0000
commit407665ef3b37a012a9cf3859491b1afa891b3c96 (patch)
treee2dca1cc0065c9c7fcfbc8934f75d447dca1336a
parent0f35984965301f2327082aff72ff93383313557d (diff)
downloadmongo-407665ef3b37a012a9cf3859491b1afa891b3c96.tar.gz
SERVER-55325 Integrate CancelableOperationContext into resharding's DonorStateMachine
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp30
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp60
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h7
4 files changed, 57 insertions, 42 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 7c30bf61faf..e3f9562fed0 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -973,13 +973,13 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
.onCompletion([this](auto passthroughFuture) {
- _factory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor);
+ _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor);
return passthroughFuture;
})
.onError([this, self = shared_from_this(), executor](
Status status) -> StatusWith<ReshardingCoordinatorDocument> {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
}
@@ -1017,7 +1017,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishRe
})
.onError([this, self = shared_from_this(), executor](Status status) {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
}
@@ -1035,7 +1035,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
const CancellationToken& stepdownToken) noexcept {
_ctHolder = std::make_unique<CoordinatorCancellationTokenHolder>(stepdownToken);
_markKilledExecutor->startup();
- _factory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
+ _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
return _runUntilReadyToPersistDecision(executor)
.then([this, self = shared_from_this(), executor](
@@ -1051,7 +1051,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
markCompleted(status);
}
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getStepdownToken());
@@ -1147,7 +1147,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan
return;
}
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kInitializing);
@@ -1164,7 +1164,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
return;
}
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc;
auto shardsAndChunks =
@@ -1254,7 +1254,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeCloning.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
}
@@ -1299,7 +1299,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
.thenRunOn(**executor)
.then([this, executor](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
}
@@ -1364,7 +1364,7 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi
ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDecisionPersisted);
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
@@ -1406,7 +1406,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD
_ctHolder->getStepdownToken())
.thenRunOn(**executor)
.then([this, executor](const auto& coordinatorDocsChangedOnDisk) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::removeCoordinatorDocAndReshardingFields(opCtx.get(),
coordinatorDocsChangedOnDisk[1]);
});
@@ -1426,7 +1426,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize));
emplaceAbortReasonIfExists(updatedCoordinatorDoc, abortReason);
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(),
updatedCoordinatorDoc);
@@ -1436,7 +1436,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto recipientIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
NamespaceString nssToRefresh;
@@ -1461,7 +1461,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto donorIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
auto refreshCmd = createFlushReshardingStateChangeCommand(_coordinatorDoc.getSourceNss());
@@ -1474,7 +1474,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefres
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh(
const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
auto recipientShardIds =
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index fe0cfb6ec73..98448181c3b 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -375,7 +375,7 @@ private:
// CancelableOperationContext must have a thread that is always available to it to mark its
// opCtx as killed when the cancelToken has been cancelled.
const std::shared_ptr<ThreadPool> _markKilledExecutor;
- boost::optional<CancelableOperationContextFactory> _factory;
+ boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;
/**
* Must be locked while the `_canEnterCritical` or `_completionPromise`
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 835c7061631..a1a14730c96 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -65,26 +65,21 @@ namespace {
const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
-Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss) {
- auto opCtx = cc().makeOperationContext();
-
+Timestamp generateMinFetchTimestamp(OperationContext* opCtx, const NamespaceString& sourceNss) {
// Do a no-op write and use the OpTime as the minFetchTimestamp
writeConflictRetry(
- opCtx.get(),
- "resharding donor minFetchTimestamp",
- NamespaceString::kRsOplogNamespace.ns(),
- [&] {
- AutoGetDb db(opCtx.get(), sourceNss.db(), MODE_IX);
- Lock::CollectionLock collLock(opCtx.get(), sourceNss, MODE_S);
+ opCtx, "resharding donor minFetchTimestamp", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ AutoGetDb db(opCtx, sourceNss.db(), MODE_IX);
+ Lock::CollectionLock collLock(opCtx, sourceNss, MODE_S);
- AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
+ AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
const std::string msg = str::stream()
<< "All future oplog entries on the namespace " << sourceNss.ns()
<< " must include a 'destinedRecipient' field";
- WriteUnitOfWork wuow(opCtx.get());
+ WriteUnitOfWork wuow(opCtx);
opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
- opCtx.get(),
+ opCtx,
NamespaceString::kForceOplogBatchBoundaryNamespace,
boost::none,
BSON("msg" << msg),
@@ -204,7 +199,14 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine(
_metadata{donorDoc.getCommonReshardingMetadata()},
_recipientShardIds{donorDoc.getRecipientShards()},
_donorCtx{donorDoc.getMutableState()},
- _externalState{std::move(externalState)} {
+ _externalState{std::move(externalState)},
+ _markKilledExecutor(std::make_shared<ThreadPool>([] {
+ ThreadPool::Options options;
+ options.poolName = "ReshardingDonorCancelableOpCtxPool";
+ options.minThreads = 1;
+ options.maxThreads = 1;
+ return options;
+ }())) {
invariant(_externalState);
}
@@ -269,7 +271,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_notifyCoordinat
return withAutomaticRetry(**executor,
abortToken,
[this, executor] {
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
return _updateCoordinator(opCtx.get(), executor);
})
.then([this, abortToken] {
@@ -296,10 +298,10 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardin
_transitionState(DonorStateEnum::kDone);
}
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
return _updateCoordinator(opCtx.get(), executor).then([this] {
{
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
removeDonorDocFailpoint.pauseWhileSet(opCtx.get());
}
_removeDonorDocument();
@@ -311,12 +313,15 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& stepdownToken) noexcept {
auto abortToken = _initAbortSource(stepdownToken);
+ _markKilledExecutor->startup();
+ _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
return _runUntilBlockingWritesOrErrored(executor, abortToken)
.then([this, executor, abortToken] {
return _notifyCoordinatorAndAwaitDecision(executor, abortToken);
})
- .onCompletion([executor, stepdownToken, abortToken](Status status) {
+ .onCompletion([this, executor, stepdownToken, abortToken](Status status) {
+ _cancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor);
if (stepdownToken.isCanceled()) {
// Propagate any errors from the donor stepping down.
return ExecutorFuture<bool>(**executor, status);
@@ -416,7 +421,7 @@ void ReshardingDonorService::DonorStateMachine::
int64_t documentsToClone = 0;
{
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto rawOpCtx = opCtx.get();
AutoGetCollection coll(rawOpCtx, _metadata.getSourceNss(), MODE_IS);
@@ -436,12 +441,15 @@ void ReshardingDonorService::DonorStateMachine::
// the {atClusterTime: <fetchTimestamp>} read on the config.cache.chunks namespace would fail
// with a SnapshotUnavailable error response.
{
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
_externalState->refreshCatalogCache(opCtx.get(), _metadata.getTempReshardingNss());
_externalState->waitForCollectionFlush(opCtx.get(), _metadata.getTempReshardingNss());
}
- Timestamp minFetchTimestamp = generateMinFetchTimestamp(_metadata.getSourceNss());
+ Timestamp minFetchTimestamp = [this] {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ return generateMinFetchTimestamp(opCtx.get(), _metadata.getSourceNss());
+ }();
LOGV2_DEBUG(5390702,
2,
@@ -463,7 +471,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
return ExecutorFuture<void>(**executor, Status::OK());
}
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
return _updateCoordinator(opCtx.get(), executor)
.then([this, abortToken] {
return future_util::withCancellation(_allRecipientsDoneCloning.getFuture(), abortToken);
@@ -497,7 +505,7 @@ void ReshardingDonorService::DonorStateMachine::
}
{
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto rawOpCtx = opCtx.get();
auto generateOplogEntry = [&](ShardId destinedRecipient) {
@@ -575,7 +583,7 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenTrans
}
{
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::data_copy::ensureCollectionDropped(
opCtx.get(), _metadata.getSourceNss(), _metadata.getSourceUUID());
}
@@ -698,7 +706,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_updateCoordinat
.waitUntilMajority(clientOpTime, CancellationToken::uncancelable())
.thenRunOn(**executor)
.then([this] {
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto shardId = _externalState->myShardId(opCtx->getServiceContext());
BSONObjBuilder updateBuilder;
@@ -727,7 +735,7 @@ void ReshardingDonorService::DonorStateMachine::insertStateDocument(
void ReshardingDonorService::DonorStateMachine::_updateDonorDocument(
DonorShardContext&& newDonorCtx) {
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
PersistentTaskStore<ReshardingDonorDocument> store(
NamespaceString::kDonorReshardingOperationsNamespace);
store.update(
@@ -741,7 +749,7 @@ void ReshardingDonorService::DonorStateMachine::_updateDonorDocument(
}
void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() {
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
const auto& nss = NamespaceString::kDonorReshardingOperationsNamespace;
writeConflictRetry(opCtx.get(), "DonorStateMachine::_removeDonorDocument", nss.toString(), [&] {
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 2972dc39d05..11addfc69bc 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/resharding/donor_document_gen.h"
#include "mongo/db/s/resharding/resharding_critical_section.h"
@@ -195,6 +196,12 @@ private:
const std::unique_ptr<DonorStateMachineExternalState> _externalState;
+ // ThreadPool used by CancelableOperationContext.
+ // CancelableOperationContext must have a thread that is always available to it to mark its
+ // opCtx as killed when the cancelToken has been cancelled.
+ const std::shared_ptr<ThreadPool> _markKilledExecutor;
+ boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;
+
// Protects the state below
Mutex _mutex = MONGO_MAKE_LATCH("DonorStateMachine::_mutex");