diff options
author | Antonio Fuschetto <antonio.fuschetto@mongodb.com> | 2023-03-23 16:03:58 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-23 17:45:04 +0000 |
commit | a0c9ffb2cee79bffb071c877d39272efad293e76 (patch) | |
tree | f01d7fa7691f2d172906fbd5babfeb52ab84f5c9 /src | |
parent | e1ba18ecc0c1a7b65aabc412b94e13e2e6fba6a8 (diff) | |
download | mongo-a0c9ffb2cee79bffb071c877d39272efad293e76.tar.gz |
SERVER-73929 Improve the resilient movePrimary's error handling
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/move_primary_coordinator.cpp | 156 | ||||
-rw-r--r-- | src/mongo/db/s/move_primary_coordinator.h | 8 |
2 files changed, 132 insertions, 32 deletions
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp index 5a9848338b9..45049a5380a 100644 --- a/src/mongo/db/s/move_primary_coordinator.cpp +++ b/src/mongo/db/s/move_primary_coordinator.cpp @@ -79,6 +79,15 @@ void MovePrimaryCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) c BSON(MovePrimaryCoordinatorDocument::kToShardIdFieldName << _doc.getToShardId())); }; +bool MovePrimaryCoordinator::_mustAlwaysMakeProgress() { + stdx::lock_guard lk(_docMutex); + + // Any non-retryable errors while checking the preconditions should cause the operation to be + // terminated. Instead, in any of the subsequent phases, any non-retryable errors that do not + // trigger the cleanup procedure should cause the operation to be retried from the failed phase. + return _doc.getPhase() > Phase::kUnset; +}; + void MovePrimaryCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { const auto otherDoc = MovePrimaryCoordinatorDocument::parse( IDLParserContext("MovePrimaryCoordinatorDocument"), doc); @@ -97,15 +106,39 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - if (_doc.getToShardId() == ShardingState::get(opCtx)->shardId()) { + const auto& toShardId = _doc.getToShardId(); + + if (toShardId == ShardingState::get(opCtx)->shardId()) { LOGV2(7120200, "Database already on requested primary shard", "db"_attr = _dbName, - "to"_attr = _doc.getToShardId()); + "to"_attr = toShardId); return ExecutorFuture<void>(**executor); } + const auto toShardEntry = [&] { + const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + const auto findResponse = uassertStatusOK(config->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString::kConfigsvrShardsNamespace, + BSON(ShardType::name() << toShardId), + BSONObj() /* No sorting */, + 1 /* Limit */)); + + uassert(ErrorCodes::ShardNotFound, + "Requested primary shard {} does not exist"_format(toShardId.toString()), + !findResponse.docs.empty()); + + return uassertStatusOK(ShardType::fromBSON(findResponse.docs.front())); + }(); + + uassert(ErrorCodes::ShardNotFound, + "Requested primary shard {} is draining"_format(toShardId.toString()), + !toShardEntry.getDraining()); + return runMovePrimaryWorkflow(executor, token); }); } @@ -121,6 +154,18 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + if (!_firstExecution) { + // The `_shardsvrCloneCatalogData` command to request the recipient to clone the + // catalog data for the given database is not idempotent. Therefore, if the + // recipient already started cloning data before the coordinator encounters an + // error, the movePrimary operation must be aborted. + + uasserted( + 7120202, + "movePrimary operation on database {} failed cloning data to recipient"_format( + _dbName.toString())); + } + LOGV2(7120201, "Running movePrimary operation", "db"_attr = _dbName, @@ -134,19 +179,6 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow( unblockWritesLegacy(opCtx); }); - if (!_firstExecution) { - // The previous execution failed with a retryable error and the recipient may - // have cloned part of the data. Orphaned data on recipient must be dropped and - // the `movePrimary` operation must fail in order to delegate to the caller the - // decision to retry the operation. - dropOrphanedDataOnRecipient(opCtx, executor); - - uasserted( - 7120202, - "movePrimary operation on database {} failed cloning data to recipient"_format( - _dbName.toString())); - } - blockWritesLegacy(opCtx); if (MONGO_unlikely(hangBeforeCloningData.shouldFail())) { @@ -163,8 +195,6 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow( const auto cloneResponse = cloneDataToRecipient(opCtx); const auto cloneStatus = Shard::CommandResponse::getEffectiveStatus(cloneResponse); if (!cloneStatus.isOK() || !checkClonedData(cloneResponse.getValue())) { - dropOrphanedDataOnRecipient(opCtx, executor); - uasserted( cloneStatus.isOK() ? 7120204 : cloneStatus.code(), "movePrimary operation on database {} failed cloning data to recipient"_format( @@ -191,6 +221,10 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow( _updateSession(opCtx); if (!_firstExecution) { + // Perform a noop write on the recipient in order to + // advance the txnNumber for this coordinator's logical + // session. This prevents requests with older txnNumbers + // from being processed. _performNoopRetryableWriteOnAllShardsAndConfigsvr( opCtx, getCurrentSession(), **executor); } @@ -238,6 +272,10 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow( _updateSession(opCtx); if (!_firstExecution) { + // Perform a noop write on the recipient in order to + // advance the txnNumber for this coordinator's logical + // session. This prevents requests with older txnNumbers + // from being processed. _performNoopRetryableWriteOnAllShardsAndConfigsvr( opCtx, getCurrentSession(), **executor); } @@ -257,22 +295,86 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); - LOGV2_ERROR(7120207, + const auto& failedPhase = _doc.getPhase(); + if (failedPhase == Phase::kClone || status == ErrorCodes::ShardNotFound) { + LOGV2_DEBUG(7392900, + 1, + "Triggering movePrimary cleanup", + "db"_attr = _dbName, + "to"_attr = _doc.getToShardId(), + "phase"_attr = serializePhase(failedPhase), + "error"_attr = redact(status)); + + triggerCleanup(opCtx, status); + } + }); +} + +ExecutorFuture<void> MovePrimaryCoordinator::_cleanupOnAbort( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token, + const Status& status) noexcept { + return ExecutorFuture<void>(**executor) + .then([this, executor, status, anchor = shared_from_this()] { + const auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + + _updateSession(opCtx); + _performNoopRetryableWriteOnAllShardsAndConfigsvr( + opCtx, getCurrentSession(), **executor); + + const auto& failedPhase = _doc.getPhase(); + const auto& toShardId = _doc.getToShardId(); + + if (failedPhase <= Phase::kCommit) { + // A non-retryable error occurred before the new primary shard was actually + // committed, so any cloned data on the recipient must be dropped. + + try { + // Even if the error is `ShardNotFound`, the recipient may still be in draining + // mode, so try to drop any orphaned data anyway. + dropOrphanedDataOnRecipient(opCtx, executor); + } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) { + LOGV2_INFO(7392901, + "Failed to remove orphaned data on recipient as it has been removed", + "db"_attr = _dbName, + "to"_attr = toShardId); + } + } + + unblockReadsAndWrites(opCtx); + try { + // Even if the error is `ShardNotFound`, the recipient may still be in draining + // mode, so try to exit the critical section anyway. + exitCriticalSectionOnRecipient(opCtx); + } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) { + LOGV2_INFO(7392902, + "Failed to exit critical section on recipient as it has been removed", + "db"_attr = _dbName, + "to"_attr = toShardId); + } + + LOGV2_ERROR(7392903, "Failed movePrimary operation", "db"_attr = _dbName, - "to"_attr = _doc.getToShardId(), + "to"_attr = toShardId, + "phase"_attr = serializePhase(failedPhase), "error"_attr = redact(status)); - logChange(opCtx, "error"); - - return status; + logChange(opCtx, "error", status); }); } -void MovePrimaryCoordinator::logChange(OperationContext* opCtx, const std::string& what) const { +void MovePrimaryCoordinator::logChange(OperationContext* opCtx, + const std::string& what, + const Status& status) const { BSONObjBuilder details; details.append("from", ShardingState::get(opCtx)->shardId()); details.append("to", _doc.getToShardId()); + if (!status.isOK()) { + details.append("error", status.toString()); + } ShardingLogging::get(opCtx)->logChange( opCtx, "movePrimary.{}"_format(what), _dbName.toString(), details.obj()); } @@ -417,10 +519,6 @@ void MovePrimaryCoordinator::commitMetadataToConfig( commitCommand, Shard::RetryPolicy::kIdempotent); - if (commitResponse == ErrorCodes::ShardNotFound) { - unblockReadsAndWrites(opCtx); - } - uassertStatusOKWithContext( Shard::CommandResponse::getEffectiveStatus(commitResponse), "movePrimary operation on database {} failed to commit metadata changes"_format( @@ -554,10 +652,6 @@ void MovePrimaryCoordinator::enterCriticalSectionOnRecipient(OperationContext* o Shard::RetryPolicy::kIdempotent); }(); - if (enterCriticalSectionResponse == ErrorCodes::ShardNotFound) { - unblockReadsAndWrites(opCtx); - } - uassertStatusOKWithContext( Shard::CommandResponse::getEffectiveStatus(enterCriticalSectionResponse), "movePrimary operation on database {} failed to block read/write operations on recipient"_format( diff --git a/src/mongo/db/s/move_primary_coordinator.h b/src/mongo/db/s/move_primary_coordinator.h index 4dc971b1d43..58619966d94 100644 --- a/src/mongo/db/s/move_primary_coordinator.h +++ b/src/mongo/db/s/move_primary_coordinator.h @@ -51,8 +51,12 @@ public: private: StringData serializePhase(const Phase& phase) const override; void appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const override; + bool _mustAlwaysMakeProgress() override; ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; + ExecutorFuture<void> _cleanupOnAbort(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token, + const Status& status) noexcept override; ExecutorFuture<void> runMovePrimaryWorkflow( std::shared_ptr<executor::ScopedTaskExecutor> executor, @@ -61,7 +65,9 @@ private: /** * Logs in the `config.changelog` collection a specific event for `movePrimary` operations. */ - void logChange(OperationContext* opCtx, const std::string& what) const; + void logChange(OperationContext* opCtx, + const std::string& what, + const Status& status = Status::OK()) const; /** * Returns the list of unsharded collections for the given database. These are the collections |