author     Antonio Fuschetto <antonio.fuschetto@mongodb.com>  2023-03-23 16:03:58 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2023-03-23 17:45:04 +0000
commit     a0c9ffb2cee79bffb071c877d39272efad293e76 (patch)
tree       f01d7fa7691f2d172906fbd5babfeb52ab84f5c9 /src
parent     e1ba18ecc0c1a7b65aabc412b94e13e2e6fba6a8 (diff)
download   mongo-a0c9ffb2cee79bffb071c877d39272efad293e76.tar.gz
SERVER-73929 Improve the resilient movePrimary's error handling
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.cpp  | 156
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.h    |   8
2 files changed, 132 insertions(+), 32 deletions(-)
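
The error-handling contract introduced by this change, as described by the comments added below, distinguishes three outcomes: a non-retryable failure while checking the preconditions (no phase persisted yet) terminates the operation; a failure during the clone phase, or a ShardNotFound error at any point, triggers the cleanup procedure (_cleanupOnAbort); any other failure is retried from the failed phase. The following is a minimal, self-contained C++ sketch of that decision logic only; the Phase values, the Action enum, and the decideOnNonRetryableError helper are illustrative and are not part of the actual ShardingDDLCoordinator interface.

#include <iostream>

// Illustrative phase list; the real coordinator defines its phases in IDL.
enum class Phase { kUnset, kClone, kCatchup, kEnterCriticalSection, kCommit, kClean };

enum class Action { kTerminate, kCleanup, kRetryFromFailedPhase };

// Mirrors _mustAlwaysMakeProgress(): once any phase beyond kUnset has been
// persisted, the operation may no longer simply terminate on error.
bool mustAlwaysMakeProgress(Phase phase) {
    return phase > Phase::kUnset;
}

// Hypothetical helper summarizing the onError()/_cleanupOnAbort() behavior.
Action decideOnNonRetryableError(Phase failedPhase, bool isShardNotFound) {
    if (failedPhase == Phase::kClone || isShardNotFound) {
        // Clone-phase failures and ShardNotFound errors trigger the cleanup:
        // drop orphaned data on the recipient and exit the critical section.
        return Action::kCleanup;
    }
    if (!mustAlwaysMakeProgress(failedPhase)) {
        // Failed while checking the preconditions: terminate the operation.
        return Action::kTerminate;
    }
    // Any other non-retryable error: retry from the failed phase.
    return Action::kRetryFromFailedPhase;
}

int main() {
    std::cout << (decideOnNonRetryableError(Phase::kClone, false) == Action::kCleanup) << '\n';
    std::cout << (decideOnNonRetryableError(Phase::kUnset, false) == Action::kTerminate) << '\n';
    std::cout << (decideOnNonRetryableError(Phase::kCommit, false) == Action::kRetryFromFailedPhase) << '\n';
}
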
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp
index 5a9848338b9..45049a5380a 100644
--- a/src/mongo/db/s/move_primary_coordinator.cpp
+++ b/src/mongo/db/s/move_primary_coordinator.cpp
@@ -79,6 +79,15 @@ void MovePrimaryCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) c
BSON(MovePrimaryCoordinatorDocument::kToShardIdFieldName << _doc.getToShardId()));
};
+bool MovePrimaryCoordinator::_mustAlwaysMakeProgress() {
+ stdx::lock_guard lk(_docMutex);
+
+ // Any non-retryable error raised while checking the preconditions should terminate the
+ // operation. In contrast, in any of the subsequent phases, a non-retryable error that does not
+ // trigger the cleanup procedure should cause the operation to be retried from the failed phase.
+ return _doc.getPhase() > Phase::kUnset;
+};
+
void MovePrimaryCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
const auto otherDoc = MovePrimaryCoordinatorDocument::parse(
IDLParserContext("MovePrimaryCoordinatorDocument"), doc);
@@ -97,15 +106,39 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- if (_doc.getToShardId() == ShardingState::get(opCtx)->shardId()) {
+ const auto& toShardId = _doc.getToShardId();
+
+ if (toShardId == ShardingState::get(opCtx)->shardId()) {
LOGV2(7120200,
"Database already on requested primary shard",
"db"_attr = _dbName,
- "to"_attr = _doc.getToShardId());
+ "to"_attr = toShardId);
return ExecutorFuture<void>(**executor);
}
+ const auto toShardEntry = [&] {
+ const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ const auto findResponse = uassertStatusOK(config->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kConfigsvrShardsNamespace,
+ BSON(ShardType::name() << toShardId),
+ BSONObj() /* No sorting */,
+ 1 /* Limit */));
+
+ uassert(ErrorCodes::ShardNotFound,
+ "Requested primary shard {} does not exist"_format(toShardId.toString()),
+ !findResponse.docs.empty());
+
+ return uassertStatusOK(ShardType::fromBSON(findResponse.docs.front()));
+ }();
+
+ uassert(ErrorCodes::ShardNotFound,
+ "Requested primary shard {} is draining"_format(toShardId.toString()),
+ !toShardEntry.getDraining());
+
return runMovePrimaryWorkflow(executor, token);
});
}
@@ -121,6 +154,18 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ if (!_firstExecution) {
+ // The `_shardsvrCloneCatalogData` command, which requests the recipient to clone the
+ // catalog data for the given database, is not idempotent. Therefore, if the
+ // recipient had already started cloning data before the coordinator encountered an
+ // error, the movePrimary operation must be aborted.
+
+ uasserted(
+ 7120202,
+ "movePrimary operation on database {} failed cloning data to recipient"_format(
+ _dbName.toString()));
+ }
+
LOGV2(7120201,
"Running movePrimary operation",
"db"_attr = _dbName,
@@ -134,19 +179,6 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
unblockWritesLegacy(opCtx);
});
- if (!_firstExecution) {
- // The previous execution failed with a retryable error and the recipient may
- // have cloned part of the data. Orphaned data on recipient must be dropped and
- // the `movePrimary` operation must fail in order to delegate to the caller the
- // decision to retry the operation.
- dropOrphanedDataOnRecipient(opCtx, executor);
-
- uasserted(
- 7120202,
- "movePrimary operation on database {} failed cloning data to recipient"_format(
- _dbName.toString()));
- }
-
blockWritesLegacy(opCtx);
if (MONGO_unlikely(hangBeforeCloningData.shouldFail())) {
@@ -163,8 +195,6 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
const auto cloneResponse = cloneDataToRecipient(opCtx);
const auto cloneStatus = Shard::CommandResponse::getEffectiveStatus(cloneResponse);
if (!cloneStatus.isOK() || !checkClonedData(cloneResponse.getValue())) {
- dropOrphanedDataOnRecipient(opCtx, executor);
-
uasserted(
cloneStatus.isOK() ? 7120204 : cloneStatus.code(),
"movePrimary operation on database {} failed cloning data to recipient"_format(
@@ -191,6 +221,10 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
_updateSession(opCtx);
if (!_firstExecution) {
+ // Perform a noop write on the recipient in order to
+ // advance the txnNumber for this coordinator's logical
+ // session. This prevents requests with older txnNumbers
+ // from being processed.
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
opCtx, getCurrentSession(), **executor);
}
@@ -238,6 +272,10 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
_updateSession(opCtx);
if (!_firstExecution) {
+ // Perform a noop write on the recipient in order to
+ // advance the txnNumber for this coordinator's logical
+ // session. This prevents requests with older txnNumbers
+ // from being processed.
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
opCtx, getCurrentSession(), **executor);
}
@@ -257,22 +295,86 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
- LOGV2_ERROR(7120207,
+ const auto& failedPhase = _doc.getPhase();
+ if (failedPhase == Phase::kClone || status == ErrorCodes::ShardNotFound) {
+ LOGV2_DEBUG(7392900,
+ 1,
+ "Triggering movePrimary cleanup",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId(),
+ "phase"_attr = serializePhase(failedPhase),
+ "error"_attr = redact(status));
+
+ triggerCleanup(opCtx, status);
+ }
+ });
+}
+
+ExecutorFuture<void> MovePrimaryCoordinator::_cleanupOnAbort(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token,
+ const Status& status) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then([this, executor, status, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ _updateSession(opCtx);
+ _performNoopRetryableWriteOnAllShardsAndConfigsvr(
+ opCtx, getCurrentSession(), **executor);
+
+ const auto& failedPhase = _doc.getPhase();
+ const auto& toShardId = _doc.getToShardId();
+
+ if (failedPhase <= Phase::kCommit) {
+ // A non-retryable error occurred before the new primary shard was actually
+ // committed, so any cloned data on the recipient must be dropped.
+
+ try {
+ // Even if the error is `ShardNotFound`, the recipient may still be in draining
+ // mode, so try to drop any orphaned data anyway.
+ dropOrphanedDataOnRecipient(opCtx, executor);
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
+ LOGV2_INFO(7392901,
+ "Failed to remove orphaned data on recipient as it has been removed",
+ "db"_attr = _dbName,
+ "to"_attr = toShardId);
+ }
+ }
+
+ unblockReadsAndWrites(opCtx);
+ try {
+ // Even if the error is `ShardNotFound`, the recipient may still be in draining
+ // mode, so try to exit the critical section anyway.
+ exitCriticalSectionOnRecipient(opCtx);
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
+ LOGV2_INFO(7392902,
+ "Failed to exit critical section on recipient as it has been removed",
+ "db"_attr = _dbName,
+ "to"_attr = toShardId);
+ }
+
+ LOGV2_ERROR(7392903,
"Failed movePrimary operation",
"db"_attr = _dbName,
- "to"_attr = _doc.getToShardId(),
+ "to"_attr = toShardId,
+ "phase"_attr = serializePhase(failedPhase),
"error"_attr = redact(status));
- logChange(opCtx, "error");
-
- return status;
+ logChange(opCtx, "error", status);
});
}
-void MovePrimaryCoordinator::logChange(OperationContext* opCtx, const std::string& what) const {
+void MovePrimaryCoordinator::logChange(OperationContext* opCtx,
+ const std::string& what,
+ const Status& status) const {
BSONObjBuilder details;
details.append("from", ShardingState::get(opCtx)->shardId());
details.append("to", _doc.getToShardId());
+ if (!status.isOK()) {
+ details.append("error", status.toString());
+ }
ShardingLogging::get(opCtx)->logChange(
opCtx, "movePrimary.{}"_format(what), _dbName.toString(), details.obj());
}
@@ -417,10 +519,6 @@ void MovePrimaryCoordinator::commitMetadataToConfig(
commitCommand,
Shard::RetryPolicy::kIdempotent);
- if (commitResponse == ErrorCodes::ShardNotFound) {
- unblockReadsAndWrites(opCtx);
- }
-
uassertStatusOKWithContext(
Shard::CommandResponse::getEffectiveStatus(commitResponse),
"movePrimary operation on database {} failed to commit metadata changes"_format(
@@ -554,10 +652,6 @@ void MovePrimaryCoordinator::enterCriticalSectionOnRecipient(OperationContext* o
Shard::RetryPolicy::kIdempotent);
}();
- if (enterCriticalSectionResponse == ErrorCodes::ShardNotFound) {
- unblockReadsAndWrites(opCtx);
- }
-
uassertStatusOKWithContext(
Shard::CommandResponse::getEffectiveStatus(enterCriticalSectionResponse),
"movePrimary operation on database {} failed to block read/write operations on recipient"_format(
diff --git a/src/mongo/db/s/move_primary_coordinator.h b/src/mongo/db/s/move_primary_coordinator.h
index 4dc971b1d43..58619966d94 100644
--- a/src/mongo/db/s/move_primary_coordinator.h
+++ b/src/mongo/db/s/move_primary_coordinator.h
@@ -51,8 +51,12 @@ public:
private:
StringData serializePhase(const Phase& phase) const override;
void appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const override;
+ bool _mustAlwaysMakeProgress() override;
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
+ ExecutorFuture<void> _cleanupOnAbort(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token,
+ const Status& status) noexcept override;
ExecutorFuture<void> runMovePrimaryWorkflow(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
@@ -61,7 +65,9 @@ private:
/**
* Logs in the `config.changelog` collection a specific event for `movePrimary` operations.
*/
- void logChange(OperationContext* opCtx, const std::string& what) const;
+ void logChange(OperationContext* opCtx,
+ const std::string& what,
+ const Status& status = Status::OK()) const;
/**
* Returns the list of unsharded collections for the given database. These are the collections