diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-10-23 02:38:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-23 03:08:03 +0000 |
commit | 1a80e8dad2fba68bde96eb080bd1c4319f50e778 (patch) | |
tree | 497d5fdfb32e5949fb9a0712e7341bb7e53e9bcf /src/mongo | |
parent | 56a5ddbf5a2ba92169e81ec24f9387c5fe1c0931 (diff) | |
download | mongo-1a80e8dad2fba68bde96eb080bd1c4319f50e778.tar.gz |
SERVER-60859 Explicitly quiesce commit monitor in ReshardingCoordinator.
Also changes the commit monitor to use CancelableOperationContext so its
internal operations can be interrupted more rapidly.
Diffstat (limited to 'src/mongo')
4 files changed, 75 insertions, 25 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp index ff3ad2eef8d..a616bc90df4 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp @@ -37,6 +37,7 @@ #include "mongo/base/error_codes.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/logv2/log.h" @@ -102,17 +103,18 @@ CoordinatorCommitMonitor::CoordinatorCommitMonitor( SemiFuture<void> CoordinatorCommitMonitor::waitUntilRecipientsAreWithinCommitThreshold() const { return _makeFuture() .onError([](Status status) { - if (ErrorCodes::isCancellationError(status.code())) { + if (ErrorCodes::isCancellationError(status.code()) || + ErrorCodes::isInterruption(status.code())) { LOGV2_DEBUG(5392003, kDiagnosticLogLevel, - "The resharding commit monitor is interrupted", + "The resharding commit monitor has been interrupted", "error"_attr = status); } else { LOGV2_WARNING(5392004, "Stopped the resharding commit monitor due to an error", "error"_attr = status); } - return Status::OK(); + return status; }) .semi(); } @@ -133,7 +135,7 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const { "Querying recipient shards for the remaining operation time", "namespace"_attr = _ns); - auto opCtx = cc().makeOperationContext(); + auto opCtx = CancelableOperationContext(cc().makeOperationContext(), _cancelToken, _executor); auto executor = _networkExecutor ? _networkExecutor : _executor; AsyncRequestsSender ars(opCtx.get(), executor, @@ -187,10 +189,11 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const { ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const { return ExecutorFuture<void>(_executor) .then([this] { return queryRemainingOperationTimeForRecipients(); }) - .onError([](Status status) { - if (ErrorCodes::isCancellationError(status.code())) + .onError([this](Status status) { + if (_cancelToken.isCanceled()) { // Do not retry on cancellation errors. iasserted(status); + } // Absorbs any exception thrown by the query phase, except for cancellation errors, and // retries. The intention is to handle short term issues with querying recipients (e.g., diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp index d2aff55ae5c..e7f90cc41fa 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp @@ -248,7 +248,7 @@ TEST_F(CoordinatorCommitMonitorTest, UnblocksWhenCancellationTokenIsCancelled) { return future; }(); - future.get(); + ASSERT_EQ(future.getNoThrow(), ErrorCodes::CallbackCanceled); } TEST_F(CoordinatorCommitMonitorTest, RetriesWhenEncountersErrorsWhileQueryingRecipients) { diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index e87e5d2a705..ab2c3acd454 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -1315,6 +1315,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( return status; }) .thenRunOn(_coordinatorService->getInstanceCleanupExecutor()) + .onCompletion([this](Status outerStatus) { + // Wait for the commit monitor to halt. We ignore any ignores because the + // ReshardingCoordinator instance is already exiting at this point. + return _commitMonitorQuiesced + .thenRunOn(_coordinatorService->getInstanceCleanupExecutor()) + .onCompletion([outerStatus](Status) { return outerStatus; }); + }) .onCompletion([this, self = shared_from_this()](Status status) { // On stepdown or shutdown, the _scopedExecutor may have already been shut down. // Schedule cleanup work on the parent executor. @@ -1432,11 +1439,21 @@ ReshardingCoordinatorService::ReshardingCoordinator::getObserver() { } void ReshardingCoordinatorService::ReshardingCoordinator::onOkayToEnterCritical() { + _fulfillOkayToEnterCritical(Status::OK()); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::_fulfillOkayToEnterCritical( + Status status) { auto lg = stdx::lock_guard(_fulfillmentMutex); if (_canEnterCritical.getFuture().isReady()) return; - LOGV2(5391601, "Marking resharding operation okay to enter critical section"); - _canEnterCritical.emplaceValue(); + + if (status.isOK()) { + LOGV2(5391601, "Marking resharding operation okay to enter critical section"); + _canEnterCritical.emplaceValue(); + } else { + _canEnterCritical.setError(std::move(status)); + } } void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChangeOrigCollEntry() { @@ -1575,20 +1592,23 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - _ctHolder->getAbortToken().onCancel().thenRunOn(**executor).getAsync([this](Status status) { - if (status.isOK()) - _commitMonitorCancellationSource.cancel(); - }); + if (_commitMonitor) { + return; + } - auto commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>( + _commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>( _coordinatorDoc.getSourceNss(), extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()), **executor, - _commitMonitorCancellationSource.token()); + _ctHolder->getCommitMonitorToken()); - commitMonitor->waitUntilRecipientsAreWithinCommitThreshold() - .thenRunOn(**executor) - .getAsync([this](Status) { onOkayToEnterCritical(); }); + _commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold() + .thenRunOn(**executor) + .onCompletion([this](Status status) { + _fulfillOkayToEnterCritical(status); + return status; + }) + .share(); } ExecutorFuture<void> @@ -1603,10 +1623,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished _startCommitMonitor(executor); LOGV2(5391602, "Resharding operation waiting for an okay to enter critical section"); - return _canEnterCritical.getFuture().thenRunOn(**executor).then([this] { - _commitMonitorCancellationSource.cancel(); - LOGV2(5391603, "Resharding operation is okay to enter critical section"); - }); + return _canEnterCritical.getFuture() + .thenRunOn(**executor) + .onCompletion([this](Status status) { + _ctHolder->cancelCommitMonitor(); + if (status.isOK()) { + LOGV2(5391603, "Resharding operation is okay to enter critical section"); + } + return status; + }); }) .then([this, executor] { { diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index 3737bd8a62f..747cbd86ac6 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -43,6 +43,8 @@ namespace mongo { namespace resharding { +class CoordinatorCommitMonitor; + CollectionType createTempReshardingCollectionType( OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc, @@ -115,7 +117,8 @@ public: CoordinatorCancellationTokenHolder(CancellationToken stepdownToken) : _stepdownToken(stepdownToken), _abortSource(CancellationSource(stepdownToken)), - _abortToken(_abortSource.token()) {} + _abortToken(_abortSource.token()), + _commitMonitorCancellationSource(CancellationSource(_abortToken)) {} /** * Returns whether the any token has been canceled. @@ -148,6 +151,10 @@ public: _abortSource.cancel(); } + void cancelCommitMonitor() { + _commitMonitorCancellationSource.cancel(); + } + const CancellationToken& getStepdownToken() { return _stepdownToken; } @@ -156,6 +163,10 @@ public: return _abortToken; } + CancellationToken getCommitMonitorToken() { + return _commitMonitorCancellationSource.token(); + } + private: // The token passed in by the PrimaryOnlyService runner that is canceled when this shard's // underlying replica set node is stepping down or shutting down. @@ -167,6 +178,10 @@ private: // The token to wait on in cases where a user wants to wait on either a resharding operation // being aborted or the replica set node stepping/shutting down. CancellationToken _abortToken; + + // The source created by inheriting from the abort token. + // Provides the means to cancel the commit monitor (e.g., due to receiving the commit command). + CancellationSource _commitMonitorCancellationSource; }; class ReshardingCoordinatorService : public repl::PrimaryOnlyService { @@ -468,6 +483,13 @@ private: */ void _updateChunkImbalanceMetrics(const NamespaceString& nss); + /** + * When called with Status::OK(), the coordinator will eventually enter the critical section. + * + * When called with an error Status, the coordinator will never enter the critical section. + */ + void _fulfillOkayToEnterCritical(Status status); + // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The // value of this is the UUID that will be used as the collection UUID for the new sharded // collection. The object looks like: {_id: 'reshardingUUID'} @@ -519,8 +541,8 @@ private: // Callback handle for scheduled work to handle critical section timeout. boost::optional<executor::TaskExecutor::CallbackHandle> _criticalSectionTimeoutCbHandle; - // Provides the means to cancel the commit monitor (e.g., due to receiving the commit command). - CancellationSource _commitMonitorCancellationSource; + SharedSemiFuture<void> _commitMonitorQuiesced; + std::shared_ptr<resharding::CoordinatorCommitMonitor> _commitMonitor; std::shared_ptr<ReshardingCoordinatorExternalState> _reshardingCoordinatorExternalState; }; |