summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Max Hirschhorn <max.hirschhorn@mongodb.com> 2021-10-23 13:25:07 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-10-23 13:52:00 +0000
commit: 79029cea7a5cc1d6e6053804c4528a979a6c61f7 (patch)
tree: 05ebf890b306b2315b384600b51fb54b0a435733 /src
parent: b688682bf85c689dde7c8e0d1c0b3f1d8bbacc3c (diff)
download: mongo-79029cea7a5cc1d6e6053804c4528a979a6c61f7.tar.gz
SERVER-60859 Explicitly quiesce commit monitor in ReshardingCoordinator.
Also changes the commit monitor to use CancelableOperationContext so its internal operations can be interrupted more rapidly. (cherry picked from commit 1a80e8dad2fba68bde96eb080bd1c4319f50e778)
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp | 15
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp | 2
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 55
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.h | 28
4 files changed, 75 insertions, 25 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
index ff3ad2eef8d..a616bc90df4 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
@@ -37,6 +37,7 @@
#include "mongo/base/error_codes.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/logv2/log.h"
@@ -102,17 +103,18 @@ CoordinatorCommitMonitor::CoordinatorCommitMonitor(
SemiFuture<void> CoordinatorCommitMonitor::waitUntilRecipientsAreWithinCommitThreshold() const {
return _makeFuture()
.onError([](Status status) {
- if (ErrorCodes::isCancellationError(status.code())) {
+ if (ErrorCodes::isCancellationError(status.code()) ||
+ ErrorCodes::isInterruption(status.code())) {
LOGV2_DEBUG(5392003,
kDiagnosticLogLevel,
- "The resharding commit monitor is interrupted",
+ "The resharding commit monitor has been interrupted",
"error"_attr = status);
} else {
LOGV2_WARNING(5392004,
"Stopped the resharding commit monitor due to an error",
"error"_attr = status);
}
- return Status::OK();
+ return status;
})
.semi();
}
@@ -133,7 +135,7 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const {
"Querying recipient shards for the remaining operation time",
"namespace"_attr = _ns);
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = CancelableOperationContext(cc().makeOperationContext(), _cancelToken, _executor);
auto executor = _networkExecutor ? _networkExecutor : _executor;
AsyncRequestsSender ars(opCtx.get(),
executor,
@@ -187,10 +189,11 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const {
ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const {
return ExecutorFuture<void>(_executor)
.then([this] { return queryRemainingOperationTimeForRecipients(); })
- .onError([](Status status) {
- if (ErrorCodes::isCancellationError(status.code()))
+ .onError([this](Status status) {
+ if (_cancelToken.isCanceled()) {
// Do not retry on cancellation errors.
iasserted(status);
+ }
// Absorbs any exception thrown by the query phase, except for cancellation errors, and
// retries. The intention is to handle short term issues with querying recipients (e.g.,
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
index d2aff55ae5c..e7f90cc41fa 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
@@ -248,7 +248,7 @@ TEST_F(CoordinatorCommitMonitorTest, UnblocksWhenCancellationTokenIsCancelled) {
return future;
}();
- future.get();
+ ASSERT_EQ(future.getNoThrow(), ErrorCodes::CallbackCanceled);
}
TEST_F(CoordinatorCommitMonitorTest, RetriesWhenEncountersErrorsWhileQueryingRecipients) {
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index e87e5d2a705..ab2c3acd454 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -1315,6 +1315,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
})
.thenRunOn(_coordinatorService->getInstanceCleanupExecutor())
+ .onCompletion([this](Status outerStatus) {
+ // Wait for the commit monitor to halt. We ignore any errors because the
+ // ReshardingCoordinator instance is already exiting at this point.
+ return _commitMonitorQuiesced
+ .thenRunOn(_coordinatorService->getInstanceCleanupExecutor())
+ .onCompletion([outerStatus](Status) { return outerStatus; });
+ })
.onCompletion([this, self = shared_from_this()](Status status) {
// On stepdown or shutdown, the _scopedExecutor may have already been shut down.
// Schedule cleanup work on the parent executor.
@@ -1432,11 +1439,21 @@ ReshardingCoordinatorService::ReshardingCoordinator::getObserver() {
}
void ReshardingCoordinatorService::ReshardingCoordinator::onOkayToEnterCritical() {
+ _fulfillOkayToEnterCritical(Status::OK());
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_fulfillOkayToEnterCritical(
+ Status status) {
auto lg = stdx::lock_guard(_fulfillmentMutex);
if (_canEnterCritical.getFuture().isReady())
return;
- LOGV2(5391601, "Marking resharding operation okay to enter critical section");
- _canEnterCritical.emplaceValue();
+
+ if (status.isOK()) {
+ LOGV2(5391601, "Marking resharding operation okay to enter critical section");
+ _canEnterCritical.emplaceValue();
+ } else {
+ _canEnterCritical.setError(std::move(status));
+ }
}
void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChangeOrigCollEntry() {
@@ -1575,20 +1592,23 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- _ctHolder->getAbortToken().onCancel().thenRunOn(**executor).getAsync([this](Status status) {
- if (status.isOK())
- _commitMonitorCancellationSource.cancel();
- });
+ if (_commitMonitor) {
+ return;
+ }
- auto commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>(
+ _commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>(
_coordinatorDoc.getSourceNss(),
extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()),
**executor,
- _commitMonitorCancellationSource.token());
+ _ctHolder->getCommitMonitorToken());
- commitMonitor->waitUntilRecipientsAreWithinCommitThreshold()
- .thenRunOn(**executor)
- .getAsync([this](Status) { onOkayToEnterCritical(); });
+ _commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold()
+ .thenRunOn(**executor)
+ .onCompletion([this](Status status) {
+ _fulfillOkayToEnterCritical(status);
+ return status;
+ })
+ .share();
}
ExecutorFuture<void>
@@ -1603,10 +1623,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
_startCommitMonitor(executor);
LOGV2(5391602, "Resharding operation waiting for an okay to enter critical section");
- return _canEnterCritical.getFuture().thenRunOn(**executor).then([this] {
- _commitMonitorCancellationSource.cancel();
- LOGV2(5391603, "Resharding operation is okay to enter critical section");
- });
+ return _canEnterCritical.getFuture()
+ .thenRunOn(**executor)
+ .onCompletion([this](Status status) {
+ _ctHolder->cancelCommitMonitor();
+ if (status.isOK()) {
+ LOGV2(5391603, "Resharding operation is okay to enter critical section");
+ }
+ return status;
+ });
})
.then([this, executor] {
{
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 3737bd8a62f..747cbd86ac6 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -43,6 +43,8 @@
namespace mongo {
namespace resharding {
+class CoordinatorCommitMonitor;
+
CollectionType createTempReshardingCollectionType(
OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc,
@@ -115,7 +117,8 @@ public:
CoordinatorCancellationTokenHolder(CancellationToken stepdownToken)
: _stepdownToken(stepdownToken),
_abortSource(CancellationSource(stepdownToken)),
- _abortToken(_abortSource.token()) {}
+ _abortToken(_abortSource.token()),
+ _commitMonitorCancellationSource(CancellationSource(_abortToken)) {}
/**
* Returns whether any token has been canceled.
@@ -148,6 +151,10 @@ public:
_abortSource.cancel();
}
+ void cancelCommitMonitor() {
+ _commitMonitorCancellationSource.cancel();
+ }
+
const CancellationToken& getStepdownToken() {
return _stepdownToken;
}
@@ -156,6 +163,10 @@ public:
return _abortToken;
}
+ CancellationToken getCommitMonitorToken() {
+ return _commitMonitorCancellationSource.token();
+ }
+
private:
// The token passed in by the PrimaryOnlyService runner that is canceled when this shard's
// underlying replica set node is stepping down or shutting down.
@@ -167,6 +178,10 @@ private:
// The token to wait on in cases where a user wants to wait on either a resharding operation
// being aborted or the replica set node stepping/shutting down.
CancellationToken _abortToken;
+
+ // The source created by inheriting from the abort token.
+ // Provides the means to cancel the commit monitor (e.g., due to receiving the commit command).
+ CancellationSource _commitMonitorCancellationSource;
};
class ReshardingCoordinatorService : public repl::PrimaryOnlyService {
@@ -468,6 +483,13 @@ private:
*/
void _updateChunkImbalanceMetrics(const NamespaceString& nss);
+ /**
+ * When called with Status::OK(), the coordinator will eventually enter the critical section.
+ *
+ * When called with an error Status, the coordinator will never enter the critical section.
+ */
+ void _fulfillOkayToEnterCritical(Status status);
+
// The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The
// value of this is the UUID that will be used as the collection UUID for the new sharded
// collection. The object looks like: {_id: 'reshardingUUID'}
@@ -519,8 +541,8 @@ private:
// Callback handle for scheduled work to handle critical section timeout.
boost::optional<executor::TaskExecutor::CallbackHandle> _criticalSectionTimeoutCbHandle;
- // Provides the means to cancel the commit monitor (e.g., due to receiving the commit command).
- CancellationSource _commitMonitorCancellationSource;
+ SharedSemiFuture<void> _commitMonitorQuiesced;
+ std::shared_ptr<resharding::CoordinatorCommitMonitor> _commitMonitor;
std::shared_ptr<ReshardingCoordinatorExternalState> _reshardingCoordinatorExternalState;
};