summary | refs | log | tree | commit | diff
path: root/src/mongo/db/s/move_primary_source_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/move_primary_source_manager.cpp')
-rw-r--r-- src/mongo/db/s/move_primary_source_manager.cpp 169
1 files changed, 105 insertions, 64 deletions
diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp
index dfb6e44ed80..42a46229012 100644
--- a/src/mongo/db/s/move_primary_source_manager.cpp
+++ b/src/mongo/db/s/move_primary_source_manager.cpp
@@ -43,6 +43,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/move_primary_gen.h"
#include "mongo/util/exit.h"
#include "mongo/util/scopeguard.h"
@@ -212,6 +213,8 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
invariant(_state == kCriticalSection);
ScopeGuard scopedGuard([&] { cleanupOnError(opCtx); });
+ boost::optional<DatabaseVersion> expectedDbVersion;
+
{
AutoGetDb autoDb(opCtx, getNss().dbName(), MODE_X);
@@ -227,13 +230,13 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
// Read operations must begin to wait on the critical section just before we send the
// commit operation to the config server
dss->enterCriticalSectionCommitPhase(opCtx, dssLock, _critSecReason);
- }
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ expectedDbVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, _dbname);
+ }
auto commitStatus = [&]() {
try {
- return _commitOnConfig(opCtx);
+ return _commitOnConfig(opCtx, *expectedDbVersion);
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -241,8 +244,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
if (!commitStatus.isOK()) {
// Need to get the latest optime in case the refresh request goes to a secondary --
- // otherwise the read won't wait for the write that _commitOnConfig may have
- // done
+ // otherwise the read won't wait for the write that the commit on the config server may have done.
LOGV2(22044,
"Error occurred while committing the movePrimary. Performing a majority write "
"against the config server to obtain its latest optime: {error}",
@@ -326,79 +328,119 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
return Status::OK();
}
-Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx) {
- auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
-
- auto getDatabaseEntry = [&]() {
- auto findResponse = uassertStatusOK(
- configShard->exhaustiveFindOnConfig(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kMajorityReadConcern,
- NamespaceString::kConfigDatabasesNamespace,
- BSON(DatabaseType::kNameFieldName << _dbname),
- BSON(DatabaseType::kNameFieldName << -1),
- 1));
-
- const auto databasesVector = std::move(findResponse.docs);
- uassert(ErrorCodes::IncompatibleShardingMetadata,
- str::stream() << "Tried to find max database version for database '" << _dbname
- << "', but found no databases",
- !databasesVector.empty());
+Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion) {
+ LOGV2_DEBUG(6854100,
+ 3,
+ "Committing movePrimary",
+ "db"_attr = _dbname,
+ "fromShard"_attr = _fromShard,
+ "toShard"_attr = _toShard,
+ "expectedDbVersion"_attr = expectedDbVersion);
+
+ const auto commitStatus = [&] {
+ ConfigsvrCommitMovePrimary commitRequest(_dbname, expectedDbVersion, _toShard);
+ commitRequest.setDbName(NamespaceString::kAdminDb);
+
+ const auto commitResponse =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(commitRequest.toBSON({})),
+ Shard::RetryPolicy::kIdempotent);
+
+ const auto status = Shard::CommandResponse::getEffectiveStatus(commitResponse);
+ if (status != ErrorCodes::CommandNotFound) {
+ return status;
+ }
- return DatabaseType::parse(IDLParserContext("DatabaseType"), databasesVector.front());
- };
+ LOGV2(6854101,
+ "_configsvrCommitMovePrimary command not found on config server, so try to update "
+ "the metadata document directly",
+ "db"_attr = _dbname);
- const auto dbType = getDatabaseEntry();
+ // The fallback logic is not synchronized with the removeShard command, so simultaneous
+ // invocations of movePrimary and removeShard can lead to data loss.
+ return _fallbackCommitOnConfig(opCtx, expectedDbVersion);
+ }();
- if (dbType.getPrimary() == _toShard) {
- return Status::OK();
+ if (!commitStatus.isOK()) {
+ LOGV2(6854102,
+ "Error committing movePrimary",
+ "db"_attr = _dbname,
+ "error"_attr = redact(commitStatus));
+ return commitStatus;
}
- auto newDbType = dbType;
- newDbType.setPrimary(_toShard);
+ const auto updatedDbType = [&]() {
+ auto findResponse = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kConfigDatabasesNamespace,
+ BSON(DatabaseType::kNameFieldName << _dbname),
+ BSON(DatabaseType::kNameFieldName << -1),
+ 1));
+
+ const auto databases = std::move(findResponse.docs);
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ "Tried to find version for database {}, but found no databases"_format(_dbname),
+ !databases.empty());
+
+ return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front());
+ }();
+ tassert(6851100,
+ "Error committing movePrimary: database version went backwards",
+ updatedDbType.getVersion() > expectedDbVersion);
+ uassert(6851101,
+ "Error committing movePrimary: update of config.databases failed",
+ updatedDbType.getPrimary() != _fromShard);
- auto const currentDatabaseVersion = dbType.getVersion();
+ LOGV2_DEBUG(6854103,
+ 3,
+ "Commited movePrimary",
+ "db"_attr = _dbname,
+ "fromShard"_attr = _fromShard,
+ "toShard"_attr = _toShard,
+ "updatedDbVersion"_attr = updatedDbType.getVersion());
- newDbType.setVersion(currentDatabaseVersion.makeUpdated());
+ return Status::OK();
+}
- auto const updateQuery = [&] {
- BSONObjBuilder queryBuilder;
- queryBuilder.append(DatabaseType::kNameFieldName, _dbname);
+Status MovePrimarySourceManager::_fallbackCommitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion) {
+ const auto query = [&] {
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kNameFieldName, _dbname);
// Include the version in the update filter to be resilient to potential network retries and
// delayed messages.
- for (auto [fieldName, elem] : currentDatabaseVersion.toBSON()) {
- auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;
- queryBuilder.appendAs(elem, dottedFieldName);
+ for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) {
+ const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;
+ bsonBuilder.appendAs(fieldValue, dottedFieldName);
}
- return queryBuilder.obj();
+ return bsonBuilder.obj();
}();
- auto updateStatus = Grid::get(opCtx)->catalogClient()->updateConfigDocument(
- opCtx,
- NamespaceString::kConfigDatabasesNamespace,
- updateQuery,
- newDbType.toBSON(),
- false,
- ShardingCatalogClient::kMajorityWriteConcern);
-
- if (!updateStatus.isOK()) {
- LOGV2(5448803,
- "Error committing movePrimary for {db}: {error}",
- "Error committing movePrimary",
- "db"_attr = _dbname,
- "error"_attr = redact(updateStatus.getStatus()));
- return updateStatus.getStatus();
- }
+ const auto update = [&] {
+ const auto newDbVersion = expectedDbVersion.makeUpdated();
- const auto updatedDbType = getDatabaseEntry();
- tassert(6851100,
- "Error committing movePrimary: database version went backwards",
- updatedDbType.getVersion() > currentDatabaseVersion);
- uassert(6851101,
- "Error committing movePrimary: update of `config.databases` failed",
- updatedDbType.getPrimary() != _fromShard);
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kPrimaryFieldName, _toShard);
+ bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON());
+ return BSON("$set" << bsonBuilder.obj());
+ }();
- return Status::OK();
+ return Grid::get(opCtx)
+ ->catalogClient()
+ ->updateConfigDocument(opCtx,
+ NamespaceString::kConfigDatabasesNamespace,
+ query,
+ update,
+ false,
+ ShardingCatalogClient::kMajorityWriteConcern)
+ .getStatus();
}
Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) {
@@ -429,7 +471,6 @@ Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) {
return Status::OK();
}
-
void MovePrimarySourceManager::cleanupOnError(OperationContext* opCtx) {
if (_state == kDone) {
return;