diff options
19 files changed, 216 insertions, 111 deletions
diff --git a/src/mongo/db/s/config/configsvr_move_primary_command.cpp b/src/mongo/db/s/config/configsvr_move_primary_command.cpp index 227c6b05bc5..642380a4a56 100644 --- a/src/mongo/db/s/config/configsvr_move_primary_command.cpp +++ b/src/mongo/db/s/config/configsvr_move_primary_command.cpp @@ -131,7 +131,9 @@ public: auto const catalogCache = Grid::get(opCtx)->catalogCache(); auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); - auto dbType = uassertStatusOK(catalogClient->getDatabase(opCtx, dbname)).value; + auto dbType = uassertStatusOK(catalogClient->getDatabase( + opCtx, dbname, repl::ReadConcernLevel::kLocalReadConcern)) + .value; const std::string to = movePrimaryRequest.getTo().toString(); @@ -166,10 +168,13 @@ public: << " to: " << toShard->toString(); const std::string whyMessage(str::stream() << "Moving primary shard of " << dbname); + + // ReplSetDistLockManager uses local read concern and majority write concern by default. auto scopedDistLock = uassertStatusOK(catalogClient->getDistLockManager()->lock( opCtx, dbname + "-movePrimary", whyMessage, DistLockManager::kDefaultLockTimeout)); - const auto shardedColls = getAllShardedCollectionsForDb(opCtx, dbname); + const auto shardedColls = + getAllShardedCollectionsForDb(opCtx, dbname, repl::ReadConcernLevel::kLocalReadConcern); // Record start in changelog uassertStatusOK(catalogClient->logChange( @@ -217,7 +222,10 @@ public: // Update the new primary in the config server metadata { - auto dbt = uassertStatusOK(catalogClient->getDatabase(opCtx, dbname)).value; + auto dbt = + uassertStatusOK(catalogClient->getDatabase( + opCtx, dbname, repl::ReadConcernLevel::kLocalReadConcern)) + .value; dbt.setPrimary(toShard->getId()); uassertStatusOK(catalogClient->updateDatabase(opCtx, dbname, dbt)); } diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index d041d40aabd..8e5863b4f71 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -748,9 +748,11 @@ public: // Until all metadata commands are on the config server, the CatalogCache on the config // server may be stale. Read the database entry directly rather than purging and reloading // the database into the CatalogCache, which is very expensive. - auto dbType = uassertStatusOK(Grid::get(opCtx)->catalogClient()->getDatabase( - opCtx, nss.db().toString())) - .value; + auto dbType = + uassertStatusOK( + Grid::get(opCtx)->catalogClient()->getDatabase( + opCtx, nss.db().toString(), repl::ReadConcernLevel::kLocalReadConcern)) + .value; uassert(ErrorCodes::IllegalOperation, str::stream() << "sharding not enabled for db " << nss.db(), dbType.getSharded()); diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 7cbf205fbde..507dc7f6ac2 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -308,7 +308,8 @@ void ShardServerCatalogCacheLoader::onStepUp() { std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + const repl::ReadConcernLevel& readConcern) { long long currentTerm; bool isPrimary; { diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index fd1655db268..6c244a97fcf 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -99,8 +99,9 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; private: // Differentiates the server's role in the replica set so that the chunk loader knows whether to diff --git a/src/mongo/s/balancer_configuration.cpp b/src/mongo/s/balancer_configuration.cpp index 413a39af5b1..15efe9e5637 100644 --- a/src/mongo/s/balancer_configuration.cpp +++ b/src/mongo/s/balancer_configuration.cpp @@ -131,9 +131,10 @@ bool BalancerConfiguration::waitForDelete() const { return _balancerSettings.waitForDelete(); } -Status BalancerConfiguration::refreshAndCheck(OperationContext* opCtx) { +Status BalancerConfiguration::refreshAndCheck(OperationContext* opCtx, + const repl::ReadConcernLevel& readConcern) { // Balancer configuration - Status balancerSettingsStatus = _refreshBalancerSettings(opCtx); + Status balancerSettingsStatus = _refreshBalancerSettings(opCtx, readConcern); if (!balancerSettingsStatus.isOK()) { return {balancerSettingsStatus.code(), str::stream() << "Failed to refresh the balancer settings due to " @@ -159,11 +160,12 @@ Status BalancerConfiguration::refreshAndCheck(OperationContext* opCtx) { return Status::OK(); } -Status BalancerConfiguration::_refreshBalancerSettings(OperationContext* opCtx) { +Status BalancerConfiguration::_refreshBalancerSettings(OperationContext* opCtx, + const repl::ReadConcernLevel& readConcern) { BalancerSettingsType settings = BalancerSettingsType::createDefault(); - auto settingsObjStatus = - Grid::get(opCtx)->catalogClient()->getGlobalSettings(opCtx, BalancerSettingsType::kKey); + auto settingsObjStatus = Grid::get(opCtx)->catalogClient()->getGlobalSettings( + opCtx, BalancerSettingsType::kKey, readConcern); if (settingsObjStatus.isOK()) { auto settingsStatus = BalancerSettingsType::fromBSON(settingsObjStatus.getValue()); if (!settingsStatus.isOK()) { diff --git a/src/mongo/s/balancer_configuration.h b/src/mongo/s/balancer_configuration.h index 71a3832f537..9a4de01c9cc 100644 --- a/src/mongo/s/balancer_configuration.h +++ b/src/mongo/s/balancer_configuration.h @@ -33,6 +33,7 @@ #include <cstdint> #include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/platform/atomic_word.h" #include "mongo/s/migration_secondary_throttle_options.h" #include "mongo/stdx/mutex.h" @@ -254,14 +255,18 @@ public: * This method is thread-safe but it doesn't make sense to be called from more than one thread * at a time. */ - Status refreshAndCheck(OperationContext* opCtx); + Status refreshAndCheck( + OperationContext* opCtx, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); private: /** * Reloads the balancer configuration from the settings document. Fails if the settings document * cannot be read, in which case the values will remain unchanged. */ - Status _refreshBalancerSettings(OperationContext* opCtx); + Status _refreshBalancerSettings( + OperationContext* opCtx, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); /** * Reloads the chunk sizes configuration from the settings document. Fails if the settings diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index 2069a90e5d3..81eb5c0a238 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -143,8 +143,11 @@ public: * the failure. These are some of the known failures: * - NamespaceNotFound - database does not exist */ - virtual StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase(OperationContext* opCtx, - const std::string& dbName) = 0; + virtual StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase( + OperationContext* opCtx, + const std::string& dbName, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** * Retrieves the metadata for a given collection, if it exists. @@ -157,7 +160,10 @@ public: * - NamespaceNotFound - collection does not exist */ virtual StatusWith<repl::OpTimeWith<CollectionType>> getCollection( - OperationContext* opCtx, const std::string& collNs) = 0; + OperationContext* opCtx, + const std::string& collNs, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** * Retrieves all collections undera specified database (or in the system). @@ -174,7 +180,9 @@ public: virtual Status getCollections(OperationContext* opCtx, const std::string* dbName, std::vector<CollectionType>* collections, - repl::OpTime* optime) = 0; + repl::OpTime* optime, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** * Drops the specified collection from the collection metadata store. @@ -307,7 +315,11 @@ public: * Returns ErrorCodes::NoMatchingDocument if no such key exists or the BSON content of the * setting otherwise. */ - virtual StatusWith<BSONObj> getGlobalSettings(OperationContext* opCtx, StringData key) = 0; + virtual StatusWith<BSONObj> getGlobalSettings( + OperationContext* opCtx, + StringData key, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** * Returns the contents of the config.version document - containing the current cluster schema @@ -348,7 +360,9 @@ public: virtual Status insertConfigDocument(OperationContext* opCtx, const std::string& ns, const BSONObj& doc, - const WriteConcernOptions& writeConcern) = 0; + const WriteConcernOptions& writeConcern, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** * Updates a single document in the specified namespace on the config server. The document must diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 4e013100eb2..e246faa1f0c 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -381,7 +381,7 @@ StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(Operation } StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabase( - OperationContext* opCtx, const std::string& dbName) { + OperationContext* opCtx, const std::string& dbName, const repl::ReadConcernLevel& readConcern) { if (!NamespaceString::validDBName(dbName, NamespaceString::DollarInDbNameBehavior::Allow)) { return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"}; } @@ -396,12 +396,12 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabas return repl::OpTimeWith<DatabaseType>(dbt); } - auto result = _fetchDatabaseMetadata(opCtx, dbName, kConfigReadSelector); + auto result = _fetchDatabaseMetadata(opCtx, dbName, kConfigReadSelector, readConcern); if (result == ErrorCodes::NamespaceNotFound) { // If we failed to find the database metadata on the 'nearest' config server, try again // against the primary, in case the database was recently created. result = _fetchDatabaseMetadata( - opCtx, dbName, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); + opCtx, dbName, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, readConcern); if (!result.isOK() && (result != ErrorCodes::NamespaceNotFound)) { return {result.getStatus().code(), str::stream() << "Could not confirm non-existence of database " << dbName @@ -414,12 +414,15 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabas } StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchDatabaseMetadata( - OperationContext* opCtx, const std::string& dbName, const ReadPreferenceSetting& readPref) { + OperationContext* opCtx, + const std::string& dbName, + const ReadPreferenceSetting& readPref, + const repl::ReadConcernLevel& readConcern) { dassert(dbName != "admin" && dbName != "config"); auto findStatus = _exhaustiveFindOnConfig(opCtx, readPref, - repl::ReadConcernLevel::kMajorityReadConcern, + readConcern, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::name(dbName)), BSONObj(), @@ -444,10 +447,10 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchData } StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getCollection( - OperationContext* opCtx, const std::string& collNs) { + OperationContext* opCtx, const std::string& collNs, const repl::ReadConcernLevel& readConcern) { auto statusFind = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, - repl::ReadConcernLevel::kMajorityReadConcern, + readConcern, NamespaceString(CollectionType::ConfigNS), BSON(CollectionType::fullNs(collNs)), BSONObj(), @@ -482,7 +485,8 @@ StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getColle Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx, const std::string* dbName, std::vector<CollectionType>* collections, - OpTime* opTime) { + OpTime* opTime, + const repl::ReadConcernLevel& readConcern) { BSONObjBuilder b; if (dbName) { invariant(!dbName->empty()); @@ -492,7 +496,7 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx, auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, - repl::ReadConcernLevel::kMajorityReadConcern, + readConcern, NamespaceString(CollectionType::ConfigNS), b.obj(), BSONObj(), @@ -716,11 +720,11 @@ Status ShardingCatalogClientImpl::dropCollection(OperationContext* opCtx, return Status::OK(); } -StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* opCtx, - StringData key) { +StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings( + OperationContext* opCtx, StringData key, const repl::ReadConcernLevel& readConcern) { auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, - repl::ReadConcernLevel::kMajorityReadConcern, + readConcern, kSettingsNamespace, BSON("_id" << key), BSONObj(), @@ -1155,7 +1159,8 @@ void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* opCtx, Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx, const std::string& ns, const BSONObj& doc, - const WriteConcernOptions& writeConcern) { + const WriteConcernOptions& writeConcern, + const repl::ReadConcernLevel& readConcern) { const NamespaceString nss(ns); invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb); @@ -1194,7 +1199,7 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx, auto fetchDuplicate = _exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kMajorityReadConcern, + readConcern, nss, idField.wrap(), BSONObj(), diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h index e184c078fff..8d7e8eb6c1d 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h @@ -89,16 +89,24 @@ public: StatusWith<ShardDrainingStatus> removeShard(OperationContext* opCtx, const ShardId& name) override; - StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase(OperationContext* opCtx, - const std::string& dbName) override; + StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase( + OperationContext* opCtx, + const std::string& dbName, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; - StatusWith<repl::OpTimeWith<CollectionType>> getCollection(OperationContext* opCtx, - const std::string& collNs) override; + StatusWith<repl::OpTimeWith<CollectionType>> getCollection( + OperationContext* opCtx, + const std::string& collNs, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; Status getCollections(OperationContext* opCtx, const std::string* dbName, std::vector<CollectionType>* collections, - repl::OpTime* optime) override; + repl::OpTime* optime, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; Status dropCollection(OperationContext* opCtx, const NamespaceString& ns) override; @@ -140,7 +148,11 @@ public: const WriteConcernOptions& writeConcern, repl::ReadConcernLevel readConcern) override; - StatusWith<BSONObj> getGlobalSettings(OperationContext* opCtx, StringData key) override; + StatusWith<BSONObj> getGlobalSettings( + OperationContext* opCtx, + StringData key, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; StatusWith<VersionType> getConfigVersion(OperationContext* opCtx, repl::ReadConcernLevel readConcern) override; @@ -152,7 +164,9 @@ public: Status insertConfigDocument(OperationContext* opCtx, const std::string& ns, const BSONObj& doc, - const WriteConcernOptions& writeConcern) override; + const WriteConcernOptions& writeConcern, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; StatusWith<bool> updateConfigDocument(OperationContext* opCtx, const std::string& ns, @@ -241,7 +255,10 @@ private: * given read preference. Returns NamespaceNotFound if no database metadata is found. */ StatusWith<repl::OpTimeWith<DatabaseType>> _fetchDatabaseMetadata( - OperationContext* opCtx, const std::string& dbName, const ReadPreferenceSetting& readPref); + OperationContext* opCtx, + const std::string& dbName, + const ReadPreferenceSetting& readPref, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); /** * Best effort method, which logs diagnostic events on the config server. If the config server diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 1df6313afec..e24b1f5854b 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -78,19 +78,20 @@ Status ShardingCatalogClientMock::updateDatabase(OperationContext* opCtx, } StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientMock::getDatabase( - OperationContext* opCtx, const string& dbName) { + OperationContext* opCtx, const string& dbName, const repl::ReadConcernLevel& readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientMock::getCollection( - OperationContext* opCtx, const string& collNs) { + OperationContext* opCtx, const string& collNs, const repl::ReadConcernLevel& readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } Status ShardingCatalogClientMock::getCollections(OperationContext* opCtx, const string* dbName, vector<CollectionType>* collections, - repl::OpTime* optime) { + repl::OpTime* optime, + const repl::ReadConcernLevel& readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } @@ -166,8 +167,8 @@ Status ShardingCatalogClientMock::logChange(OperationContext* opCtx, return {ErrorCodes::InternalError, "Method not implemented"}; } -StatusWith<BSONObj> ShardingCatalogClientMock::getGlobalSettings(OperationContext* opCtx, - StringData key) { +StatusWith<BSONObj> ShardingCatalogClientMock::getGlobalSettings( + OperationContext* opCtx, StringData key, const repl::ReadConcernLevel& readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } @@ -183,7 +184,8 @@ void ShardingCatalogClientMock::writeConfigServerDirect(OperationContext* opCtx, Status ShardingCatalogClientMock::insertConfigDocument(OperationContext* opCtx, const std::string& ns, const BSONObj& doc, - const WriteConcernOptions& writeConcern) { + const WriteConcernOptions& writeConcern, + const repl::ReadConcernLevel& readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 6189ad67660..248c992d60f 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -54,16 +54,24 @@ public: const std::string& dbName, const DatabaseType& db) override; - StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase(OperationContext* opCtx, - const std::string& dbName) override; + StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase( + OperationContext* opCtx, + const std::string& dbName, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; - StatusWith<repl::OpTimeWith<CollectionType>> getCollection(OperationContext* opCtx, - const std::string& collNs) override; + StatusWith<repl::OpTimeWith<CollectionType>> getCollection( + OperationContext* opCtx, + const std::string& collNs, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; Status getCollections(OperationContext* opCtx, const std::string* dbName, std::vector<CollectionType>* collections, - repl::OpTime* optime) override; + repl::OpTime* optime, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; Status dropCollection(OperationContext* opCtx, const NamespaceString& ns) override; @@ -116,7 +124,11 @@ public: const BSONObj& detail, const WriteConcernOptions& writeConcern) override; - StatusWith<BSONObj> getGlobalSettings(OperationContext* opCtx, StringData key) override; + StatusWith<BSONObj> getGlobalSettings( + OperationContext* opCtx, + StringData key, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; StatusWith<VersionType> getConfigVersion(OperationContext* opCtx, repl::ReadConcernLevel readConcern) override; @@ -128,7 +140,9 @@ public: Status insertConfigDocument(OperationContext* opCtx, const std::string& ns, const BSONObj& doc, - const WriteConcernOptions& writeConcern) override; + const WriteConcernOptions& writeConcern, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; StatusWith<bool> updateConfigDocument(OperationContext* opCtx, const std::string& ns, diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp index 017ca2f8481..09b6a611b2a 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp @@ -84,7 +84,8 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, const ShardId& primaryShardId, const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks) { + const bool distributeInitialChunks, + repl::ReadConcernLevel readConcern) { const KeyPattern keyPattern = shardKeyPattern.getKeyPattern(); @@ -109,7 +110,8 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of // the splitVector command and affects the number of chunks returned, has been loaded. - uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx)); + uassertStatusOK( + Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx, readConcern)); if (numObjects > 0) { splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( @@ -171,7 +173,8 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, opCtx, ChunkType::ConfigNS, chunk.toConfigBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); + ShardingCatalogClient::kMajorityWriteConcern, + readConcern)); } return version; @@ -225,7 +228,11 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); - auto dbEntry = uassertStatusOK(catalogClient->getDatabase(opCtx, nsToDatabase(ns))).value; + auto dbEntry = + uassertStatusOK(catalogClient->getDatabase( + opCtx, nsToDatabase(ns), repl::ReadConcernLevel::kLocalReadConcern)) + .value; + auto dbPrimaryShardId = dbEntry.getPrimary(); const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId)); @@ -260,8 +267,13 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } - const auto& collVersion = createFirstChunks( - opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks); + const auto& collVersion = createFirstChunks(opCtx, + nss, + fieldsAndOrder, + dbPrimaryShardId, + initPoints, + distributeInitialChunks, + repl::ReadConcernLevel::kLocalReadConcern); { CollectionType coll; diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 1489e0215d3..6c6065f4ac4 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -149,11 +149,13 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx } StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( - OperationContext* opCtx, const NamespaceString& nss) { + OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernLevel& readConcern) { while (true) { std::shared_ptr<DatabaseInfoEntry> dbEntry; try { - dbEntry = _getDatabase(opCtx, nss.db()); + dbEntry = _getDatabase(opCtx, nss.db(), readConcern); } catch (const DBException& ex) { return ex.toStatus(); } @@ -185,7 +187,7 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( refreshNotification = (collEntry.refreshCompletionNotification = std::make_shared<Notification<Status>>()); _scheduleCollectionRefresh_inlock( - dbEntry, std::move(collEntry.routingInfo), nss, 1); + dbEntry, std::move(collEntry.routingInfo), nss, 1, readConcern); } // Wait on the notification outside of the mutex @@ -306,8 +308,8 @@ void CatalogCache::purgeAllDatabases() { _databases.clear(); } -std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(OperationContext* opCtx, - StringData dbName) { +std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase( + OperationContext* opCtx, StringData dbName, const repl::ReadConcernLevel& readConcern) { stdx::lock_guard<stdx::mutex> lg(_mutex); auto it = _databases.find(dbName); @@ -320,14 +322,15 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(Oper const auto dbNameCopy = dbName.toString(); // Load the database entry - const auto opTimeWithDb = uassertStatusOK(catalogClient->getDatabase(opCtx, dbNameCopy)); + const auto opTimeWithDb = + uassertStatusOK(catalogClient->getDatabase(opCtx, dbNameCopy, readConcern)); const auto& dbDesc = opTimeWithDb.value; // Load the sharded collections entries std::vector<CollectionType> collections; repl::OpTime collLoadConfigOptime; - uassertStatusOK( - catalogClient->getCollections(opCtx, &dbNameCopy, &collections, &collLoadConfigOptime)); + uassertStatusOK(catalogClient->getCollections( + opCtx, &dbNameCopy, &collections, &collLoadConfigOptime, readConcern)); StringMap<CollectionRoutingInfoEntry> collectionEntries; for (const auto& coll : collections) { @@ -346,14 +349,15 @@ void CatalogCache::_scheduleCollectionRefresh_inlock( std::shared_ptr<DatabaseInfoEntry> dbEntry, std::shared_ptr<ChunkManager> existingRoutingInfo, const NamespaceString& nss, - int refreshAttempt) { + int refreshAttempt, + const repl::ReadConcernLevel& readConcern) { Timer t; const ChunkVersion startingCollectionVersion = (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED()); const auto refreshFailed_inlock = - [ this, t, dbEntry, nss, refreshAttempt ](const Status& status) noexcept { + [ this, t, dbEntry, nss, refreshAttempt, readConcern ](const Status& status) noexcept { log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and failed" << causedBy(redact(status)); @@ -366,7 +370,8 @@ void CatalogCache::_scheduleCollectionRefresh_inlock( // refresh again if (status == ErrorCodes::ConflictingOperationInProgress && refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) { - _scheduleCollectionRefresh_inlock(dbEntry, nullptr, nss, refreshAttempt + 1); + _scheduleCollectionRefresh_inlock( + dbEntry, nullptr, nss, refreshAttempt + 1, readConcern); } else { // Leave needsRefresh to true so that any subsequent get attempts will kick off // another round of refresh @@ -376,7 +381,7 @@ void CatalogCache::_scheduleCollectionRefresh_inlock( }; const auto refreshCallback = - [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed_inlock ]( + [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed_inlock, readConcern ]( OperationContext * opCtx, StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { std::shared_ptr<ChunkManager> newRoutingInfo; @@ -416,7 +421,7 @@ void CatalogCache::_scheduleCollectionRefresh_inlock( << startingCollectionVersion; try { - _cacheLoader.getChunksSince(nss, startingCollectionVersion, refreshCallback); + _cacheLoader.getChunksSince(nss, startingCollectionVersion, refreshCallback, readConcern); } catch (const DBException& ex) { const auto status = ex.toStatus(); diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index 8b81c982645..1927bbc9dcf 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -125,8 +125,10 @@ public: * with the primary shard for the specified database. If an error occurs loading the metadata * returns a failed status. */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss); + StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( + OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx, StringData ns); @@ -203,16 +205,21 @@ private: * Ensures that the specified database is in the cache, loading it if necessary. If the database * was not in cache, all the sharded collections will be in the 'needsRefresh' state. */ - std::shared_ptr<DatabaseInfoEntry> _getDatabase(OperationContext* opCtx, StringData dbName); + std::shared_ptr<DatabaseInfoEntry> _getDatabase( + OperationContext* opCtx, + StringData dbName, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); /** * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The * namespace must be in the 'needRefresh' state. */ - void _scheduleCollectionRefresh_inlock(std::shared_ptr<DatabaseInfoEntry> dbEntry, - std::shared_ptr<ChunkManager> existingRoutingInfo, - const NamespaceString& nss, - int refreshAttempt); + void _scheduleCollectionRefresh_inlock( + std::shared_ptr<DatabaseInfoEntry> dbEntry, + std::shared_ptr<ChunkManager> existingRoutingInfo, + const NamespaceString& nss, + int refreshAttempt, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); // Interface from which chunks will be retrieved CatalogCacheLoader& _cacheLoader; diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 8f2b2d3067d..3b674568a4f 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -33,6 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/chunk_version.h" @@ -128,8 +129,9 @@ public: virtual std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> - callbackFn) = 0; + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** * Only used for unit-tests, clears a previously-created catalog cache loader from the specified diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp index b4f38d1ebe7..ca89dbf667f 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.cpp +++ b/src/mongo/s/commands/cluster_commands_helpers.cpp @@ -407,13 +407,13 @@ bool appendEmptyResultSet(BSONObjBuilder& result, Status status, const std::stri return Command::appendCommandStatus(result, status); } -std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opCtx, - StringData dbName) { +std::vector<NamespaceString> getAllShardedCollectionsForDb( + OperationContext* opCtx, StringData dbName, const repl::ReadConcernLevel& readConcern) { const auto dbNameStr = dbName.toString(); std::vector<CollectionType> collectionsOnConfig; uassertStatusOK(Grid::get(opCtx)->catalogClient()->getCollections( - opCtx, &dbNameStr, &collectionsOnConfig, nullptr)); + opCtx, &dbNameStr, &collectionsOnConfig, nullptr, readConcern)); std::vector<NamespaceString> collectionsToReturn; for (const auto& coll : collectionsOnConfig) { diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h index adb41976b4e..ca23d3a285c 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.h +++ b/src/mongo/s/commands/cluster_commands_helpers.h @@ -136,8 +136,10 @@ bool appendEmptyResultSet(BSONObjBuilder& result, Status status, const std::stri * * Throws exception on errors. */ -std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opCtx, - StringData dbName); +std::vector<NamespaceString> getAllShardedCollectionsForDb( + OperationContext* opCtx, + StringData dbName, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern); /** * Abstracts the common pattern of refreshing a collection and checking if it is sharded used across diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index c88f90f79a6..fa2c10f21fc 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -87,13 +87,16 @@ QueryAndSort createConfigDiffQuery(const NamespaceString& nss, ChunkVersion coll /** * Blocking method, which returns the chunks which changed since the specified version. */ -CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx, - const NamespaceString& nss, - ChunkVersion sinceVersion) { +CollectionAndChangedChunks getChangedChunks( + OperationContext* opCtx, + const NamespaceString& nss, + ChunkVersion sinceVersion, + const repl::ReadConcernLevel& readConcern = repl::ReadConcernLevel::kMajorityReadConcern) { const auto catalogClient = Grid::get(opCtx)->catalogClient(); // Decide whether to do a full or partial load based on the state of the collection - const auto coll = uassertStatusOK(catalogClient->getCollection(opCtx, nss.ns())).value; + const auto coll = + uassertStatusOK(catalogClient->getCollection(opCtx, nss.ns(), readConcern)).value; uassert(ErrorCodes::NamespaceNotFound, str::stream() << "Collection " << nss.ns() << " is dropped.", !coll.getDropped()); @@ -168,24 +171,26 @@ Status ConfigServerCatalogCacheLoader::waitForCollectionVersion(OperationContext std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + const repl::ReadConcernLevel& readConcern) { auto notify = std::make_shared<Notification<void>>(); - uassertStatusOK(_threadPool.schedule([ this, nss, version, notify, callbackFn ]() noexcept { - auto opCtx = Client::getCurrent()->makeOperationContext(); - - auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { - try { - return getChangedChunks(opCtx.get(), nss, version); - } catch (const DBException& ex) { - return ex.toStatus(); - } - }(); - - callbackFn(opCtx.get(), std::move(swCollAndChunks)); - notify->set(); - })); + uassertStatusOK( + _threadPool.schedule([ this, nss, version, notify, callbackFn, readConcern ]() noexcept { + auto opCtx = Client::getCurrent()->makeOperationContext(); + + auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { + try { + return getChangedChunks(opCtx.get(), nss, version, readConcern); + } catch (const DBException& ex) { + return ex.toStatus(); + } + }(); + + callbackFn(opCtx.get(), std::move(swCollAndChunks)); + notify->set(); + })); return notify; } diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h index 1a2451628da..9c584683c6b 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.h +++ b/src/mongo/s/config_server_catalog_cache_loader.h @@ -54,8 +54,9 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + const repl::ReadConcernLevel& readConcern = + repl::ReadConcernLevel::kMajorityReadConcern) override; private: // Thread pool to be used to perform metadata load |