summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2021-10-23 02:38:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-23 03:08:03 +0000
commit1a80e8dad2fba68bde96eb080bd1c4319f50e778 (patch)
tree497d5fdfb32e5949fb9a0712e7341bb7e53e9bcf
parent56a5ddbf5a2ba92169e81ec24f9387c5fe1c0931 (diff)
downloadmongo-1a80e8dad2fba68bde96eb080bd1c4319f50e778.tar.gz
SERVER-60859 Explicitly quiesce commit monitor in ReshardingCoordinator.
Also changes the commit monitor to use CancelableOperationContext so its internal operations can be interrupted more rapidly.
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp15
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp55
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h28
4 files changed, 75 insertions, 25 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
index ff3ad2eef8d..a616bc90df4 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
@@ -37,6 +37,7 @@
#include "mongo/base/error_codes.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/logv2/log.h"
@@ -102,17 +103,18 @@ CoordinatorCommitMonitor::CoordinatorCommitMonitor(
SemiFuture<void> CoordinatorCommitMonitor::waitUntilRecipientsAreWithinCommitThreshold() const {
return _makeFuture()
.onError([](Status status) {
- if (ErrorCodes::isCancellationError(status.code())) {
+ if (ErrorCodes::isCancellationError(status.code()) ||
+ ErrorCodes::isInterruption(status.code())) {
LOGV2_DEBUG(5392003,
kDiagnosticLogLevel,
- "The resharding commit monitor is interrupted",
+ "The resharding commit monitor has been interrupted",
"error"_attr = status);
} else {
LOGV2_WARNING(5392004,
"Stopped the resharding commit monitor due to an error",
"error"_attr = status);
}
- return Status::OK();
+ return status;
})
.semi();
}
@@ -133,7 +135,7 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const {
"Querying recipient shards for the remaining operation time",
"namespace"_attr = _ns);
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = CancelableOperationContext(cc().makeOperationContext(), _cancelToken, _executor);
auto executor = _networkExecutor ? _networkExecutor : _executor;
AsyncRequestsSender ars(opCtx.get(),
executor,
@@ -187,10 +189,11 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const {
ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const {
return ExecutorFuture<void>(_executor)
.then([this] { return queryRemainingOperationTimeForRecipients(); })
- .onError([](Status status) {
- if (ErrorCodes::isCancellationError(status.code()))
+ .onError([this](Status status) {
+ if (_cancelToken.isCanceled()) {
// Do not retry on cancellation errors.
iasserted(status);
+ }
// Absorbs any exception thrown by the query phase, except for cancellation errors, and
// retries. The intention is to handle short term issues with querying recipients (e.g.,
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
index d2aff55ae5c..e7f90cc41fa 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
@@ -248,7 +248,7 @@ TEST_F(CoordinatorCommitMonitorTest, UnblocksWhenCancellationTokenIsCancelled) {
return future;
}();
- future.get();
+ ASSERT_EQ(future.getNoThrow(), ErrorCodes::CallbackCanceled);
}
TEST_F(CoordinatorCommitMonitorTest, RetriesWhenEncountersErrorsWhileQueryingRecipients) {
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index e87e5d2a705..ab2c3acd454 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -1315,6 +1315,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
})
.thenRunOn(_coordinatorService->getInstanceCleanupExecutor())
+ .onCompletion([this](Status outerStatus) {
+ // Wait for the commit monitor to halt. We ignore any ignores because the
+ // ReshardingCoordinator instance is already exiting at this point.
+ return _commitMonitorQuiesced
+ .thenRunOn(_coordinatorService->getInstanceCleanupExecutor())
+ .onCompletion([outerStatus](Status) { return outerStatus; });
+ })
.onCompletion([this, self = shared_from_this()](Status status) {
// On stepdown or shutdown, the _scopedExecutor may have already been shut down.
// Schedule cleanup work on the parent executor.
@@ -1432,11 +1439,21 @@ ReshardingCoordinatorService::ReshardingCoordinator::getObserver() {
}
void ReshardingCoordinatorService::ReshardingCoordinator::onOkayToEnterCritical() {
+ _fulfillOkayToEnterCritical(Status::OK());
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_fulfillOkayToEnterCritical(
+ Status status) {
auto lg = stdx::lock_guard(_fulfillmentMutex);
if (_canEnterCritical.getFuture().isReady())
return;
- LOGV2(5391601, "Marking resharding operation okay to enter critical section");
- _canEnterCritical.emplaceValue();
+
+ if (status.isOK()) {
+ LOGV2(5391601, "Marking resharding operation okay to enter critical section");
+ _canEnterCritical.emplaceValue();
+ } else {
+ _canEnterCritical.setError(std::move(status));
+ }
}
void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChangeOrigCollEntry() {
@@ -1575,20 +1592,23 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- _ctHolder->getAbortToken().onCancel().thenRunOn(**executor).getAsync([this](Status status) {
- if (status.isOK())
- _commitMonitorCancellationSource.cancel();
- });
+ if (_commitMonitor) {
+ return;
+ }
- auto commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>(
+ _commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>(
_coordinatorDoc.getSourceNss(),
extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()),
**executor,
- _commitMonitorCancellationSource.token());
+ _ctHolder->getCommitMonitorToken());
- commitMonitor->waitUntilRecipientsAreWithinCommitThreshold()
- .thenRunOn(**executor)
- .getAsync([this](Status) { onOkayToEnterCritical(); });
+ _commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold()
+ .thenRunOn(**executor)
+ .onCompletion([this](Status status) {
+ _fulfillOkayToEnterCritical(status);
+ return status;
+ })
+ .share();
}
ExecutorFuture<void>
@@ -1603,10 +1623,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
_startCommitMonitor(executor);
LOGV2(5391602, "Resharding operation waiting for an okay to enter critical section");
- return _canEnterCritical.getFuture().thenRunOn(**executor).then([this] {
- _commitMonitorCancellationSource.cancel();
- LOGV2(5391603, "Resharding operation is okay to enter critical section");
- });
+ return _canEnterCritical.getFuture()
+ .thenRunOn(**executor)
+ .onCompletion([this](Status status) {
+ _ctHolder->cancelCommitMonitor();
+ if (status.isOK()) {
+ LOGV2(5391603, "Resharding operation is okay to enter critical section");
+ }
+ return status;
+ });
})
.then([this, executor] {
{
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 3737bd8a62f..747cbd86ac6 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -43,6 +43,8 @@
namespace mongo {
namespace resharding {
+class CoordinatorCommitMonitor;
+
CollectionType createTempReshardingCollectionType(
OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc,
@@ -115,7 +117,8 @@ public:
CoordinatorCancellationTokenHolder(CancellationToken stepdownToken)
: _stepdownToken(stepdownToken),
_abortSource(CancellationSource(stepdownToken)),
- _abortToken(_abortSource.token()) {}
+ _abortToken(_abortSource.token()),
+ _commitMonitorCancellationSource(CancellationSource(_abortToken)) {}
/**
* Returns whether the any token has been canceled.
@@ -148,6 +151,10 @@ public:
_abortSource.cancel();
}
+ void cancelCommitMonitor() {
+ _commitMonitorCancellationSource.cancel();
+ }
+
const CancellationToken& getStepdownToken() {
return _stepdownToken;
}
@@ -156,6 +163,10 @@ public:
return _abortToken;
}
+ CancellationToken getCommitMonitorToken() {
+ return _commitMonitorCancellationSource.token();
+ }
+
private:
// The token passed in by the PrimaryOnlyService runner that is canceled when this shard's
// underlying replica set node is stepping down or shutting down.
@@ -167,6 +178,10 @@ private:
// The token to wait on in cases where a user wants to wait on either a resharding operation
// being aborted or the replica set node stepping/shutting down.
CancellationToken _abortToken;
+
+ // The source created by inheriting from the abort token.
+ // Provides the means to cancel the commit monitor (e.g., due to receiving the commit command).
+ CancellationSource _commitMonitorCancellationSource;
};
class ReshardingCoordinatorService : public repl::PrimaryOnlyService {
@@ -468,6 +483,13 @@ private:
*/
void _updateChunkImbalanceMetrics(const NamespaceString& nss);
+ /**
+ * When called with Status::OK(), the coordinator will eventually enter the critical section.
+ *
+ * When called with an error Status, the coordinator will never enter the critical section.
+ */
+ void _fulfillOkayToEnterCritical(Status status);
+
// The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The
// value of this is the UUID that will be used as the collection UUID for the new sharded
// collection. The object looks like: {_id: 'reshardingUUID'}
@@ -519,8 +541,8 @@ private:
// Callback handle for scheduled work to handle critical section timeout.
boost::optional<executor::TaskExecutor::CallbackHandle> _criticalSectionTimeoutCbHandle;
- // Provides the means to cancel the commit monitor (e.g., due to receiving the commit command).
- CancellationSource _commitMonitorCancellationSource;
+ SharedSemiFuture<void> _commitMonitorQuiesced;
+ std::shared_ptr<resharding::CoordinatorCommitMonitor> _commitMonitor;
std::shared_ptr<ReshardingCoordinatorExternalState> _reshardingCoordinatorExternalState;
};