diff options
Diffstat (limited to 'src/mongo/db/s/move_primary_source_manager.cpp')
-rw-r--r-- | src/mongo/db/s/move_primary_source_manager.cpp | 169 |
1 files changed, 105 insertions, 64 deletions
diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp index dfb6e44ed80..42a46229012 100644 --- a/src/mongo/db/s/move_primary_source_manager.cpp +++ b/src/mongo/db/s/move_primary_source_manager.cpp @@ -43,6 +43,7 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" +#include "mongo/s/request_types/move_primary_gen.h" #include "mongo/util/exit.h" #include "mongo/util/scopeguard.h" @@ -212,6 +213,8 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { invariant(_state == kCriticalSection); ScopeGuard scopedGuard([&] { cleanupOnError(opCtx); }); + boost::optional<DatabaseVersion> expectedDbVersion; + { AutoGetDb autoDb(opCtx, getNss().dbName(), MODE_X); @@ -227,13 +230,13 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { // Read operations must begin to wait on the critical section just before we send the // commit operation to the config server dss->enterCriticalSectionCommitPhase(opCtx, dssLock, _critSecReason); - } - auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + expectedDbVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, _dbname); + } auto commitStatus = [&]() { try { - return _commitOnConfig(opCtx); + return _commitOnConfig(opCtx, *expectedDbVersion); } catch (const DBException& ex) { return ex.toStatus(); } @@ -241,8 +244,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { if (!commitStatus.isOK()) { // Need to get the latest optime in case the refresh request goes to a secondary -- - // otherwise the read won't wait for the write that _commitOnConfig may have - // done + // otherwise the read won't wait for the write that commit on config server may have done. LOGV2(22044, "Error occurred while committing the movePrimary. Performing a majority write " "against the config server to obtain its latest optime: {error}", @@ -326,79 +328,119 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { return Status::OK(); } -Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx) { - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - - auto getDatabaseEntry = [&]() { - auto findResponse = uassertStatusOK( - configShard->exhaustiveFindOnConfig(opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kMajorityReadConcern, - NamespaceString::kConfigDatabasesNamespace, - BSON(DatabaseType::kNameFieldName << _dbname), - BSON(DatabaseType::kNameFieldName << -1), - 1)); - - const auto databasesVector = std::move(findResponse.docs); - uassert(ErrorCodes::IncompatibleShardingMetadata, - str::stream() << "Tried to find max database version for database '" << _dbname - << "', but found no databases", - !databasesVector.empty()); +Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx, + const DatabaseVersion& expectedDbVersion) { + LOGV2_DEBUG(6854100, + 3, + "Committing movePrimary", + "db"_attr = _dbname, + "fromShard"_attr = _fromShard, + "toShard"_attr = _toShard, + "expectedDbVersion"_attr = expectedDbVersion); + + const auto commitStatus = [&] { + ConfigsvrCommitMovePrimary commitRequest(_dbname, expectedDbVersion, _toShard); + commitRequest.setDbName(NamespaceString::kAdminDb); + + const auto commitResponse = + Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + NamespaceString::kAdminDb.toString(), + CommandHelpers::appendMajorityWriteConcern(commitRequest.toBSON({})), + Shard::RetryPolicy::kIdempotent); + + const auto status = Shard::CommandResponse::getEffectiveStatus(commitResponse); + if (status != ErrorCodes::CommandNotFound) { + return status; + } - return DatabaseType::parse(IDLParserContext("DatabaseType"), databasesVector.front()); - }; + LOGV2(6854101, + "_configsvrCommitMovePrimary command not found on config server, so try to update " + "the metadata document directly", + "db"_attr = _dbname); - const auto dbType = getDatabaseEntry(); + // The fallback logic is not synchronized with the removeShard command and simultaneous + // invocations of movePrimary and removeShard can lead to data loss. + return _fallbackCommitOnConfig(opCtx, expectedDbVersion); + }(); - if (dbType.getPrimary() == _toShard) { - return Status::OK(); + if (!commitStatus.isOK()) { + LOGV2(6854102, + "Error committing movePrimary", + "db"_attr = _dbname, + "error"_attr = redact(commitStatus)); + return commitStatus; } - auto newDbType = dbType; - newDbType.setPrimary(_toShard); + const auto updatedDbType = [&]() { + auto findResponse = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString::kConfigDatabasesNamespace, + BSON(DatabaseType::kNameFieldName << _dbname), + BSON(DatabaseType::kNameFieldName << -1), + 1)); + + const auto databases = std::move(findResponse.docs); + uassert(ErrorCodes::IncompatibleShardingMetadata, + "Tried to find version for database {}, but found no databases"_format(_dbname), + !databases.empty()); + + return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front()); + }(); + tassert(6851100, + "Error committing movePrimary: database version went backwards", + updatedDbType.getVersion() > expectedDbVersion); + uassert(6851101, + "Error committing movePrimary: update of config.databases failed", + updatedDbType.getPrimary() != _fromShard); - auto const currentDatabaseVersion = dbType.getVersion(); + LOGV2_DEBUG(6854103, + 3, + "Commited movePrimary", + "db"_attr = _dbname, + "fromShard"_attr = _fromShard, + "toShard"_attr = _toShard, + "updatedDbVersion"_attr = updatedDbType.getVersion()); - newDbType.setVersion(currentDatabaseVersion.makeUpdated()); + return Status::OK(); +} - auto const updateQuery = [&] { - BSONObjBuilder queryBuilder; - queryBuilder.append(DatabaseType::kNameFieldName, _dbname); +Status MovePrimarySourceManager::_fallbackCommitOnConfig(OperationContext* opCtx, + const DatabaseVersion& expectedDbVersion) { + const auto query = [&] { + BSONObjBuilder bsonBuilder; + bsonBuilder.append(DatabaseType::kNameFieldName, _dbname); // Include the version in the update filter to be resilient to potential network retries and // delayed messages. - for (auto [fieldName, elem] : currentDatabaseVersion.toBSON()) { - auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName; - queryBuilder.appendAs(elem, dottedFieldName); + for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) { + const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName; + bsonBuilder.appendAs(fieldValue, dottedFieldName); } - return queryBuilder.obj(); + return bsonBuilder.obj(); }(); - auto updateStatus = Grid::get(opCtx)->catalogClient()->updateConfigDocument( - opCtx, - NamespaceString::kConfigDatabasesNamespace, - updateQuery, - newDbType.toBSON(), - false, - ShardingCatalogClient::kMajorityWriteConcern); - - if (!updateStatus.isOK()) { - LOGV2(5448803, - "Error committing movePrimary for {db}: {error}", - "Error committing movePrimary", - "db"_attr = _dbname, - "error"_attr = redact(updateStatus.getStatus())); - return updateStatus.getStatus(); - } + const auto update = [&] { + const auto newDbVersion = expectedDbVersion.makeUpdated(); - const auto updatedDbType = getDatabaseEntry(); - tassert(6851100, - "Error committing movePrimary: database version went backwards", - updatedDbType.getVersion() > currentDatabaseVersion); - uassert(6851101, - "Error committing movePrimary: update of `config.databases` failed", - updatedDbType.getPrimary() != _fromShard); + BSONObjBuilder bsonBuilder; + bsonBuilder.append(DatabaseType::kPrimaryFieldName, _toShard); + bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON()); + return BSON("$set" << bsonBuilder.obj()); + }(); - return Status::OK(); + return Grid::get(opCtx) + ->catalogClient() + ->updateConfigDocument(opCtx, + NamespaceString::kConfigDatabasesNamespace, + query, + update, + false, + ShardingCatalogClient::kMajorityWriteConcern) + .getStatus(); } Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) { @@ -429,7 +471,6 @@ Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) { return Status::OK(); } - void MovePrimarySourceManager::cleanupOnError(OperationContext* opCtx) { if (_state == kDone) { return; |