Diffstat (limited to 'src')
52 files changed, 2811 insertions, 2505 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index d7b8816891f..af0c740fc70
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -68,14 +68,11 @@
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
 #include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk.h"
-#include "mongo/s/chunk_manager.h"
 #include "mongo/s/client/parallel.h"
 #include "mongo/s/client/shard_connection.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_key_pattern.h"
-#include "mongo/s/sharding_raii.h"
 #include "mongo/s/stale_exception.h"
 #include "mongo/scripting/engine.h"
 #include "mongo/stdx/mutex.h"
@@ -1796,7 +1793,6 @@ public:
         state.prepTempCollection();
         ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);

-        BSONList values;
         if (!config.outputOptions.outDB.empty()) {
             BSONObjBuilder loc;
             if (!config.outputOptions.outDB.empty())
@@ -1804,33 +1800,29 @@ public:
             if (!config.outputOptions.collectionName.empty())
                 loc.append("collection", config.outputOptions.collectionName);
             result.append("result", loc.obj());
-        } else {
-            if (!config.outputOptions.collectionName.empty())
-                result.append("result", config.outputOptions.collectionName);
-        }
-
-        auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname);
-        if (!scopedDbStatus.isOK()) {
-            return appendCommandStatus(result, scopedDbStatus.getStatus());
+        } else if (!config.outputOptions.collectionName.empty()) {
+            result.append("result", config.outputOptions.collectionName);
         }

-        auto confOut = scopedDbStatus.getValue().db();
+        std::vector<std::shared_ptr<Chunk>> chunks;

-        vector<shared_ptr<Chunk>> chunks;
-
-        if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) {
-            shared_ptr<ChunkManager> cm =
-                confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns());
+        if (config.outputOptions.outType != Config::OutputType::INMEMORY) {
+            auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+                opCtx, config.outputOptions.finalNamespace);
+            if (!outRoutingInfoStatus.isOK()) {
+                return appendCommandStatus(result, outRoutingInfoStatus.getStatus());
+            }

-            // Fetch result from other shards 1 chunk at a time. It would be better to do just one
-            // big $or query, but then the sorting would not be efficient.
-            const string shardName = ShardingState::get(opCtx)->getShardName();
-            const ChunkMap& chunkMap = cm->getChunkMap();
+            if (auto cm = outRoutingInfoStatus.getValue().cm()) {
+                // Fetch result from other shards 1 chunk at a time. It would be better to do just
+                // one big $or query, but then the sorting would not be efficient.
+                const string shardName = ShardingState::get(opCtx)->getShardName();

-            for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
-                shared_ptr<Chunk> chunk = it->second;
-                if (chunk->getShardId() == shardName) {
-                    chunks.push_back(chunk);
+                for (const auto& chunkEntry : cm->chunkMap()) {
+                    const auto& chunk = chunkEntry.second;
+                    if (chunk->getShardId() == shardName) {
+                        chunks.push_back(chunk);
+                    }
                 }
             }
         }
@@ -1839,6 +1831,8 @@ public:
         unsigned int index = 0;
         BSONObj query;
         BSONArrayBuilder chunkSizes;
+        BSONList values;
+
         while (true) {
             shared_ptr<Chunk> chunk;
             if (chunks.size() > 0) {
@@ -1855,6 +1849,7 @@ public:
             ParallelSortClusteredCursor cursor(
                 servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
             cursor.init(opCtx);
+
             int chunkSize = 0;

             while (cursor.more() || !values.empty()) {
@@ -1880,7 +1875,9 @@ public:
                     state.insert(config.tempNamespace, res);
                 else
                     state.emit(res);
+
                 values.clear();
+
                 if (!t.isEmpty())
                     values.push_back(t);
             }
@@ -1889,6 +1886,7 @@ public:
                 chunkSizes.append(chunk->getMin());
                 chunkSizes.append(chunkSize);
             }
+
             if (++index >= chunks.size())
                 break;
         }
@@ -1907,8 +1905,10 @@ public:
         countsB.append("output", outputCount);
         result.append("counts", countsB.obj());

-        return 1;
+        return true;
     }
+
 } mapReduceFinishCommand;
-}
-}
+
+}  // namespace
+}  // namespace mongo
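The mr.cpp hunk above is the template for every mongos-side call site in this commit: ask the CatalogCache for routing info, then branch on whether a ChunkManager is present. A minimal sketch of that pattern (not part of the patch; assumes an OperationContext* opCtx and a NamespaceString nss in scope):

    #include "mongo/s/catalog_cache.h"
    #include "mongo/s/grid.h"

    auto routingInfoStatus =
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
    if (!routingInfoStatus.isOK()) {
        return routingInfoStatus.getStatus();  // propagate the failure to the caller
    }
    auto routingInfo = std::move(routingInfoStatus.getValue());
    if (auto cm = routingInfo.cm()) {
        // Sharded: route through the chunk manager, e.g. iterate cm->chunkMap()
    } else {
        // Unsharded: talk to routingInfo.primary() directly
    }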
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 60a765cf5d6..b85f08b7189
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -46,12 +46,11 @@
 #include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/client/shard.h"
+#include "mongo/s/catalog_cache.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/cluster_identity_loader.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_util.h"
-#include "mongo/s/sharding_raii.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/exit.h"
 #include "mongo/util/log.h"
@@ -534,18 +533,20 @@ Status Balancer::_enforceTagRanges(OperationContext* opCtx) {
     }

     for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
-        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, splitInfo.nss);
-        if (!scopedCMStatus.isOK()) {
-            return scopedCMStatus.getStatus();
+        auto routingInfoStatus =
+            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
+                opCtx, splitInfo.nss);
+        if (!routingInfoStatus.isOK()) {
+            return routingInfoStatus.getStatus();
         }

-        const auto& scopedCM = scopedCMStatus.getValue();
+        auto cm = routingInfoStatus.getValue().cm();

         auto splitStatus =
             shardutil::splitChunkAtMultiplePoints(opCtx,
                                                   splitInfo.shardId,
                                                   splitInfo.nss,
-                                                  scopedCM.cm()->getShardKeyPattern(),
+                                                  cm->getShardKeyPattern(),
                                                   splitInfo.collectionVersion,
                                                   ChunkRange(splitInfo.minKey, splitInfo.maxKey),
                                                   splitInfo.splitKeys);
@@ -613,8 +614,9 @@ int Balancer::_moveChunks(OperationContext* opCtx,
 void Balancer::_splitOrMarkJumbo(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  const BSONObj& minKey) {
-    auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
-    const auto cm = scopedCM.cm().get();
+    auto routingInfo = uassertStatusOK(
+        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
+    const auto cm = routingInfo.cm().get();

     auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);
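Both balancer call sites above use the refresh-and-assert variant. A condensed sketch of the replacement idiom, assuming opCtx, nss, and minKey are in scope (the refresh overload fails with NamespaceNotSharded rather than returning a null cm(), so cm() can safely be dereferenced on success):

    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
    const auto cm = routingInfo.cm().get();
    auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);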
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index a4574dfc676..5d96be7004b
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -43,7 +43,6 @@
 #include "mongo/s/catalog/type_tags.h"
 #include "mongo/s/catalog_cache.h"
 #include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/log.h"
 #include "mongo/util/mongoutils/str.h"
@@ -72,7 +71,7 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus(
         shardToChunksMap[stat.shardId];
     }

-    for (const auto& entry : chunkMgr->getChunkMap()) {
+    for (const auto& entry : chunkMgr->chunkMap()) {
         const auto& chunkEntry = entry.second;

         ChunkType chunk;
@@ -299,17 +298,16 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* op
         return shardStatsStatus.getStatus();
     }

-    const auto shardStats = std::move(shardStatsStatus.getValue());
-
-    const NamespaceString nss(chunk.getNS());
+    const auto& shardStats = shardStatsStatus.getValue();

-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
-    if (!scopedCMStatus.isOK()) {
-        return scopedCMStatus.getStatus();
+    auto routingInfoStatus =
+        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+                                                                                     chunk.getNS());
+    if (!routingInfoStatus.isOK()) {
+        return routingInfoStatus.getStatus();
     }

-    const auto& scopedCM = scopedCMStatus.getValue();
-    const auto cm = scopedCM.cm().get();
+    const auto cm = routingInfoStatus.getValue().cm().get();

     const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
@@ -331,15 +329,14 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt

     auto shardStats = std::move(shardStatsStatus.getValue());

-    const NamespaceString nss(chunk.getNS());
-
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
-    if (!scopedCMStatus.isOK()) {
-        return scopedCMStatus.getStatus();
+    auto routingInfoStatus =
+        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+                                                                                     chunk.getNS());
+    if (!routingInfoStatus.isOK()) {
+        return routingInfoStatus.getStatus();
     }

-    const auto& scopedCM = scopedCMStatus.getValue();
-    const auto cm = scopedCM.cm().get();
+    const auto cm = routingInfoStatus.getValue().cm().get();

     const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
@@ -366,13 +363,13 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt

 StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
     OperationContext* opCtx, const NamespaceString& nss, const ShardStatisticsVector& shardStats) {
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
-    if (!scopedCMStatus.isOK()) {
-        return scopedCMStatus.getStatus();
+    auto routingInfoStatus =
+        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
+    if (!routingInfoStatus.isOK()) {
+        return routingInfoStatus.getStatus();
     }

-    const auto& scopedCM = scopedCMStatus.getValue();
-    const auto cm = scopedCM.cm().get();
+    const auto cm = routingInfoStatus.getValue().cm().get();

     const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern();

@@ -420,13 +417,13 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi
     const NamespaceString& nss,
     const ShardStatisticsVector& shardStats,
     bool aggressiveBalanceHint) {
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
-    if (!scopedCMStatus.isOK()) {
-        return scopedCMStatus.getStatus();
+    auto routingInfoStatus =
+        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
+    if (!routingInfoStatus.isOK()) {
+        return routingInfoStatus.getStatus();
     }

-    const auto& scopedCM = scopedCMStatus.getValue();
-    const auto cm = scopedCM.cm().get();
+    const auto cm = routingInfoStatus.getValue().cm().get();

     const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern();
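All four selection-policy methods above repeat the same refresh-then-propagate boilerplate. A hypothetical helper (not part of this patch; name and placement are illustrative only) shows the shape of the repeated code:

    // Hypothetical convenience wrapper over the new CatalogCache API.
    StatusWith<std::shared_ptr<ChunkManager>> refreshChunkManager(OperationContext* opCtx,
                                                                  const NamespaceString& nss) {
        auto routingInfoStatus =
            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
                                                                                         nss);
        if (!routingInfoStatus.isOK()) {
            return routingInfoStatus.getStatus();
        }
        return routingInfoStatus.getValue().cm();
    }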
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 7f267b97e67..9c70e8458ba
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -45,10 +45,10 @@
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog_cache.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/move_chunk_request.h"
-#include "mongo/s/sharding_raii.h"
 #include "mongo/util/log.h"
 #include "mongo/util/net/hostandport.h"
 #include "mongo/util/scopeguard.h"
@@ -180,14 +180,16 @@ Status MigrationManager::executeManualMigration(
     RemoteCommandResponse remoteCommandResponse =
         _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();

-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, NamespaceString(migrateInfo.ns));
-    if (!scopedCMStatus.isOK()) {
-        return scopedCMStatus.getStatus();
+    auto routingInfoStatus =
+        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
+            opCtx, migrateInfo.ns);
+    if (!routingInfoStatus.isOK()) {
+        return routingInfoStatus.getStatus();
     }

-    const auto& scopedCM = scopedCMStatus.getValue();
+    auto& routingInfo = routingInfoStatus.getValue();

-    auto chunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey);
+    auto chunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey);
     invariant(chunk);

     Status commandStatus = _processRemoteCommandResponse(
@@ -310,18 +312,20 @@ void MigrationManager::finishRecovery(OperationContext* opCtx,
         auto& migrateInfos = nssAndMigrateInfos.second;
         invariant(!migrateInfos.empty());

-        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
-        if (!scopedCMStatus.isOK()) {
+        auto routingInfoStatus =
+            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+                                                                                         nss);
+        if (!routingInfoStatus.isOK()) {
             // This shouldn't happen because the collection was intact and sharded when the previous
             // config primary was active and the dist locks have been held by the balancer
             // throughout. Abort migration recovery.
             log() << "Unable to reload chunk metadata for collection '" << nss
                   << "' during balancer recovery. Abandoning recovery."
-                  << causedBy(redact(scopedCMStatus.getStatus()));
+                  << causedBy(redact(routingInfoStatus.getStatus()));
             return;
         }

-        const auto& scopedCM = scopedCMStatus.getValue();
+        auto& routingInfo = routingInfoStatus.getValue();

         int scheduledMigrations = 0;

@@ -332,7 +336,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx,
             migrateInfos.pop_front();

             auto chunk =
-                scopedCM.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey);
+                routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey);
             invariant(chunk);

             if (chunk->getShardId() != migrationInfo.from) {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 9354f60b8e1..fda5fffdcfb
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -48,7 +48,6 @@
 #include "mongo/executor/task_executor.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/chunk.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/elapsed_tracker.h"
diff --git a/src/mongo/db/s/split_vector_command.cpp b/src/mongo/db/s/split_vector_command.cpp
index 73a424e5d27..b2dc34a6396
--- a/src/mongo/db/s/split_vector_command.cpp
+++ b/src/mongo/db/s/split_vector_command.cpp
@@ -50,7 +50,6 @@
 #include "mongo/db/keypattern.h"
 #include "mongo/db/query/internal_plans.h"
 #include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/chunk.h"
 #include "mongo/util/log.h"
 #include "mongo/util/timer.h"
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index f6ef30c691d..eede0298c6d
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -243,12 +243,10 @@ env.Library(
         'chunk.cpp',
         'chunk_manager.cpp',
         'cluster_identity_loader.cpp',
-        'config.cpp',
         'config_server_client.cpp',
         'grid.cpp',
         'shard_util.cpp',
         'sharding_egress_metadata_hook.cpp',
-        'sharding_raii.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/db/audit',
@@ -265,8 +263,10 @@ env.Library(
 env.CppUnitTest(
     target='catalog_cache_test',
     source=[
-        'chunk_manager_test.cpp',
         'chunk_manager_index_bounds_test.cpp',
+        'chunk_manager_query_test.cpp',
+        'chunk_manager_refresh_test.cpp',
+        'chunk_manager_test_fixture.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/s/catalog/sharding_catalog_test_fixture',
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index ff6df77d6f5..e5091f16954
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -132,6 +132,15 @@ bool ChunkRange::operator!=(const ChunkRange& other) const {
     return !(*this == other);
 }

+ChunkType::ChunkType() = default;
+
+ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId)
+    : _ns(nss.ns()),
+      _min(range.getMin()),
+      _max(range.getMax()),
+      _version(version),
+      _shard(std::move(shardId)) {}
+
 StatusWith<ChunkType> ChunkType::fromConfigBSON(const BSONObj& source) {
     ChunkType chunk;
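The new ChunkType constructor removes the need to call the individual setters when building chunks in refresh code and tests. An illustrative use with made-up values ('epoch' stands in for the collection's OID epoch):

    ChunkType chunk(NamespaceString("test.foo"),
                    ChunkRange(BSON("x" << MINKEY), BSON("x" << MAXKEY)),
                    ChunkVersion(1, 0, epoch),
                    ShardId("shard0000"));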
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 06a26db34be..6484f97b03b
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -32,6 +32,7 @@
 #include <string>

 #include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/shard_id.h"
@@ -143,6 +144,9 @@ public:
     static const BSONField<Date_t> DEPRECATED_lastmod;
     static const BSONField<OID> DEPRECATED_epoch;

+    ChunkType();
+    ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId);
+
     /**
      * Constructs a new ChunkType object from BSON that has the config server's config.chunks
      * collection format.
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index d2c8eaf5504..c3607d6053f
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -26,64 +26,391 @@
  * it in the license file.
  */

+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
 #include "mongo/platform/basic.h"

 #include "mongo/s/catalog_cache.h"

+#include "mongo/base/status.h"
 #include "mongo/base/status_with.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/repl/optime_with.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/catalog/type_database.h"
-#include "mongo/s/config.h"
+#include "mongo/s/chunk_diff.h"
+#include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/timer.h"

 namespace mongo {
+namespace {

-using std::shared_ptr;
-using std::string;
+// How many times to try refreshing the routing info if the set of chunks loaded from the config
+// server is found to be inconsistent.
+const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;

-CatalogCache::CatalogCache() = default;
+/**
+ * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
+ *
+ * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
+ */
+class CMConfigDiffTracker : public ConfigDiffTracker<std::shared_ptr<Chunk>> {
+public:
+    CMConfigDiffTracker(const NamespaceString& nss,
+                        RangeMap* currMap,
+                        ChunkVersion* maxVersion,
+                        MaxChunkVersionMap* maxShardVersions)
+        : ConfigDiffTracker<std::shared_ptr<Chunk>>(
+              nss.ns(), currMap, maxVersion, maxShardVersions) {}

-CatalogCache::~CatalogCache() = default;
+    bool isTracked(const ChunkType& chunk) const final {
+        // Mongos tracks all shards
+        return true;
+    }

-StatusWith<std::shared_ptr<DBConfig>> CatalogCache::getDatabase(OperationContext* opCtx,
-                                                                StringData dbName) {
-    stdx::lock_guard<stdx::mutex> guard(_mutex);
+    bool isMinKeyIndexed() const final {
+        return false;
+    }

-    auto it = _databases.find(dbName);
-    if (it != _databases.end()) {
-        return it->second;
+    std::pair<BSONObj, std::shared_ptr<Chunk>> rangeFor(OperationContext* opCtx,
+                                                        const ChunkType& chunk) const final {
+        return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(chunk));
     }

-    // Need to load from the store
-    auto status = Grid::get(opCtx)->catalogClient(opCtx)->getDatabase(opCtx, dbName.toString());
-    if (!status.isOK()) {
-        return status.getStatus();
+    ShardId shardFor(OperationContext* opCtx, const ShardId& shardId) const final {
+        const auto shard =
+            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+        return shard->getId();
     }
+};
+
+}  // namespace
+
+CatalogCache::CatalogCache() = default;
+
+CatalogCache::~CatalogCache() = default;
+
+StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
+                                                         StringData dbName) {
+    stdx::lock_guard<stdx::mutex> lg(_mutex);

-    const auto& dbOpTimePair = status.getValue();
-    auto db = std::make_shared<DBConfig>(dbOpTimePair.value, dbOpTimePair.opTime);
     try {
-        db->load(opCtx);
-        auto emplaceResult = _databases.try_emplace(dbName, std::move(db));
-        return emplaceResult.first->second;
+        return {CachedDatabaseInfo(_getDatabase_inlock(opCtx, dbName))};
     } catch (const DBException& ex) {
         return ex.toStatus();
     }
 }

-void CatalogCache::invalidate(StringData dbName) {
-    stdx::lock_guard<stdx::mutex> guard(_mutex);
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
+    OperationContext* opCtx, const NamespaceString& nss) {
+    int numRefreshAttempts = 0;

-    ShardedDatabasesMap::iterator it = _databases.find(dbName);
-    if (it != _databases.end()) {
-        _databases.erase(it);
+    while (true) {
+        stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+        std::shared_ptr<DatabaseInfoEntry> dbEntry;
+        try {
+            dbEntry = _getDatabase_inlock(opCtx, nss.db());
+        } catch (const DBException& ex) {
+            return ex.toStatus();
+        }
+
+        auto& collections = dbEntry->collections;
+
+        auto it = collections.find(nss.ns());
+        if (it == collections.end()) {
+            auto shardStatus =
+                Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId);
+            if (!shardStatus.isOK()) {
+                return {ErrorCodes::fromInt(40371),
+                        str::stream() << "The primary shard for collection " << nss.ns()
+                                      << " could not be loaded due to error "
+                                      << shardStatus.getStatus().toString()};
+            }
+
+            return {CachedCollectionRoutingInfo(
+                dbEntry->primaryShardId, nss, std::move(shardStatus.getValue()))};
+        }
+
+        auto& collEntry = it->second;
+
+        if (collEntry.needsRefresh) {
+            numRefreshAttempts++;
+
+            try {
+                auto newRoutingInfo =
+                    refreshCollectionRoutingInfo(opCtx, nss, std::move(collEntry.routingInfo));
+                if (newRoutingInfo == nullptr) {
+                    collections.erase(it);
+
+                    // Loop around so we can return an "unsharded" routing info
+                    continue;
+                }
+
+                collEntry.routingInfo = std::move(newRoutingInfo);
+                collEntry.needsRefresh = false;
+            } catch (const DBException& ex) {
+                // It is possible that the metadata is being changed concurrently, so retry the
+                // refresh with a wait
+                if (ex.getCode() == ErrorCodes::ConflictingOperationInProgress &&
+                    numRefreshAttempts < kMaxInconsistentRoutingInfoRefreshAttempts) {
+                    ul.unlock();
+
+                    log() << "Metadata refresh for " << nss.ns() << " failed and will be retried"
+                          << causedBy(redact(ex));
+
+                    // Do the sleep outside of the mutex
+                    sleepFor(Milliseconds(10) * numRefreshAttempts);
+                    continue;
+                }
+
+                return ex.toStatus();
+            }
+        }
+
+        return {CachedCollectionRoutingInfo(dbEntry->primaryShardId, collEntry.routingInfo)};
+    }
+}
+
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
+    OperationContext* opCtx, StringData ns) {
+    return getCollectionRoutingInfo(opCtx, NamespaceString(ns));
+}
+
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
+    OperationContext* opCtx, const NamespaceString& nss) {
+    invalidateShardedCollection(nss);
+
+    auto routingInfoStatus = getCollectionRoutingInfo(opCtx, nss);
+    if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().cm()) {
+        return {ErrorCodes::NamespaceNotSharded,
+                str::stream() << "Collection " << nss.ns() << " is not sharded."};
     }
+
+    return routingInfoStatus;
 }

-void CatalogCache::invalidateAll() {
-    stdx::lock_guard<stdx::mutex> guard(_mutex);
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
+    OperationContext* opCtx, StringData ns) {
+    return getShardedCollectionRoutingInfoWithRefresh(opCtx, NamespaceString(ns));
+}
+
+void CatalogCache::onStaleConfigError(CachedCollectionRoutingInfo&& ccrt) {
+    if (!ccrt._cm) {
+        // Here we received a stale config error for a collection which we previously thought was
+        // unsharded.
+        invalidateShardedCollection(ccrt._nss);
+        return;
+    }
+
+    // Here we received a stale config error for a collection which we previously thought was
+    // sharded
+    stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+    auto it = _databases.find(NamespaceString(ccrt._cm->getns()).db());
+    if (it == _databases.end()) {
+        // If the database does not exist, the collection must have been dropped so there is
+        // nothing to invalidate. The getCollectionRoutingInfo will handle the reload of the
+        // entire database and its collections.
+        return;
+    }
+
+    auto& collections = it->second->collections;
+
+    auto itColl = collections.find(ccrt._cm->getns());
+    if (itColl == collections.end()) {
+        // If the collection does not exist, this means it must have been dropped since the last
+        // time we retrieved a cache entry for it. Doing nothing in this case will cause the
+        // next call to getCollectionRoutingInfo to return an unsharded collection.
+        return;
+    } else if (itColl->second.routingInfo->getVersion() == ccrt._cm->getVersion()) {
+        // If the versions match, the last version of the routing information that we used is no
+        // longer valid, so trigger a refresh.
+        itColl->second.needsRefresh = true;
+    }
+}
+
+void CatalogCache::invalidateShardedCollection(const NamespaceString& nss) {
+    stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+    auto it = _databases.find(nss.db());
+    if (it == _databases.end()) {
+        return;
+    }
+
+    it->second->collections[nss.ns()].needsRefresh = true;
+}
+
+void CatalogCache::invalidateShardedCollection(StringData ns) {
+    invalidateShardedCollection(NamespaceString(ns));
+}
+
+void CatalogCache::purgeDatabase(StringData dbName) {
+    stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+    auto it = _databases.find(dbName);
+    if (it == _databases.end()) {
+        return;
+    }
+
+    _databases.erase(it);
+}
+
+void CatalogCache::purgeAllDatabases() {
+    stdx::lock_guard<stdx::mutex> lg(_mutex);
     _databases.clear();
 }

+std::shared_ptr<ChunkManager> CatalogCache::refreshCollectionRoutingInfo(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    std::shared_ptr<ChunkManager> existingRoutingInfo) {
+    Timer t;
+
+    const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+
+    // Decide whether to do a full or partial load based on the state of the collection
+    auto collStatus = catalogClient->getCollection(opCtx, nss.ns());
+    if (collStatus == ErrorCodes::NamespaceNotFound) {
+        return nullptr;
+    }
+
+    const auto coll = uassertStatusOK(std::move(collStatus)).value;
+    if (coll.getDropped()) {
+        return nullptr;
+    }
+
+    ChunkVersion startingCollectionVersion;
+    ChunkMap chunkMap =
+        SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
+
+    if (!existingRoutingInfo) {
+        // If we don't have a basis chunk manager, do a full refresh
+        startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
+    } else if (existingRoutingInfo->getVersion().epoch() != coll.getEpoch()) {
+        // If the collection's epoch has changed, do a full refresh
+        startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
+    } else {
+        // Otherwise do a partial refresh
+        startingCollectionVersion = existingRoutingInfo->getVersion();
+        chunkMap = existingRoutingInfo->chunkMap();
+    }
+
+    log() << "Refreshing chunks based on version " << startingCollectionVersion;
+
+    // Diff tracker should *always* find at least one chunk if collection exists
+    const auto diffQuery =
+        CMConfigDiffTracker::createConfigDiffQuery(nss, startingCollectionVersion);
+
+    // Query the chunks which have changed
+    std::vector<ChunkType> newChunks;
+    repl::OpTime opTime;
+    uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
+        opCtx,
+        diffQuery.query,
+        diffQuery.sort,
+        boost::none,
+        &newChunks,
+        &opTime,
+        repl::ReadConcernLevel::kMajorityReadConcern));
+
+    ChunkVersion collectionVersion = startingCollectionVersion;
+
+    ShardVersionMap unusedShardVersions;
+    CMConfigDiffTracker differ(nss, &chunkMap, &collectionVersion, &unusedShardVersions);
+
+    const int diffsApplied = differ.calculateConfigDiff(opCtx, newChunks);
+
+    if (diffsApplied < 1) {
+        log() << "Refresh took " << t.millis() << " ms and failed because the collection's "
+                 "sharding metadata either changed in between or "
+                 "became corrupted";
+
+        uasserted(ErrorCodes::ConflictingOperationInProgress,
+                  "Collection sharding status changed during refresh or became corrupted");
+    }
+
+    // If at least one diff was applied, the metadata is correct, but it might not have changed so
+    // in this case there is no need to recreate the chunk manager.
+    //
+    // NOTE: In addition to the above statement, it is also important that we return the same chunk
+    // manager object, because the write commands' code relies on changes of the chunk manager's
+    // sequence number to detect batch writes not making progress because of chunks moving across
+    // shards too frequently.
+    if (collectionVersion == startingCollectionVersion) {
+        log() << "Refresh took " << t.millis() << " ms and didn't find any metadata changes";
+
+        return existingRoutingInfo;
+    }
+
+    std::unique_ptr<CollatorInterface> defaultCollator;
+    if (!coll.getDefaultCollation().isEmpty()) {
+        // The collation should have been validated upon collection creation
+        defaultCollator = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+                                              ->makeFromBSON(coll.getDefaultCollation()));
+    }
+
+    log() << "Refresh took " << t.millis() << " ms and found version " << collectionVersion;
+
+    return stdx::make_unique<ChunkManager>(nss,
+                                           coll.getKeyPattern(),
+                                           std::move(defaultCollator),
+                                           coll.getUnique(),
+                                           std::move(chunkMap),
+                                           collectionVersion);
+}
+
+std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlock(
+    OperationContext* opCtx, StringData dbName) {
+    auto it = _databases.find(dbName);
+    if (it != _databases.end()) {
+        return it->second;
+    }
+
+    const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+
+    const auto dbNameCopy = dbName.toString();
+
+    // Load the database entry
+    const auto opTimeWithDb = uassertStatusOK(catalogClient->getDatabase(opCtx, dbNameCopy));
+    const auto& dbDesc = opTimeWithDb.value;
+
+    // Load the sharded collections entries
+    std::vector<CollectionType> collections;
+    repl::OpTime collLoadConfigOptime;
+    uassertStatusOK(
+        catalogClient->getCollections(opCtx, &dbNameCopy, &collections, &collLoadConfigOptime));
+
+    StringMap<CollectionRoutingInfoEntry> collectionEntries;
+    for (const auto& coll : collections) {
+        collectionEntries[coll.getNs().ns()].needsRefresh = true;
+    }
+
+    return _databases[dbName] = std::shared_ptr<DatabaseInfoEntry>(new DatabaseInfoEntry{
+               dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)});
+}
+
+CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db)
+    : _db(std::move(db)) {}
+
+const ShardId& CachedDatabaseInfo::primaryId() const {
+    return _db->primaryShardId;
+}
+
+bool CachedDatabaseInfo::shardingEnabled() const {
+    return _db->shardingEnabled;
+}
+
+CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId,
+                                                         std::shared_ptr<ChunkManager> cm)
+    : _primaryId(std::move(primaryId)), _cm(std::move(cm)) {}
+
+CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId,
+                                                         NamespaceString nss,
+                                                         std::shared_ptr<Shard> primary)
+    : _primaryId(std::move(primaryId)), _nss(std::move(nss)), _primary(std::move(primary)) {}
+
 }  // namespace mongo
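The cache is written so that refresh cost is paid by the first caller after an invalidation. A sketch of the caller protocol implied by onStaleConfigError (runVersionedOperation and the specific stale-config error code are stand-ins, not APIs from this patch):

    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));

    auto status = runVersionedOperation(opCtx, routingInfo);  // hypothetical shard round-trip
    if (status == ErrorCodes::RecvStaleConfig) {
        // Mark the entry for lazy refresh and retry; the next getCollectionRoutingInfo
        // call blocks on the actual reload.
        Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
    }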
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 0e63f94b52a..528b2df4673
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -28,19 +28,20 @@

 #pragma once

-#include <memory>
-
 #include "mongo/base/disallow_copying.h"
 #include "mongo/base/string_data.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard.h"
 #include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
 #include "mongo/util/string_map.h"

 namespace mongo {

-class DBConfig;
+class CachedDatabaseInfo;
+class CachedCollectionRoutingInfo;
 class OperationContext;
-template <typename T>
-class StatusWith;

 /**
  * This is the root of the "read-only" hierarchy of cached catalog metadata. It is read only
@@ -62,26 +63,184 @@ public:
      *
      * Returns the database cache entry if the database exists or a failed status otherwise.
      */
-    StatusWith<std::shared_ptr<DBConfig>> getDatabase(OperationContext* opCtx, StringData dbName);
+    StatusWith<CachedDatabaseInfo> getDatabase(OperationContext* opCtx, StringData dbName);
+
+    /**
+     * Blocking shortcut method to get a specific sharded collection from a given database using
+     * the complete namespace. If the collection is sharded, returns a
+     * CachedCollectionRoutingInfo initialized with a ChunkManager. If the collection is not
+     * sharded, returns a CachedCollectionRoutingInfo initialized with the primary shard for the
+     * specified database. If an error occurs while loading the metadata, returns a failed status.
+     */
+    StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx,
+                                                                     const NamespaceString& nss);
+    StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx,
+                                                                     StringData ns);
+
+    /**
+     * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed
+     * and returns a NamespaceNotSharded error if the collection is not sharded.
+     */
+    StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
+        OperationContext* opCtx, const NamespaceString& nss);
+    StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
+        OperationContext* opCtx, StringData ns);
+
+    /**
+     * Non-blocking method to be called whenever using the specified routing table has encountered
+     * a stale config exception. Returns immediately and causes the routing table to be refreshed
+     * the next time getCollectionRoutingInfo is called. Does nothing if the routing table has
+     * been refreshed already.
+     */
+    void onStaleConfigError(CachedCollectionRoutingInfo&&);
+
+    /**
+     * Non-blocking method, which indiscriminately causes the routing table for the specified
+     * namespace to be refreshed the next time getCollectionRoutingInfo is called.
+     */
+    void invalidateShardedCollection(const NamespaceString& nss);
+    void invalidateShardedCollection(StringData ns);
+
+    /**
+     * Blocking method, which removes the entire specified database (including its collections)
+     * from the cache.
+     */
+    void purgeDatabase(StringData dbName);

     /**
-     * Removes the database information for the specified name from the cache, so that the
-     * next time getDatabase is called, it will be reloaded.
+     * Blocking method, which removes all databases (including their collections) from the cache.
      */
-    void invalidate(StringData dbName);
+    void purgeAllDatabases();

     /**
-     * Purges all cached database information, which will cause the data to be reloaded again.
+     * Blocking method, which refreshes the routing information for the specified collection. If
+     * 'existingRoutingInfo' has been specified uses this as a basis to perform an 'incremental'
+     * refresh, which only fetches the chunks which changed. Otherwise does a full refresh,
+     * fetching all the chunks for the collection.
+     *
+     * Returns the refreshed routing information if the collection is still sharded or nullptr if
+     * it is not. If refresh fails for any reason, throws a DBException.
+     *
+     * With the exception of ConflictingOperationInProgress, error codes thrown from this method
+     * are final in that there is nothing that can be done to remedy them other than pass the
+     * error to the user.
+     *
+     * ConflictingOperationInProgress indicates that the chunk metadata was found to be
+     * inconsistent. Since this may be transient, due to the collection being dropped or
+     * recreated, the caller must retry the reload up to some configurable number of attempts.
+     *
+     * NOTE: Should never be called directly and is exposed as public for testing purposes only.
      */
-    void invalidateAll();
+    static std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        std::shared_ptr<ChunkManager> existingRoutingInfo);

 private:
-    using ShardedDatabasesMap = StringMap<std::shared_ptr<DBConfig>>;
+    // Make the cache entries friends so they can access the private classes below
+    friend class CachedDatabaseInfo;
+    friend class CachedCollectionRoutingInfo;
+
+    /**
+     * Cache entry describing a collection.
+     */
+    struct CollectionRoutingInfoEntry {
+        std::shared_ptr<ChunkManager> routingInfo;
+
+        bool needsRefresh{true};
+    };
+
+    /**
+     * Cache entry describing a database.
+     */
+    struct DatabaseInfoEntry {
+        ShardId primaryShardId;
+
+        bool shardingEnabled;
+
+        StringMap<CollectionRoutingInfoEntry> collections;
+    };
+
+    using DatabaseInfoMap = StringMap<std::shared_ptr<DatabaseInfoEntry>>;
+
+    /**
+     * Ensures that the specified database is in the cache, loading it if necessary. If the
+     * database was not in cache, all the sharded collections will be in the 'needsRefresh' state.
+     */
+    std::shared_ptr<DatabaseInfoEntry> _getDatabase_inlock(OperationContext* opCtx,
+                                                           StringData dbName);

     // Mutex to serialize access to the structures below
     stdx::mutex _mutex;

-    ShardedDatabasesMap _databases;
+    // Map from DB name to the info for that database
+    DatabaseInfoMap _databases;
+};
+
+/**
+ * Constructed exclusively by the CatalogCache, contains a reference to the cached information for
+ * the specified database.
+ */
+class CachedDatabaseInfo {
+public:
+    const ShardId& primaryId() const;
+
+    bool shardingEnabled() const;
+
+private:
+    friend class CatalogCache;
+
+    CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db);
+
+    std::shared_ptr<CatalogCache::DatabaseInfoEntry> _db;
+};
+
+/**
+ * Constructed exclusively by the CatalogCache, contains a reference to the routing information
+ * for the specified collection.
+ */
+class CachedCollectionRoutingInfo {
+public:
+    /**
+     * Returns the ID of the primary shard for the database owning this collection, regardless of
+     * whether it is sharded or not.
+     */
+    const ShardId& primaryId() const {
+        return _primaryId;
+    }
+
+    /**
+     * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr.
+     */
+    std::shared_ptr<ChunkManager> cm() const {
+        return _cm;
+    }
+
+    /**
+     * If the collection is not sharded, returns its primary shard. Otherwise, nullptr.
+     */
+    std::shared_ptr<Shard> primary() const {
+        return _primary;
+    }
+
+private:
+    friend class CatalogCache;
+
+    CachedCollectionRoutingInfo(ShardId primaryId, std::shared_ptr<ChunkManager> cm);
+
+    CachedCollectionRoutingInfo(ShardId primaryId,
+                                NamespaceString nss,
+                                std::shared_ptr<Shard> primary);
+
+    // The id of the primary shard containing the database
+    ShardId _primaryId;
+
+    // Reference to the corresponding chunk manager (if sharded) or null
+    std::shared_ptr<ChunkManager> _cm;
+
+    // Reference to the primary of the database (if not sharded) or null
+    NamespaceString _nss;
+    std::shared_ptr<Shard> _primary;
 };

 }  // namespace mongo
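CachedCollectionRoutingInfo behaves as a tagged pair: exactly one of cm() and primary() is non-null. How a consumer is expected to discriminate the two states (sketch, assuming opCtx and nss in scope):

    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
    if (auto cm = routingInfo.cm()) {
        // Sharded: target shards through the chunk manager
    } else {
        auto primary = routingInfo.primary();  // non-null exactly when cm() is null
        // Unsharded: send the operation to the database's primary shard
    }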
diff --git a/src/mongo/s/chunk_diff.cpp b/src/mongo/s/chunk_diff.cpp
index f21555043ad..07d569a503d
--- a/src/mongo/s/chunk_diff.cpp
+++ b/src/mongo/s/chunk_diff.cpp
@@ -105,7 +105,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
     // Store epoch now so it doesn't change when we change max
     OID currEpoch = _maxVersion->epoch();

-    _validDiffs = 0;
+    int validDiffs = 0;

     for (const ChunkType& chunk : chunks) {
         const ChunkVersion& chunkVersion = chunk.getVersion();
@@ -121,7 +121,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
             return -1;
         }

-        _validDiffs++;
+        validDiffs++;

         // Get max changed version and chunk version
         if (chunkVersion > *_maxVersion) {
@@ -151,7 +151,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
         }
     }

-    LOG(3) << "found " << _validDiffs << " new chunks for collection " << _ns << " (tracking "
+    LOG(3) << "found " << validDiffs << " new chunks for collection " << _ns << " (tracking "
            << newTracked.size() << "), new version is " << *_maxVersion;

     for (const ChunkType& chunk : newTracked) {
@@ -167,7 +167,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
         _currMap->insert(rangeFor(opCtx, chunk));
     }

-    return _validDiffs;
+    return validDiffs;
 }

 ConfigDiffTrackerBase::QueryAndSort ConfigDiffTrackerBase::createConfigDiffQuery(
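With the _validDiffs member gone, the diff count now lives only in the return value of calculateConfigDiff. In brief, the convention the new refresh path in catalog_cache.cpp relies on ('differ' and 'newChunks' as set up there):

    const int diffsApplied = differ.calculateConfigDiff(opCtx, newChunks);
    if (diffsApplied < 1) {
        // 0 (nothing found) or -1 (inconsistent data): treat the metadata as suspect
        uasserted(ErrorCodes::ConflictingOperationInProgress,
                  "Collection sharding status changed during refresh or became corrupted");
    }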
diff --git a/src/mongo/s/chunk_diff.h b/src/mongo/s/chunk_diff.h
index 0cea9fa678a..5910cc93eed
--- a/src/mongo/s/chunk_diff.h
+++ b/src/mongo/s/chunk_diff.h
@@ -93,12 +93,8 @@ public:
                       RangeMap* currMap,
                       ChunkVersion* maxVersion,
                       MaxChunkVersionMap* maxShardVersions);
-    virtual ~ConfigDiffTracker();

-    // Call after load for more information
-    int numValidDiffs() const {
-        return _validDiffs;
-    }
+    virtual ~ConfigDiffTracker();

     // Applies changes to the config data from a vector of chunks passed in. Also includes minor
     // version changes for particular major-version chunks if explicitly specified.
@@ -135,9 +131,6 @@ private:
     RangeMap* const _currMap;
     ChunkVersion* const _maxVersion;
     MaxChunkVersionMap* const _maxShardVersions;
-
-    // Store for later use
-    int _validDiffs{0};
 };

 }  // namespace mongo
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 20cfd7e098f..047bfa1d696
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -32,296 +32,50 @@

 #include "mongo/s/chunk_manager.h"

-#include <boost/next_prior.hpp>
 #include <vector>

 #include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/client/read_preference.h"
 #include "mongo/db/matcher/extensions_callback_noop.h"
 #include "mongo/db/query/collation/collation_index_key.h"
 #include "mongo/db/query/index_bounds_builder.h"
 #include "mongo/db/query/query_planner.h"
 #include "mongo/db/query/query_planner_common.h"
-#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/chunk_diff.h"
-#include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/log.h"
-#include "mongo/util/timer.h"

 namespace mongo {
-
-using std::map;
-using std::pair;
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-
 namespace {

 // Used to generate sequence numbers to assign to each newly created ChunkManager
 AtomicUInt32 nextCMSequenceNumber(0);

-/**
- * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
- *
- * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
- */
-class CMConfigDiffTracker : public ConfigDiffTracker<shared_ptr<Chunk>> {
-public:
-    CMConfigDiffTracker(const std::string& ns,
-                        RangeMap* currMap,
-                        ChunkVersion* maxVersion,
-                        MaxChunkVersionMap* maxShardVersions,
-                        ChunkManager* manager)
-        : ConfigDiffTracker<shared_ptr<Chunk>>(ns, currMap, maxVersion, maxShardVersions),
-          _manager(manager) {}
-
-    bool isTracked(const ChunkType& chunk) const final {
-        // Mongos tracks all shards
-        return true;
-    }
-
-    bool isMinKeyIndexed() const final {
-        return false;
-    }
-
-    pair<BSONObj, shared_ptr<Chunk>> rangeFor(OperationContext* opCtx,
-                                              const ChunkType& chunk) const final {
-        return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(chunk));
-    }
-
-    ShardId shardFor(OperationContext* opCtx, const ShardId& shardId) const final {
-        const auto shard =
-            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
-        return shard->getId();
-    }
-
-private:
-    ChunkManager* const _manager;
-};
-
-bool allOfType(BSONType type, const BSONObj& o) {
-    BSONObjIterator it(o);
-    while (it.more()) {
-        if (it.next().type() != type) {
-            return false;
-        }
-    }
-    return true;
-}
-
-bool isChunkMapValid(const ChunkMap& chunkMap) {
-#define ENSURE(x)                                          \
-    do {                                                   \
-        if (!(x)) {                                        \
-            log() << "ChunkManager::_isValid failed: " #x; \
-            return false;                                  \
-        }                                                  \
-    } while (0)
-
-    if (chunkMap.empty()) {
-        return true;
-    }
-
-    // Check endpoints
-    ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin()));
-    ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax()));
-
-    // Make sure there are no gaps or overlaps
-    for (ChunkMap::const_iterator it = boost::next(chunkMap.begin()), end = chunkMap.end();
-         it != end;
-         ++it) {
-        ChunkMap::const_iterator last = boost::prior(it);
-
-        if (SimpleBSONObjComparator::kInstance.evaluate(it->second->getMin() !=
-                                                        last->second->getMax())) {
-            log() << last->second->toString();
-            log() << it->second->toString();
-            log() << it->second->getMin();
-            log() << last->second->getMax();
-        }
-
-        ENSURE(SimpleBSONObjComparator::kInstance.evaluate(it->second->getMin() ==
-                                                           last->second->getMax()));
+void checkAllElementsAreOfType(BSONType type, const BSONObj& o) {
+    for (const auto&& element : o) {
+        uassert(ErrorCodes::ConflictingOperationInProgress,
+                str::stream() << "Not all elements of " << o << " are of type " << typeName(type),
+                element.type() == type);
     }
-
-    return true;
-
-#undef ENSURE
 }

 }  // namespace

 ChunkManager::ChunkManager(NamespaceString nss,
-                           const OID& epoch,
-                           const ShardKeyPattern& shardKeyPattern,
+                           KeyPattern shardKeyPattern,
                            std::unique_ptr<CollatorInterface> defaultCollator,
-                           bool unique)
+                           bool unique,
+                           ChunkMap chunkMap,
+                           ChunkVersion collectionVersion)
     : _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
       _nss(std::move(nss)),
-      _keyPattern(shardKeyPattern.getKeyPattern()),
+      _shardKeyPattern(shardKeyPattern),
       _defaultCollator(std::move(defaultCollator)),
       _unique(unique),
-      _chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()),
-      _chunkRangeMap(
-          SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()),
-      _version(0, 0, epoch) {}
+      _chunkMap(std::move(chunkMap)),
+      _chunkMapViews(_constructChunkMapViews(collectionVersion.epoch(), _chunkMap)),
+      _collectionVersion(collectionVersion) {}

 ChunkManager::~ChunkManager() = default;

-void ChunkManager::loadExistingRanges(OperationContext* opCtx, const ChunkManager* oldManager) {
-    invariant(!_version.isSet());
-
-    int tries = 3;
-
-    while (tries--) {
-        ChunkMap chunkMap =
-            SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
-        set<ShardId> shardIds;
-        ShardVersionMap shardVersions;
-
-        Timer t;
-
-        log() << "ChunkManager loading chunks for " << _nss
-              << " sequenceNumber: " << _sequenceNumber
-              << " based on: " << (oldManager ? oldManager->getVersion().toString() : "(empty)");
-
-        if (_load(opCtx, chunkMap, shardIds, &shardVersions, oldManager)) {
-            // TODO: Merge into diff code above, so we validate in one place
-            if (isChunkMapValid(chunkMap)) {
-                _chunkMap = std::move(chunkMap);
-                _shardVersions = std::move(shardVersions);
-                _chunkRangeMap = _constructRanges(_chunkMap);
-
-                log() << "ChunkManager load took " << t.millis() << " ms and found version "
-                      << _version;
-
-                return;
-            }
-        }
-
-        warning() << "ChunkManager load failed after " << t.millis()
-                  << " ms and will be retried up to " << tries << " more times";
-
-        sleepmillis(10 * (3 - tries));
-    }
-
-    // This will abort construction so we should never have a reference to an invalid config
-    msgasserted(13282,
-                str::stream() << "Couldn't load a valid config for " << _nss.ns()
-                              << " after 3 attempts. Please try again.");
-}
-
-bool ChunkManager::_load(OperationContext* opCtx,
-                         ChunkMap& chunkMap,
-                         set<ShardId>& shardIds,
-                         ShardVersionMap* shardVersions,
-                         const ChunkManager* oldManager) {
-    // Reset the max version, but not the epoch, when we aren't loading from the oldManager
-    _version = ChunkVersion(0, 0, _version.epoch());
-
-    // If we have a previous version of the ChunkManager to work from, use that info to reduce
-    // our config query
-    if (oldManager && oldManager->getVersion().isSet()) {
-        // Get the old max version
-        _version = oldManager->getVersion();
-
-        // Load a copy of the old versions
-        *shardVersions = oldManager->_shardVersions;
-
-        // Load a copy of the chunk map, replacing the chunk manager with our own
-        const ChunkMap& oldChunkMap = oldManager->getChunkMap();
-
-        for (const auto& oldChunkMapEntry : oldChunkMap) {
-            const auto& oldC = oldChunkMapEntry.second;
-            chunkMap.emplace(oldC->getMax(), std::make_shared<Chunk>(*oldC));
-        }
-
-        LOG(2) << "loading chunk manager for collection " << _nss
-               << " using old chunk manager w/ version " << _version.toString() << " and "
-               << oldChunkMap.size() << " chunks";
-    }
-
-    // Get the diff query required
-    const auto diffQuery = CMConfigDiffTracker::createConfigDiffQuery(_nss, _version);
-
-    // Attach a diff tracker for the versioned chunk data
-    CMConfigDiffTracker differ(_nss.ns(), &chunkMap, &_version, shardVersions, this);
-
-    // Diff tracker should *always* find at least one chunk if collection exists
-    repl::OpTime opTime;
-    std::vector<ChunkType> chunks;
-    uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
-        opCtx,
-        diffQuery.query,
-        diffQuery.sort,
-        boost::none,
-        &chunks,
-        &opTime,
-        repl::ReadConcernLevel::kMajorityReadConcern));
-
-    invariant(opTime >= _configOpTime);
-    _configOpTime = opTime;
-
-    int diffsApplied = differ.calculateConfigDiff(opCtx, chunks);
-    if (diffsApplied > 0) {
-        LOG(2) << "loaded " << diffsApplied << " chunks into new chunk manager for " << _nss
-               << " with version " << _version;
-
-        // Add all existing shards we find to the shards set
-        for (ShardVersionMap::iterator it = shardVersions->begin(); it != shardVersions->end();) {
-            auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, it->first);
-            if (shardStatus.isOK()) {
-                shardIds.insert(it->first);
-                ++it;
-            } else {
-                invariant(shardStatus == ErrorCodes::ShardNotFound);
-                shardVersions->erase(it++);
-            }
-        }
-
-        _configOpTime = opTime;
-
-        return true;
-    } else if (diffsApplied == 0) {
-        // No chunks were found for the ns
-        warning() << "no chunks found when reloading " << _nss << ", previous version was "
-                  << _version;
-
-        // Set all our data to empty
-        chunkMap.clear();
-        shardVersions->clear();
-
-        _version = ChunkVersion(0, 0, OID());
-        _configOpTime = opTime;
-
-        return true;
-    } else {  // diffsApplied < 0
-
-        bool allInconsistent = (differ.numValidDiffs() == 0);
-        if (allInconsistent) {
-            // All versions are different, this can be normal
-            warning() << "major change in chunk information found when reloading " << _nss
-                      << ", previous version was " << _version;
-        } else {
-            // Inconsistent load halfway through (due to yielding cursor during load)
-            // should be rare
-            warning() << "inconsistent chunks found when reloading " << _nss
-                      << ", previous version was " << _version << ", this should be rare";
-        }
-
-        // Set all our data to empty to be extra safe
-        chunkMap.clear();
-        shardVersions->clear();
-
-        _version = ChunkVersion(0, 0, OID());
-
-        return allInconsistent;
-    }
-}
-
 std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardKey,
                                                            const BSONObj& collation) const {
     const bool hasSimpleCollation = (collation.isEmpty() && !_defaultCollator) ||
@@ -351,7 +105,7 @@ std::shared_ptr<Chunk> ChunkManager::findIntersectingChunkWithSimpleCollation(
 void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
                                        const BSONObj& query,
                                        const BSONObj& collation,
-                                       set<ShardId>* shardIds) const {
+                                       std::set<ShardId>* shardIds) const {
     auto qr = stdx::make_unique<QueryRequest>(_nss);
     qr->setFilter(query);
@@ -370,7 +124,7 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
     }

     // Fast path for targeting equalities on the shard key.
-    auto shardKeyToFind = _keyPattern.extractShardKeyFromQuery(*cq);
+    auto shardKeyToFind = _shardKeyPattern.extractShardKeyFromQuery(*cq);
     if (!shardKeyToFind.isEmpty()) {
         try {
             auto chunk = findIntersectingChunk(shardKeyToFind, collation);
@@ -387,20 +141,20 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
     // Query { a : { $gte : 1, $lt : 2 },
     //         b : { $gte : 3, $lt : 4 } }
     // => Bounds { a : [1, 2), b : [3, 4) }
-    IndexBounds bounds = getIndexBoundsForQuery(_keyPattern.toBSON(), *cq);
+    IndexBounds bounds = getIndexBoundsForQuery(_shardKeyPattern.toBSON(), *cq);

     // Transforms bounds for each shard key field into full shard key ranges
     // for example :
     //   Key { a : 1, b : 1 }
     //   Bounds { a : [1, 2), b : [3, 4) }
     //   => Ranges { a : 1, b : 3 } => { a : 2, b : 4 }
-    BoundList ranges = _keyPattern.flattenBounds(bounds);
+    BoundList ranges = _shardKeyPattern.flattenBounds(bounds);

     for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) {
         getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds);

         // once we know we need to visit all shards no need to keep looping
-        if (shardIds->size() == _shardVersions.size()) {
+        if (shardIds->size() == _chunkMapViews.shardVersions.size()) {
             break;
         }
     }
@@ -409,38 +163,38 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
     // For now, we satisfy that assumption by adding a shard with no matches rather than returning
     // an empty set of shards.
     if (shardIds->empty()) {
-        shardIds->insert(_chunkRangeMap.begin()->second.getShardId());
+        shardIds->insert(_chunkMapViews.chunkRangeMap.begin()->second.shardId);
     }
 }

 void ChunkManager::getShardIdsForRange(const BSONObj& min,
                                        const BSONObj& max,
                                        std::set<ShardId>* shardIds) const {
-    auto it = _chunkRangeMap.upper_bound(min);
-    auto end = _chunkRangeMap.upper_bound(max);
+    auto it = _chunkMapViews.chunkRangeMap.upper_bound(min);
+    auto end = _chunkMapViews.chunkRangeMap.upper_bound(max);

     // The chunk range map must always cover the entire key space
-    invariant(it != _chunkRangeMap.end());
+    invariant(it != _chunkMapViews.chunkRangeMap.end());

     // We need to include the last chunk
-    if (end != _chunkRangeMap.cend()) {
+    if (end != _chunkMapViews.chunkRangeMap.cend()) {
         ++end;
     }

     for (; it != end; ++it) {
-        shardIds->insert(it->second.getShardId());
+        shardIds->insert(it->second.shardId);

         // No need to iterate through the rest of the ranges, because we already know we need to
         // use all shards.
-        if (shardIds->size() == _shardVersions.size()) {
+        if (shardIds->size() == _chunkMapViews.shardVersions.size()) {
             break;
         }
     }
 }

-void ChunkManager::getAllShardIds(set<ShardId>* all) const {
-    std::transform(_shardVersions.begin(),
-                   _shardVersions.end(),
+void ChunkManager::getAllShardIds(std::set<ShardId>* all) const {
+    std::transform(_chunkMapViews.shardVersions.begin(),
+                   _chunkMapViews.shardVersions.end(),
                    std::inserter(*all, all->begin()),
                    [](const ShardVersionMap::value_type& pair) { return pair.first; });
 }
@@ -457,7 +211,7 @@ IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key,
     }

     // Consider shard key as an index
-    string accessMethod = IndexNames::findPluginName(key);
+    std::string accessMethod = IndexNames::findPluginName(key);
     dassert(accessMethod == IndexNames::BTREE || accessMethod == IndexNames::HASHED);

     // Use query framework to generate index bounds
@@ -564,19 +318,19 @@ bool ChunkManager::compatibleWith(const ChunkManager& other, const ShardId& shar
 }

 ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const {
-    auto it = _shardVersions.find(shardName);
-    if (it == _shardVersions.end()) {
+    auto it = _chunkMapViews.shardVersions.find(shardName);
+    if (it == _chunkMapViews.shardVersions.end()) {
         // Shards without explicitly tracked shard versions (meaning they have no chunks) always
         // have a version of (0, 0, epoch)
-        return ChunkVersion(0, 0, _version.epoch());
+        return ChunkVersion(0, 0, _collectionVersion.epoch());
     }

     return it->second;
 }

-string ChunkManager::toString() const {
+std::string ChunkManager::toString() const {
     StringBuilder sb;
-    sb << "ChunkManager: " << _nss.ns() << " key:" << _keyPattern.toString() << '\n';
+    sb << "ChunkManager: " << _nss.ns() << " key:" << _shardKeyPattern.toString() << '\n';

     for (const auto& entry : _chunkMap) {
         sb << "\t" << entry.second->toString() << '\n';
@@ -585,47 +339,82 @@ string ChunkManager::toString() const {
     return sb.str();
 }

-ChunkManager::ChunkRangeMap ChunkManager::_constructRanges(const ChunkMap& chunkMap) {
+ChunkManager::ChunkMapViews ChunkManager::_constructChunkMapViews(const OID& epoch,
+                                                                  const ChunkMap& chunkMap) {
+    invariant(!chunkMap.empty());
+
     ChunkRangeMap chunkRangeMap =
         SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>();

-    if (chunkMap.empty()) {
-        return chunkRangeMap;
-    }
+    ShardVersionMap shardVersions;

     ChunkMap::const_iterator current = chunkMap.cbegin();

     while (current != chunkMap.cend()) {
-        const auto rangeFirst = current;
+        const auto& firstChunkInRange = current->second;
+
+        // Tracks the max shard version for the shard on which the current range will reside
+        auto shardVersionIt = shardVersions.find(firstChunkInRange->getShardId());
+        if (shardVersionIt == shardVersions.end()) {
+            shardVersionIt =
+                shardVersions.emplace(firstChunkInRange->getShardId(), ChunkVersion(0, 0, epoch))
+                    .first;
+        }
+
+        auto& maxShardVersion = shardVersionIt->second;
+
         current = std::find_if(
-            current, chunkMap.cend(), [&rangeFirst](const ChunkMap::value_type& chunkMapEntry) {
-                return chunkMapEntry.second->getShardId() != rangeFirst->second->getShardId();
+            current,
+            chunkMap.cend(),
+            [&firstChunkInRange, &maxShardVersion](const ChunkMap::value_type& chunkMapEntry) {
+                const auto& currentChunk = chunkMapEntry.second;
+
+                if (currentChunk->getShardId() != firstChunkInRange->getShardId())
+                    return true;
+
+                if (currentChunk->getLastmod() > maxShardVersion)
+                    maxShardVersion = currentChunk->getLastmod();
+
+                return false;
             });
+
         const auto rangeLast = std::prev(current);

-        const BSONObj rangeMin = rangeFirst->second->getMin();
+        const BSONObj rangeMin = firstChunkInRange->getMin();
         const BSONObj rangeMax = rangeLast->second->getMax();

-        auto insertResult = chunkRangeMap.insert(std::make_pair(
-            rangeMax, ShardAndChunkRange(rangeMin, rangeMax, rangeFirst->second->getShardId())));
-        invariant(insertResult.second);
-        if (insertResult.first != chunkRangeMap.begin()) {
+        const auto insertResult = chunkRangeMap.insert(std::make_pair(
+            rangeMax, ShardAndChunkRange{{rangeMin, rangeMax}, firstChunkInRange->getShardId()}));
+        uassert(ErrorCodes::ConflictingOperationInProgress,
+                str::stream() << "Metadata contains two chunks with the same max value "
+                              << rangeMax,
+                insertResult.second);
+
+        const auto& insertIterator = insertResult.first;
+
+        if (insertIterator != chunkRangeMap.begin()) {
             // Make sure there are no gaps in the ranges
-            insertResult.first--;
-            invariant(
-                SimpleBSONObjComparator::kInstance.evaluate(insertResult.first->first == rangeMin));
+            uassert(ErrorCodes::ConflictingOperationInProgress,
+                    str::stream() << "Gap or an overlap between ranges "
                                  << insertIterator->second.range.toString()
                                  << " and "
                                  << std::prev(insertIterator)->second.range.toString(),
+                    SimpleBSONObjComparator::kInstance.evaluate(std::prev(insertIterator)->first ==
                                                                rangeMin));
         }
+
+        // If a shard has chunks it must have a shard version, otherwise we have an invalid chunk
+        // somewhere, which should have been caught at chunk load time
+        invariant(maxShardVersion.isSet());
     }

     invariant(!chunkRangeMap.empty());
-    invariant(allOfType(MinKey, chunkRangeMap.begin()->second.getMin()));
-    invariant(allOfType(MaxKey, chunkRangeMap.rbegin()->first));
+    invariant(!shardVersions.empty());

-    return chunkRangeMap;
-}
+    checkAllElementsAreOfType(MinKey, chunkRangeMap.begin()->second.min());
+    checkAllElementsAreOfType(MaxKey, chunkRangeMap.rbegin()->first);

-repl::OpTime ChunkManager::getConfigOpTime() const {
-    return _configOpTime;
+    return {std::move(chunkRangeMap), std::move(shardVersions)};
 }

 }  // namespace mongo
oldManager); - - // - // Methods to use once loaded / created - // + const ShardVersionMap& shardVersions() const { + return _chunkMapViews.shardVersions; + } /** * Given a shard key (or a prefix) that has been extracted from a document, returns the chunk @@ -177,57 +175,46 @@ public: std::string toString() const; - ChunkVersion getVersion(const ShardId& shardName) const; - - /** - * Returns the opTime of config server the last time chunks were loaded. - */ - repl::OpTime getConfigOpTime() const; - private: + friend class CollectionRoutingDataLoader; + /** * Represents a range of chunk keys [getMin(), getMax()) and the id of the shard on which they * reside according to the metadata. */ - class ShardAndChunkRange { - public: - ShardAndChunkRange(const BSONObj& min, const BSONObj& max, ShardId inShardId) - : _range(min, max), _shardId(std::move(inShardId)) {} - - const BSONObj& getMin() const { - return _range.getMin(); - } - - const BSONObj& getMax() const { - return _range.getMax(); + struct ShardAndChunkRange { + const BSONObj& min() const { + return range.getMin(); } - const ShardId& getShardId() const { - return _shardId; + const BSONObj& max() const { + return range.getMax(); } - private: - ChunkRange _range; - ShardId _shardId; + ChunkRange range; + ShardId shardId; }; using ChunkRangeMap = BSONObjIndexedMap<ShardAndChunkRange>; /** - * If load was successful, returns true and it is guaranteed that the _chunkMap and - * _chunkRangeMap are consistent with each other. If false is returned, it is not safe to use - * the chunk manager anymore. + * Contains different transformations of the chunk map for efficient querying */ - bool _load(OperationContext* opCtx, - ChunkMap& chunks, - std::set<ShardId>& shardIds, - ShardVersionMap* shardVersions, - const ChunkManager* oldManager); + struct ChunkMapViews { + // Transformation of the chunk map containing what range of keys reside on which shard. The + // index is the max key of the respective range and the union of all ranges in a such + // constructed map must cover the complete space from [MinKey, MaxKey). + const ChunkRangeMap chunkRangeMap; + + // Map from shard id to the maximum chunk version for that shard. If a shard contains no + // chunks, it won't be present in this map. + const ShardVersionMap shardVersions; + }; /** - * Merges consecutive chunks, which reside on the same shard into a single range. + * Does a single pass over the chunkMap and constructs the ChunkMapViews object. */ - static ChunkRangeMap _constructRanges(const ChunkMap& chunkMap); + static ChunkMapViews _constructChunkMapViews(const OID& epoch, const ChunkMap& chunkMap); // The shard versioning mechanism hinges on keeping track of the number of times we reload // ChunkManagers. @@ -237,7 +224,7 @@ private: const NamespaceString _nss; // The key pattern used to shard the collection - const ShardKeyPattern _keyPattern; + const ShardKeyPattern _shardKeyPattern; // Default collation to use for routing data queries for this collection const std::unique_ptr<CollatorInterface> _defaultCollator; @@ -247,23 +234,15 @@ private: // Map from the max for each chunk to an entry describing the chunk. The union of all chunks' // ranges must cover the complete space from [MinKey, MaxKey). - ChunkMap _chunkMap; - - // Transformation of the chunk map containing what range of keys reside on which shard. The - // index is the max key of the respective range and the union of all ranges in a such - // constructed map must cover the complete space from [MinKey, MaxKey). 
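These views are produced by the single coalescing pass in _constructChunkMapViews() shown in chunk_manager.cpp above. A simplified, self-contained sketch of that pass, with ints standing in for BSONObj keys and a plain int for ChunkVersion (assumptions, not the real types):

#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct SimpleChunk {
    int min, max;
    std::string shardId;
    int version;  // stand-in for ChunkVersion
};

int main() {
    // Chunks sorted by max key, covering the key space without gaps.
    std::vector<SimpleChunk> chunks{
        {0, 10, "shard0", 3}, {10, 20, "shard0", 5}, {20, 30, "shard1", 4}};

    std::map<int, std::pair<int, std::string>> rangeMap;  // max -> (min, shard)
    std::map<std::string, int> shardVersions;             // shard -> max version

    size_t i = 0;
    while (i < chunks.size()) {
        const size_t first = i;
        int maxVersion = 0;

        // Extend the range while consecutive chunks live on the same shard,
        // tracking the highest chunk version seen for that shard.
        while (i < chunks.size() && chunks[i].shardId == chunks[first].shardId) {
            maxVersion = std::max(maxVersion, chunks[i].version);
            ++i;
        }

        // A new range must start exactly where the previous one ended,
        // otherwise the metadata has a gap or an overlap.
        if (!rangeMap.empty())
            assert(rangeMap.rbegin()->first == chunks[first].min);

        rangeMap.emplace(chunks[i - 1].max,
                         std::make_pair(chunks[first].min, chunks[first].shardId));

        auto& v = shardVersions[chunks[first].shardId];
        v = std::max(v, maxVersion);
    }

    // rangeMap:      {20: (0, "shard0"), 30: (20, "shard1")}
    // shardVersions: {"shard0": 5, "shard1": 4}
}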
- ChunkRangeMap _chunkRangeMap; + const ChunkMap _chunkMap; - // Max known version per shard - ShardVersionMap _shardVersions; + // Different transformations of the chunk map for efficient querying + const ChunkMapViews _chunkMapViews; // Max version across all chunks - ChunkVersion _version; + const ChunkVersion _collectionVersion; - // OpTime of config server the last time chunks were loaded. - repl::OpTime _configOpTime; - - // Auto-split throttling state + // Auto-split throttling state (state mutable by write commands) struct AutoSplitThrottle { public: AutoSplitThrottle() : _splitTickets(maxParallelSplits) {} @@ -280,8 +259,6 @@ private: ChunkManager*, Chunk*, long); - - friend class TestableChunkManager; }; } // namespace mongo diff --git a/src/mongo/s/chunk_manager_test.cpp b/src/mongo/s/chunk_manager_query_test.cpp index 08f8357b776..013c4618d6c 100644 --- a/src/mongo/s/chunk_manager_test.cpp +++ b/src/mongo/s/chunk_manager_query_test.cpp @@ -32,143 +32,13 @@ #include <set> -#include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/db/client.h" #include "mongo/db/query/collation/collator_interface_mock.h" -#include "mongo/s/catalog/sharding_catalog_test_fixture.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/chunk_manager.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/scopeguard.h" +#include "mongo/s/chunk_manager_test_fixture.h" namespace mongo { namespace { -using executor::RemoteCommandResponse; -using executor::RemoteCommandRequest; - -const NamespaceString kNss("TestDB", "TestColl"); - -class ChunkManagerTestFixture : public ShardingCatalogTestFixture { -protected: - void setUp() override { - ShardingCatalogTestFixture::setUp(); - setRemote(HostAndPort("FakeRemoteClient:34567")); - configTargeter()->setFindHostReturnValue(HostAndPort{CONFIG_HOST_PORT}); - } - - /** - * Returns a chunk manager with chunks at the specified split points. Each individual chunk is - * placed on a separate shard with id ranging from "0" to the number of chunks. 
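The split-point convention that the fixture doc-comment describes (and which the new fixture below keeps) is easy to show in isolation: the split points are extended with the global min and max bounds, and each consecutive pair of bounds becomes one chunk on its own shard. A toy illustration, assuming int bounds instead of BSONObj:

// Sketch only: split points {0, 100} yield three chunks on shards 0, 1, 2.
#include <iostream>
#include <limits>
#include <vector>

int main() {
    const std::vector<int> splitPoints{0, 100};

    // Extend with global min/max so the chunks cover the whole key space.
    std::vector<int> bounds{std::numeric_limits<int>::min()};
    bounds.insert(bounds.end(), splitPoints.begin(), splitPoints.end());
    bounds.push_back(std::numeric_limits<int>::max());

    // One chunk per consecutive pair of bounds, each on its own shard.
    for (size_t i = 1; i < bounds.size(); ++i) {
        std::cout << "chunk [" << bounds[i - 1] << ", " << bounds[i]
                  << ") on shard " << (i - 1) << "\n";
    }
}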
- */ - std::unique_ptr<ChunkManager> makeChunkManager( - const ShardKeyPattern& shardKeyPattern, - std::unique_ptr<CollatorInterface> defaultCollator, - bool unique, - const std::vector<BSONObj>& splitPoints) { - ChunkVersion version(1, 0, OID::gen()); - - std::vector<BSONObj> shards; - std::vector<BSONObj> initialChunks; - - auto splitPointsIncludingEnds(splitPoints); - splitPointsIncludingEnds.insert(splitPointsIncludingEnds.begin(), - shardKeyPattern.getKeyPattern().globalMin()); - splitPointsIncludingEnds.push_back(shardKeyPattern.getKeyPattern().globalMax()); - - for (size_t i = 1; i < splitPointsIncludingEnds.size(); ++i) { - ShardType shard; - shard.setName(str::stream() << (i - 1)); - shard.setHost(str::stream() << "Host" << (i - 1) << ":12345"); - - shards.push_back(shard.toBSON()); - - ChunkType chunk; - chunk.setNS(kNss.ns()); - chunk.setMin(shardKeyPattern.getKeyPattern().extendRangeBound( - splitPointsIncludingEnds[i - 1], false)); - chunk.setMax(shardKeyPattern.getKeyPattern().extendRangeBound( - splitPointsIncludingEnds[i], false)); - chunk.setShard(shard.getName()); - chunk.setVersion(version); - - initialChunks.push_back(chunk.toConfigBSON()); - - version.incMajor(); - } - - // Load the initial manager - auto manager = stdx::make_unique<ChunkManager>( - kNss, version.epoch(), shardKeyPattern, std::move(defaultCollator), unique); - - auto future = launchAsync([&manager] { - ON_BLOCK_EXIT([&] { Client::destroy(); }); - Client::initThread("Test"); - auto opCtx = cc().makeOperationContext(); - manager->loadExistingRanges(opCtx.get(), nullptr); - }); - - expectFindOnConfigSendBSONObjVector(initialChunks); - expectFindOnConfigSendBSONObjVector(shards); - - future.timed_get(kFutureTimeout); - - return manager; - } -}; - -using ChunkManagerLoadTest = ChunkManagerTestFixture; - -TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterSplit) { - const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); - - auto initialManager(makeChunkManager(shardKeyPattern, nullptr, true, {})); - - ChunkVersion version = initialManager->getVersion(); - - CollectionType collType; - collType.setNs(kNss); - collType.setEpoch(version.epoch()); - collType.setUpdatedAt(jsTime()); - collType.setKeyPattern(shardKeyPattern.toBSON()); - collType.setUnique(false); - - ChunkManager manager(kNss, version.epoch(), shardKeyPattern, nullptr, true); - - auto future = - launchAsync([&] { manager.loadExistingRanges(operationContext(), initialManager.get()); }); - - // Return set of chunks, which represent a split - expectFindOnConfigSendBSONObjVector([&]() { - version.incMajor(); - - ChunkType chunk1; - chunk1.setNS(kNss.ns()); - chunk1.setMin(shardKeyPattern.getKeyPattern().globalMin()); - chunk1.setMax(BSON("_id" << 0)); - chunk1.setShard({"0"}); - chunk1.setVersion(version); - - version.incMinor(); - - ChunkType chunk2; - chunk2.setNS(kNss.ns()); - chunk2.setMin(BSON("_id" << 0)); - chunk2.setMax(shardKeyPattern.getKeyPattern().globalMax()); - chunk2.setShard({"0"}); - chunk2.setVersion(version); - - return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; - }()); - - future.timed_get(kFutureTimeout); -} - -/** - * Fixture to be used as a shortcut for tests which exercise the getShardIdsForQuery routing logic - */ class ChunkManagerQueryTest : public ChunkManagerTestFixture { protected: void runQueryTest(const BSONObj& shardKey, diff --git a/src/mongo/s/chunk_manager_refresh_test.cpp b/src/mongo/s/chunk_manager_refresh_test.cpp new file mode 100644 index 00000000000..504893acf3c --- /dev/null +++ 
b/src/mongo/s/chunk_manager_refresh_test.cpp @@ -0,0 +1,194 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include <set> + +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager_test_fixture.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { + +using ChunkManagerLoadTest = ChunkManagerTestFixture; + +TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterSplit) { + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + + auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {})); + ASSERT_EQ(1, initialRoutingInfo->numChunks()); + + auto future = launchAsync([&] { + auto client = serviceContext()->makeClient("Test"); + auto opCtx = client->makeOperationContext(); + return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo); + }); + + ChunkVersion version = initialRoutingInfo->getVersion(); + + expectFindOnConfigSendBSONObjVector([&]() { + CollectionType collType; + collType.setNs(kNss); + collType.setEpoch(version.epoch()); + collType.setKeyPattern(shardKeyPattern.toBSON()); + collType.setUnique(false); + + return std::vector<BSONObj>{collType.toBSON()}; + }()); + + // Return set of chunks, which represent a split + expectFindOnConfigSendBSONObjVector([&]() { + version.incMajor(); + ChunkType chunk1( + kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"0"}); + + version.incMinor(); + ChunkType chunk2( + kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); + + return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + }()); + + auto newRoutingInfo(future.timed_get(kFutureTimeout)); + ASSERT_EQ(2, newRoutingInfo->numChunks()); + ASSERT_EQ(version, newRoutingInfo->getVersion()); + ASSERT_EQ(version, newRoutingInfo->getVersion({"0"})); + ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), 
newRoutingInfo->getVersion({"1"})); +} + +TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterMove) { + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + + auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {BSON("_id" << 0)})); + ASSERT_EQ(2, initialRoutingInfo->numChunks()); + + auto future = launchAsync([&] { + auto client = serviceContext()->makeClient("Test"); + auto opCtx = client->makeOperationContext(); + return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo); + }); + + ChunkVersion version = initialRoutingInfo->getVersion(); + + expectFindOnConfigSendBSONObjVector([&]() { + CollectionType collType; + collType.setNs(kNss); + collType.setEpoch(version.epoch()); + collType.setKeyPattern(shardKeyPattern.toBSON()); + collType.setUnique(false); + + return std::vector<BSONObj>{collType.toBSON()}; + }()); + + ChunkVersion expectedDestShardVersion; + + // Return set of chunks, which represent a move + expectFindOnConfigSendBSONObjVector([&]() { + version.incMajor(); + expectedDestShardVersion = version; + ChunkType chunk1( + kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"}); + + version.incMinor(); + ChunkType chunk2( + kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); + + return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + }()); + + auto newRoutingInfo(future.timed_get(kFutureTimeout)); + ASSERT_EQ(2, newRoutingInfo->numChunks()); + ASSERT_EQ(version, newRoutingInfo->getVersion()); + ASSERT_EQ(version, newRoutingInfo->getVersion({"0"})); + ASSERT_EQ(expectedDestShardVersion, newRoutingInfo->getVersion({"1"})); +} + +TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterMoveLastChunk) { + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + + auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {})); + ASSERT_EQ(1, initialRoutingInfo->numChunks()); + + auto future = launchAsync([&] { + auto client = serviceContext()->makeClient("Test"); + auto opCtx = client->makeOperationContext(); + return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo); + }); + + ChunkVersion version = initialRoutingInfo->getVersion(); + + expectFindOnConfigSendBSONObjVector([&]() { + CollectionType collType; + collType.setNs(kNss); + collType.setEpoch(version.epoch()); + collType.setKeyPattern(shardKeyPattern.toBSON()); + collType.setUnique(false); + + return std::vector<BSONObj>{collType.toBSON()}; + }()); + + // Return set of chunks, which represent a move + expectFindOnConfigSendBSONObjVector([&]() { + version.incMajor(); + ChunkType chunk1(kNss, + {shardKeyPattern.getKeyPattern().globalMin(), + shardKeyPattern.getKeyPattern().globalMax()}, + version, + {"1"}); + + return std::vector<BSONObj>{chunk1.toConfigBSON()}; + }()); + + expectFindOnConfigSendBSONObjVector([&]() { + ShardType shard1; + shard1.setName("0"); + shard1.setHost(str::stream() << "Host0:12345"); + + ShardType shard2; + shard2.setName("1"); + shard2.setHost(str::stream() << "Host1:12345"); + + return std::vector<BSONObj>{shard1.toBSON(), shard2.toBSON()}; + }()); + + auto newRoutingInfo(future.timed_get(kFutureTimeout)); + ASSERT_EQ(1, newRoutingInfo->numChunks()); + ASSERT_EQ(version, newRoutingInfo->getVersion()); + ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), newRoutingInfo->getVersion({"0"})); + ASSERT_EQ(version, newRoutingInfo->getVersion({"1"})); +} + +} // namespace +} // namespace mongo diff --git 
a/src/mongo/s/chunk_manager_test_fixture.cpp b/src/mongo/s/chunk_manager_test_fixture.cpp new file mode 100644 index 00000000000..ada08673d70 --- /dev/null +++ b/src/mongo/s/chunk_manager_test_fixture.cpp @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include <set> +#include <vector> + +#include "mongo/s/chunk_manager_test_fixture.h" + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/client.h" +#include "mongo/db/query/collation/collator_factory_mock.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +const NamespaceString ChunkManagerTestFixture::kNss("TestDB", "TestColl"); + +void ChunkManagerTestFixture::setUp() { + ShardingCatalogTestFixture::setUp(); + setRemote(HostAndPort("FakeRemoteClient:34567")); + configTargeter()->setFindHostReturnValue(HostAndPort{CONFIG_HOST_PORT}); + + CollatorFactoryInterface::set(serviceContext(), stdx::make_unique<CollatorFactoryMock>()); +} + +std::shared_ptr<ChunkManager> ChunkManagerTestFixture::makeChunkManager( + const ShardKeyPattern& shardKeyPattern, + std::unique_ptr<CollatorInterface> defaultCollator, + bool unique, + const std::vector<BSONObj>& splitPoints) { + ChunkVersion version(1, 0, OID::gen()); + + const BSONObj collectionBSON = [&]() { + CollectionType coll; + coll.setNs(kNss); + coll.setEpoch(version.epoch()); + coll.setKeyPattern(shardKeyPattern.getKeyPattern()); + coll.setUnique(unique); + + if (defaultCollator) { + coll.setDefaultCollation(defaultCollator->getSpec().toBSON()); + } + + return coll.toBSON(); + }(); + + std::vector<BSONObj> shards; + std::vector<BSONObj> initialChunks; + + auto splitPointsIncludingEnds(splitPoints); + splitPointsIncludingEnds.insert(splitPointsIncludingEnds.begin(), + shardKeyPattern.getKeyPattern().globalMin()); + 
splitPointsIncludingEnds.push_back(shardKeyPattern.getKeyPattern().globalMax()); + + for (size_t i = 1; i < splitPointsIncludingEnds.size(); ++i) { + ShardType shard; + shard.setName(str::stream() << (i - 1)); + shard.setHost(str::stream() << "Host" << (i - 1) << ":12345"); + + shards.push_back(shard.toBSON()); + + ChunkType chunk( + kNss, + {shardKeyPattern.getKeyPattern().extendRangeBound(splitPointsIncludingEnds[i - 1], + false), + shardKeyPattern.getKeyPattern().extendRangeBound(splitPointsIncludingEnds[i], false)}, + version, + shard.getName()); + + initialChunks.push_back(chunk.toConfigBSON()); + + version.incMajor(); + } + + auto future = launchAsync([&] { + auto client = serviceContext()->makeClient("Test"); + auto opCtx = client->makeOperationContext(); + return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, nullptr); + }); + + expectFindOnConfigSendBSONObjVector({collectionBSON}); + expectFindOnConfigSendBSONObjVector(initialChunks); + expectFindOnConfigSendBSONObjVector(shards); + + return future.timed_get(kFutureTimeout); +} + +} // namespace mongo diff --git a/src/mongo/s/chunk_manager_test_fixture.h b/src/mongo/s/chunk_manager_test_fixture.h new file mode 100644 index 00000000000..aaa059dd49d --- /dev/null +++ b/src/mongo/s/chunk_manager_test_fixture.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/sharding_catalog_test_fixture.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +class BSONObj; +class ChunkManager; +class CollatorInterface; +class ShardKeyPattern; + +class ChunkManagerTestFixture : public ShardingCatalogTestFixture { +protected: + void setUp() override; + + /** + * Returns a chunk manager with chunks at the specified split points. Each individual chunk is + * placed on a separate shard with shard id being a single number ranging from "0" to the number + * of chunks. 
+ */ + std::shared_ptr<ChunkManager> makeChunkManager( + const ShardKeyPattern& shardKeyPattern, + std::unique_ptr<CollatorInterface> defaultCollator, + bool unique, + const std::vector<BSONObj>& splitPoints); + + static const NamespaceString kNss; +}; + +} // namespace mongo diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index 89d2836dc92..9e2aaaaf4e5 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -39,11 +39,9 @@ #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/query/query_request.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" @@ -321,42 +319,20 @@ void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) { finishInit(opCtx); } -void ParallelSortClusteredCursor::_markStaleNS(OperationContext* opCtx, - const NamespaceString& staleNS, - const StaleConfigException& e, - bool& forceReload) { - if (e.requiresFullReload()) { - Grid::get(opCtx)->catalogCache()->invalidate(staleNS.db()); - } - - if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) +void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS, + const StaleConfigException& e) { + if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) { _staleNSMap[staleNS.ns()] = 1; + } - int tries = ++_staleNSMap[staleNS.ns()]; + const int tries = ++_staleNSMap[staleNS.ns()]; if (tries >= 5) { throw SendStaleConfigException(staleNS.ns(), - str::stream() << "too many retries of stale version info", + "too many retries of stale version info", e.getVersionReceived(), e.getVersionWanted()); } - - forceReload = tries > 2; -} - -void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* opCtx, - const NamespaceString& staleNS, - bool forceReload) { - auto scopedCMStatus = ScopedChunkManager::get(opCtx, staleNS); - if (!scopedCMStatus.isOK()) { - log() << "cannot reload database info for stale namespace " << staleNS.ns(); - return; - } - - const auto& scopedCM = scopedCMStatus.getValue(); - - // Reload chunk manager, potentially forcing the namespace - scopedCM.db()->getChunkManagerIfExists(opCtx, staleNS.ns(), true, forceReload); } void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( @@ -459,12 +435,12 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { shared_ptr<Shard> primary; { - auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); - if (scopedCMStatus != ErrorCodes::NamespaceNotFound) { - uassertStatusOK(scopedCMStatus.getStatus()); - const auto& scopedCM = scopedCMStatus.getValue(); - manager = scopedCM.cm(); - primary = scopedCM.primary(); + auto routingInfoStatus = + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); + if (routingInfoStatus != ErrorCodes::NamespaceNotFound) { + auto routingInfo = uassertStatusOK(std::move(routingInfoStatus)); + manager = routingInfo.cm(); + primary = routingInfo.primary(); } } @@ -642,20 +618,17 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { if (staleNS.size() == 0) staleNS = nss; // ns is the *versioned* namespace, be careful of this - // Probably need to retry fully - bool forceReload; - _markStaleNS(opCtx, staleNS, e, forceReload); + _markStaleNS(staleNS, e); + 
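The retry bookkeeping that the slimmed-down _markStaleNS() performs is self-contained enough to sketch on its own (hypothetical class, plain std::runtime_error standing in for SendStaleConfigException):

#include <map>
#include <stdexcept>
#include <string>

class StaleRetryTracker {
public:
    // Record one stale-config retry for this namespace and give up after too
    // many. Note: this mirrors the bookkeeping above, where the first mark of
    // a namespace already yields tries == 2 (seeded to 1, then incremented).
    void markStale(const std::string& ns) {
        if (_staleNSMap.find(ns) == _staleNSMap.end()) {
            _staleNSMap[ns] = 1;
        }

        const int tries = ++_staleNSMap[ns];
        if (tries >= 5) {
            throw std::runtime_error("too many retries of stale version info: " + ns);
        }
    }

private:
    std::map<std::string, int> _staleNSMap;
};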
Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); - LOG(1) << "stale config of ns " << staleNS - << " during initialization, will retry with forced : " << forceReload + LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry" << causedBy(redact(e)); // This is somewhat strange - if (staleNS != nss) + if (staleNS != nss) { warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace " << staleNS; - - _handleStaleNS(opCtx, staleNS, forceReload); + } // Restart with new chunk manager startInit(opCtx); @@ -860,26 +833,21 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) { if (retry) { // Refresh stale namespaces if (staleNSExceptions.size()) { - for (map<string, StaleConfigException>::iterator i = staleNSExceptions.begin(), - end = staleNSExceptions.end(); - i != end; - ++i) { - NamespaceString staleNS(i->first); - const StaleConfigException& exception = i->second; + for (const auto& exEntry : staleNSExceptions) { + const NamespaceString staleNS(exEntry.first); + const StaleConfigException& ex = exEntry.second; - bool forceReload; - _markStaleNS(opCtx, staleNS, exception, forceReload); + _markStaleNS(staleNS, ex); + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); - LOG(1) << "stale config of ns " << staleNS - << " on finishing query, will retry with forced : " << forceReload - << causedBy(redact(exception)); + LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry" + << causedBy(redact(ex)); // This is somewhat strange - if (staleNS != ns) + if (staleNS != ns) { warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS; - - _handleStaleNS(opCtx, staleNS, forceReload); + } } } diff --git a/src/mongo/s/client/parallel.h b/src/mongo/s/client/parallel.h index d375858bae0..d1680f1e74f 100644 --- a/src/mongo/s/client/parallel.h +++ b/src/mongo/s/client/parallel.h @@ -117,11 +117,7 @@ private: void _finishCons(); - void _markStaleNS(OperationContext* opCtx, - const NamespaceString& staleNS, - const StaleConfigException& e, - bool& forceReload); - void _handleStaleNS(OperationContext* opCtx, const NamespaceString& staleNS, bool forceReload); + void _markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e); bool _didInit; bool _done; diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp index 299a89f5941..7f97716f620 100644 --- a/src/mongo/s/client/version_manager.cpp +++ b/src/mongo/s/client/version_manager.cpp @@ -36,13 +36,12 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" #include "mongo/s/set_shard_version_request.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -257,21 +257,21 @@ bool checkShardVersion(OperationContext* opCtx, const NamespaceString nss(ns); + auto const catalogCache = Grid::get(opCtx)->catalogCache(); + if (authoritative) { - ScopedChunkManager::refreshAndGet(opCtx, nss); + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); } - auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); - - if (!scopedCMStatus.isOK()) { + auto routingInfoStatus =
catalogCache->getCollectionRoutingInfo(opCtx, nss); + if (!routingInfoStatus.isOK()) { return false; } - const auto& scopedCM = scopedCMStatus.getValue(); + auto& routingInfo = routingInfoStatus.getValue(); - auto conf = scopedCM.db(); - const auto manager = scopedCM.cm(); - const auto primary = scopedCM.primary(); + const auto manager = routingInfo.cm(); + const auto primary = routingInfo.primary(); unsigned long long officialSequenceNumber = 0; @@ -379,16 +379,7 @@ bool checkShardVersion(OperationContext* opCtx, return true; } - if (result["reloadConfig"].trueValue()) { - if (result["version"].timestampTime() == Date_t()) { - warning() << "reloading full configuration for " << conf->name() - << ", connection state indicates significant version changes"; - - Grid::get(opCtx)->catalogCache()->invalidate(nss.db()); - } - - conf->getChunkManager(opCtx, nss.ns(), true); - } + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); const int maxNumTries = 7; if (tryNumber < maxNumTries) { diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/commands/chunk_manager_targeter.cpp index 609dee87d9e..42de7ba2903 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.cpp +++ b/src/mongo/s/commands/chunk_manager_targeter.cpp @@ -32,31 +32,18 @@ #include "mongo/s/commands/chunk_manager_targeter.h" -#include <boost/thread/tss.hpp> - #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/collation/collation_index_key.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/s/sharding_raii.h" -#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { - -using std::shared_ptr; -using str::stream; -using std::map; -using std::set; -using std::string; -using std::vector; - namespace { enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown }; @@ -65,11 +52,6 @@ enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); -// To match legacy reload behavior, we have to backoff on config reload per-thread -// TODO: Centralize this behavior better by refactoring config reload in mongos -boost::thread_specific_ptr<Backoff> perThreadBackoff; -const int maxWaitMillis = 500; - /** * There are two styles of update expressions: * @@ -138,15 +120,6 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, ChunkM return true; } -void refreshBackoff() { - if (!perThreadBackoff.get()) { - perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2)); - } - - perThreadBackoff.get()->nextSleepMillis(); -} - - // // Utilities to compare shard versions // @@ -173,25 +146,19 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA, return CompareResult_Unknown; } - if (shardVersionA < shardVersionB) { + if (shardVersionA < shardVersionB) return CompareResult_LT; - } - else return CompareResult_GTE; } -ChunkVersion getShardVersion(StringData shardName, - const ChunkManager* manager, - const Shard* primary) { - dassert(!(manager && primary)); - dassert(manager || primary); - - if (primary) { - return ChunkVersion::UNSHARDED(); +ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& 
routingInfo, + const ShardId& shardId) { + if (routingInfo.cm()) { + return routingInfo.cm()->getVersion(shardId); } - return manager->getVersion(shardName.toString()); + return ChunkVersion::UNSHARDED(); } /** @@ -205,26 +172,21 @@ ChunkVersion getShardVersion(StringData shardName, * Note that the signature here is weird since our cached map of chunk versions is stored in a * ChunkManager or is implicit in the primary shard of the collection. */ -CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager, - const Shard* cachedPrimary, - const map<ShardId, ChunkVersion>& remoteShardVersions) { +CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, + const ShardVersionMap& remoteShardVersions) { CompareResult finalResult = CompareResult_GTE; - for (map<ShardId, ChunkVersion>::const_iterator it = remoteShardVersions.begin(); - it != remoteShardVersions.end(); - ++it) { - // Get the remote and cached version for the next shard - const ShardId& shardName = it->first; - const ChunkVersion& remoteShardVersion = it->second; + for (const auto& shardVersionEntry : remoteShardVersions) { + const ShardId& shardId = shardVersionEntry.first; + const ChunkVersion& remoteShardVersion = shardVersionEntry.second; ChunkVersion cachedShardVersion; try { // Throws b/c shard constructor throws - cachedShardVersion = - getShardVersion(shardName.toString(), cachedChunkManager, cachedPrimary); + cachedShardVersion = getShardVersion(routingInfo, shardId); } catch (const DBException& ex) { - warning() << "could not lookup shard " << shardName + warning() << "could not lookup shard " << shardId << " in local cache, shard metadata may have changed" << " or be unavailable" << causedBy(ex); @@ -236,6 +198,7 @@ CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager, if (result == CompareResult_Unknown) return result; + if (result == CompareResult_LT) finalResult = CompareResult_LT; @@ -248,10 +211,10 @@ CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager, /** * Whether or not the manager/primary pair is different from the other manager/primary pair. */ -bool isMetadataDifferent(const shared_ptr<ChunkManager>& managerA, - const shared_ptr<Shard>& primaryA, - const shared_ptr<ChunkManager>& managerB, - const shared_ptr<Shard>& primaryB) { +bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA, + const std::shared_ptr<Shard>& primaryA, + const std::shared_ptr<ChunkManager>& managerB, + const std::shared_ptr<Shard>& primaryB) { if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || (!primaryA && primaryB)) return true; @@ -268,10 +231,10 @@ bool isMetadataDifferent(const shared_ptr<ChunkManager>& managerA, * Whether or not the manager/primary pair was changed or refreshed from a previous version * of the metadata. 
*/ -bool wasMetadataRefreshed(const shared_ptr<ChunkManager>& managerA, - const shared_ptr<Shard>& primaryA, - const shared_ptr<ChunkManager>& managerB, - const shared_ptr<Shard>& primaryB) { +bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA, + const std::shared_ptr<Shard>& primaryA, + const std::shared_ptr<ChunkManager>& managerB, + const std::shared_ptr<Shard>& primaryB) { if (isMetadataDifferent(managerA, primaryA, managerB, primaryB)) return true; @@ -290,14 +253,18 @@ ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterS Status ChunkManagerTargeter::init(OperationContext* opCtx) { - auto scopedCMStatus = ScopedChunkManager::getOrCreate(opCtx, _nss); - if (!scopedCMStatus.isOK()) { - return scopedCMStatus.getStatus(); + auto shardDbStatus = createShardDatabase(opCtx, _nss.db()); + if (!shardDbStatus.isOK()) { + return shardDbStatus.getStatus(); } - const auto& scopedCM = scopedCMStatus.getValue(); - _manager = scopedCM.cm(); - _primary = scopedCM.primary(); + const auto routingInfoStatus = + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss); + if (!routingInfoStatus.isOK()) { + return routingInfoStatus.getStatus(); + } + + _routingInfo = std::move(routingInfoStatus.getValue()); return Status::OK(); } @@ -311,21 +278,21 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, ShardEndpoint** endpoint) const { BSONObj shardKey; - if (_manager) { + if (_routingInfo->cm()) { // // Sharded collections have the following requirements for targeting: // // Inserts must contain the exact shard key. // - shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc); + shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); // Check shard key exists if (shardKey.isEmpty()) { - return Status(ErrorCodes::ShardKeyNotFound, - stream() << "document " << doc - << " does not contain shard key for pattern " - << _manager->getShardKeyPattern().toString()); + return {ErrorCodes::ShardKeyNotFound, + str::stream() << "document " << doc + << " does not contain shard key for pattern " + << _routingInfo->cm()->getShardKeyPattern().toString()}; } // Check shard key size on insert @@ -338,13 +305,13 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, if (!shardKey.isEmpty()) { *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release(); } else { - if (!_primary) { + if (!_routingInfo->primary()) { return Status(ErrorCodes::NamespaceNotFound, str::stream() << "could not target insert in collection " << getNS().ns() << "; no metadata found"); } - *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED()); + *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED()); } return Status::OK(); @@ -376,14 +343,14 @@ Status ChunkManagerTargeter::targetUpdate( UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr()); if (updateType == UpdateType_Unknown) { - return Status(ErrorCodes::UnsupportedFormat, - stream() << "update document " << updateExpr - << " has mixed $operator and non-$operator style fields"); + return {ErrorCodes::UnsupportedFormat, + str::stream() << "update document " << updateExpr + << " has mixed $operator and non-$operator style fields"}; } BSONObj shardKey; - if (_manager) { + if (_routingInfo->cm()) { // // Sharded collections have the following futher requirements for targeting: // @@ -395,7 +362,7 @@ Status ChunkManagerTargeter::targetUpdate( if (updateType == 
UpdateType_OpStyle) { // Target using the query StatusWith<BSONObj> status = - _manager->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); + _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); // Bad query if (!status.isOK()) @@ -404,7 +371,7 @@ Status ChunkManagerTargeter::targetUpdate( shardKey = status.getValue(); } else { // Target using the replacement document - shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); + shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); } // Check shard key size on upsert. @@ -431,13 +398,13 @@ Status ChunkManagerTargeter::targetUpdate( // We failed to target a single shard. // Upserts are required to target a single shard. - if (_manager && updateDoc.getUpsert()) { + if (_routingInfo->cm() && updateDoc.getUpsert()) { return Status(ErrorCodes::ShardKeyNotFound, str::stream() << "An upsert on a sharded collection must contain the shard " "key and have the simple collation. Update request: " << updateDoc.toBSON() << ", shard key pattern: " - << _manager->getShardKeyPattern().toString()); + << _routingInfo->cm()->getShardKeyPattern().toString()); } // Parse update query. @@ -454,8 +421,8 @@ Status ChunkManagerTargeter::targetUpdate( } // Single (non-multi) updates must target a single shard or be exact-ID. - if (_manager && !updateDoc.getMulti() && - !isExactIdQuery(opCtx, *cq.getValue(), _manager.get())) { + if (_routingInfo->cm() && !updateDoc.getMulti() && + !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { return Status(ErrorCodes::ShardKeyNotFound, str::stream() << "A single update on a sharded collection must contain an exact " @@ -464,7 +431,7 @@ Status ChunkManagerTargeter::targetUpdate( "request: " << updateDoc.toBSON() << ", shard key pattern: " - << _manager->getShardKeyPattern().toString()); + << _routingInfo->cm()->getShardKeyPattern().toString()); } if (updateType == UpdateType_OpStyle) { @@ -480,7 +447,7 @@ Status ChunkManagerTargeter::targetDelete( std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { BSONObj shardKey; - if (_manager) { + if (_routingInfo->cm()) { // // Sharded collections have the following further requirements for targeting: // @@ -489,7 +456,8 @@ Status ChunkManagerTargeter::targetDelete( // Get the shard key StatusWith<BSONObj> status = - _manager->getShardKeyPattern().extractShardKeyFromQuery(opCtx, deleteDoc.getQuery()); + _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, + deleteDoc.getQuery()); // Bad query if (!status.isOK()) @@ -527,8 +495,8 @@ Status ChunkManagerTargeter::targetDelete( } // Single deletes must target a single shard or be exact-ID. 
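The exact-ID rule referenced in the comment above can be illustrated without the full query machinery. A toy stand-in for the isExactIdQuery() check, using a flat map as the filter (the real code extracts _id through a virtual {_id: 1} shard key pattern and additionally checks collation):

#include <cassert>
#include <map>
#include <string>

// A filter is "exact-id" if it pins _id to a single equality value
// (e.g. {_id: 3}), not an operator/range predicate (e.g. {_id: {$gt: 3}}).
// Operator predicates are modeled here as values starting with '$'.
bool isExactIdFilter(const std::map<std::string, std::string>& filter) {
    auto it = filter.find("_id");
    if (it == filter.end())
        return false;
    return it->second.empty() || it->second[0] != '$';
}

int main() {
    assert(isExactIdFilter({{"_id", "3"}}));
    assert(isExactIdFilter({{"foo", "x"}, {"_id", "3"}}));
    assert(!isExactIdFilter({{"_id", "$gt:3"}}));
    assert(!isExactIdFilter({{"foo", "x"}}));
}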
- if (_manager && deleteDoc.getLimit() == 1 && - !isExactIdQuery(opCtx, *cq.getValue(), _manager.get())) { + if (_routingInfo->cm() && deleteDoc.getLimit() == 1 && + !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { return Status(ErrorCodes::ShardKeyNotFound, str::stream() << "A single delete on a sharded collection must contain an exact " @@ -537,7 +505,7 @@ Status ChunkManagerTargeter::targetDelete( "request: " << deleteDoc.toBSON() << ", shard key pattern: " - << _manager->getShardKeyPattern().toString()); + << _routingInfo->cm()->getShardKeyPattern().toString()); } return targetQuery(opCtx, deleteDoc.getQuery(), collation, endpoints); @@ -558,26 +526,28 @@ Status ChunkManagerTargeter::targetQuery( const BSONObj& query, const BSONObj& collation, std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_primary && !_manager) { - return Status(ErrorCodes::NamespaceNotFound, - stream() << "could not target query in " << getNS().ns() - << "; no metadata found"); + if (!_routingInfo->primary() && !_routingInfo->cm()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "could not target query in " << getNS().ns() + << "; no metadata found"}; } - set<ShardId> shardIds; - if (_manager) { + std::set<ShardId> shardIds; + if (_routingInfo->cm()) { try { - _manager->getShardIdsForQuery(opCtx, query, collation, &shardIds); + _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } } else { - shardIds.insert(_primary->getId()); + shardIds.insert(_routingInfo->primary()->getId()); } for (const ShardId& shardId : shardIds) { endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); + shardId, + _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -586,7 +556,7 @@ Status ChunkManagerTargeter::targetQuery( std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, const BSONObj& collation, long long estDataSize) const { - auto chunk = _manager->findIntersectingChunk(shardKey, collation); + auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); // Track autosplit stats for sharded collections // Note: this is only best effort accounting and is not accurate. 
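For reference, the three-valued comparison this file relies on (compareShardVersions(), defined earlier) reduces to epoch equality plus a (major, minor) ordering. A compact sketch with a simplified version type (assumed stand-in, not mongo's ChunkVersion):

#include <cassert>
#include <string>
#include <tuple>

enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };

struct Version {
    std::string epoch;  // stand-in for the collection epoch (OID)
    int major = 0;
    int minor = 0;
    bool isSet() const { return major != 0 || minor != 0; }
};

CompareResult compare(const Version& a, const Version& b) {
    // Different epochs mean the collection was dropped and recreated:
    // the versions are not comparable.
    if (a.epoch != b.epoch)
        return CompareResult_Unknown;

    // Zero versions are only comparable to themselves.
    if (!a.isSet() || !b.isSet())
        return (!a.isSet() && !b.isSet()) ? CompareResult_GTE : CompareResult_Unknown;

    return std::make_tuple(a.major, a.minor) < std::make_tuple(b.major, b.minor)
        ? CompareResult_LT
        : CompareResult_GTE;
}

int main() {
    assert(compare({"e1", 2, 0}, {"e1", 1, 5}) == CompareResult_GTE);
    assert(compare({"e1", 1, 0}, {"e1", 1, 2}) == CompareResult_LT);
    assert(compare({"e1", 1, 0}, {"e2", 1, 0}) == CompareResult_Unknown);
}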
@@ -595,27 +565,29 @@ std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONOb } return stdx::make_unique<ShardEndpoint>(chunk->getShardId(), - _manager->getVersion(chunk->getShardId())); + _routingInfo->cm()->getVersion(chunk->getShardId())); } Status ChunkManagerTargeter::targetCollection( std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_primary && !_manager) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() << "could not target full range of " << getNS().ns() - << "; metadata not found"); + if (!_routingInfo->primary() && !_routingInfo->cm()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "could not target full range of " << getNS().ns() + << "; metadata not found"}; } - set<ShardId> shardIds; - if (_manager) { - _manager->getAllShardIds(&shardIds); + std::set<ShardId> shardIds; + if (_routingInfo->cm()) { + _routingInfo->cm()->getAllShardIds(&shardIds); } else { - shardIds.insert(_primary->getId()); + shardIds.insert(_routingInfo->primary()->getId()); } for (const ShardId& shardId : shardIds) { endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); + shardId, + _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -623,19 +595,20 @@ Status ChunkManagerTargeter::targetCollection( Status ChunkManagerTargeter::targetAllShards( std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_primary && !_manager) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() << "could not target every shard with versions for " - << getNS().ns() - << "; metadata not found"); + if (!_routingInfo->primary() && !_routingInfo->cm()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "could not target every shard with versions for " << getNS().ns() + << "; metadata not found"}; } - vector<ShardId> shardIds; + std::vector<ShardId> shardIds; grid.shardRegistry()->getAllShardIds(&shardIds); for (const ShardId& shardId : shardIds) { endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); + shardId, + _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -649,8 +622,7 @@ void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint, if (staleInfo["vWanted"].eoo()) { // If we don't have a vWanted sent, assume the version is higher than our current // version. 
- remoteShardVersion = - getShardVersion(endpoint.shardName.toString(), _manager.get(), _primary.get()); + remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); remoteShardVersion.incMajor(); } else { remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted"); @@ -699,18 +671,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // Get the latest metadata information from the cache if there were issues // - shared_ptr<ChunkManager> lastManager = _manager; - shared_ptr<Shard> lastPrimary = _primary; + auto lastManager = _routingInfo->cm(); + auto lastPrimary = _routingInfo->primary(); - auto scopedCMStatus = ScopedChunkManager::getOrCreate(opCtx, _nss); - if (!scopedCMStatus.isOK()) { - return scopedCMStatus.getStatus(); + auto initStatus = init(opCtx); + if (!initStatus.isOK()) { + return initStatus; } - const auto& scopedCM = scopedCMStatus.getValue(); - _manager = scopedCM.cm(); - _primary = scopedCM.primary(); - // We now have the latest metadata from the cache. // @@ -718,8 +686,6 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // Either we couldn't target at all, or we have stale versions, but not both. // - dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty())); - if (_needsTargetingRefresh) { // Reset the field _needsTargetingRefresh = false; @@ -728,63 +694,44 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // the // metadata since we last got it from the cache. - bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary); + bool alreadyRefreshed = wasMetadataRefreshed( + lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { // To match previous behavior, we just need an incremental refresh here - return refreshNow(opCtx, RefreshType_RefreshChunkManager); + return refreshNow(opCtx); } - *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); + *wasChanged = isMetadataDifferent( + lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); return Status::OK(); } else if (!_remoteShardVersions.empty()) { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = - compareAllShardVersions(_manager.get(), _primary.get(), _remoteShardVersions); + CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); + // Reset the versions _remoteShardVersions.clear(); - if (result == CompareResult_Unknown) { + if (result == CompareResult_Unknown || result == CompareResult_LT) { // Our current shard versions aren't all comparable to the old versions, maybe drop - return refreshNow(opCtx, RefreshType_ReloadDatabase); - } else if (result == CompareResult_LT) { - // Our current shard versions are less than the remote versions, but no drop - return refreshNow(opCtx, RefreshType_RefreshChunkManager); + return refreshNow(opCtx); } - *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); + *wasChanged = isMetadataDifferent( + lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); return Status::OK(); } - // unreachable - dassert(false); - return Status::OK(); + MONGO_UNREACHABLE; } -Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx, RefreshType refreshType) { - if (refreshType == 
RefreshType_ReloadDatabase) { - Grid::get(opCtx)->catalogCache()->invalidate(_nss.db().toString()); - } - - // Try not to spam the configs - refreshBackoff(); +Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) { + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo)); - ScopedChunkManager::refreshAndGet(opCtx, _nss); - - auto scopedCMStatus = ScopedChunkManager::get(opCtx, _nss); - if (!scopedCMStatus.isOK()) { - return scopedCMStatus.getStatus(); - } - - const auto& scopedCM = scopedCMStatus.getValue(); - - _manager = scopedCM.cm(); - _primary = scopedCM.primary(); - - return Status::OK(); + return init(opCtx); } } // namespace mongo
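With refreshNow() reduced to onStaleConfigError() plus init(), the targeter now follows a plain invalidate-then-reload cache pattern. A hypothetical sketch of that pattern (toy RoutingCache type and loader; not mongo's CatalogCache API):

#include <functional>
#include <iostream>
#include <map>
#include <string>

class RoutingCache {
public:
    explicit RoutingCache(std::function<int(const std::string&)> loader)
        : _loader(std::move(loader)) {}

    // Lookup: serve from cache, reloading the entry if it was invalidated.
    int get(const std::string& ns) {
        auto it = _entries.find(ns);
        if (it == _entries.end())
            it = _entries.emplace(ns, _loader(ns)).first;
        return it->second;
    }

    // Called on a stale config error: drop the entry so the next get() reloads.
    void invalidate(const std::string& ns) {
        _entries.erase(ns);
    }

private:
    std::function<int(const std::string&)> _loader;
    std::map<std::string, int> _entries;  // ns -> routing-table version (toy)
};

int main() {
    int configVersion = 1;
    RoutingCache cache([&](const std::string&) { return configVersion; });

    std::cout << cache.get("db.coll") << "\n";  // loads: 1
    configVersion = 2;                          // metadata changed upstream
    std::cout << cache.get("db.coll") << "\n";  // still cached: 1
    cache.invalidate("db.coll");                // e.g. on a stale config error
    std::cout << cache.get("db.coll") << "\n";  // reloaded: 2
}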
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/chunk_manager_targeter.h" + +#include "mongo/db/matcher/extensions_callback_noop.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/canonical_query.h" +#include "mongo/db/query/collation/collation_index_key.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_key_pattern.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown }; + +enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT }; + +const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); + +/** + * There are two styles of update expressions: + * + * Replacement style: coll.update({ x : 1 }, { y : 2 }) + * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } }) + */ +UpdateType getUpdateExprType(const BSONObj& updateExpr) { + // Empty update is replacement-style, by default + if (updateExpr.isEmpty()) { + return UpdateType_Replacement; + } + + UpdateType updateType = UpdateType_Unknown; + + BSONObjIterator it(updateExpr); + while (it.more()) { + BSONElement next = it.next(); + + if (next.fieldName()[0] == '$') { + if (updateType == UpdateType_Unknown) { + updateType = UpdateType_OpStyle; + } else if (updateType == UpdateType_Replacement) { + return UpdateType_Unknown; + } + } else { + if (updateType == UpdateType_Unknown) { + updateType = UpdateType_Replacement; + } else if (updateType == UpdateType_OpStyle) { + return UpdateType_Unknown; + } + } + } + + return updateType; +} + +/** + * This returns "does the query have an _id field" and "is the _id field querying for a direct + * value like _id : 3 and not _id : { $gt : 3 }" + * + * If the query does not use the collection default collation, the _id field cannot contain strings, + * objects, or arrays. + * + * Ex: { _id : 1 } => true + * { foo : <anything>, _id : 1 } => true + * { _id : { $lt : 30 } } => false + * { foo : <anything> } => false + */ +bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, ChunkManager* manager) { + auto shardKey = virtualIdShardKey.extractShardKeyFromQuery(query); + BSONElement idElt = shardKey["_id"]; + + if (!idElt) { + return false; + } + + if (CollationIndexKey::isCollatableType(idElt.type()) && manager && + !query.getQueryRequest().getCollation().isEmpty() && + !CollatorInterface::collatorsMatch(query.getCollator(), manager->getDefaultCollator())) { + + // The collation applies to the _id field, but the user specified a collation which doesn't + // match the collection default. + return false; + } + + return true; +} + +// +// Utilities to compare shard versions +// + +/** + * Returns the relationship of two shard versions. Shard versions of a collection that has not + * been dropped and recreated and where there is at least one chunk on a shard are comparable, + * otherwise the result is ambiguous. + */ +CompareResult compareShardVersions(const ChunkVersion& shardVersionA, + const ChunkVersion& shardVersionB) { + // Collection may have been dropped + if (!shardVersionA.hasEqualEpoch(shardVersionB)) { + return CompareResult_Unknown; + } + + // Zero shard versions are only comparable to themselves + if (!shardVersionA.isSet() || !shardVersionB.isSet()) { + // If both are zero... 
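+        // they compare as equal, reported as GTE below; a zero version paired with a
+        // non-zero one is not comparable.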
+ if (!shardVersionA.isSet() && !shardVersionB.isSet()) { + return CompareResult_GTE; + } + + return CompareResult_Unknown; + } + + if (shardVersionA < shardVersionB) + return CompareResult_LT; + else + return CompareResult_GTE; +} + +ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, + const ShardId& shardId) { + if (routingInfo.cm()) { + return routingInfo.cm()->getVersion(shardId); + } + + return ChunkVersion::UNSHARDED(); +} + +/** + * Returns the relationship between two maps of shard versions. As above, these maps are often + * comparable when the collection has not been dropped and there is at least one chunk on the + * shards. If any versions in the maps are not comparable, the result is _Unknown. + * + * If any versions in the first map (cached) are _LT the versions in the second map (remote), + * the first (cached) versions are _LT the second (remote) versions. + * + * Note that the signature here is weird since our cached map of chunk versions is stored in a + * ChunkManager or is implicit in the primary shard of the collection. + */ +CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, + const ShardVersionMap& remoteShardVersions) { + CompareResult finalResult = CompareResult_GTE; + + for (const auto& shardVersionEntry : remoteShardVersions) { + const ShardId& shardId = shardVersionEntry.first; + const ChunkVersion& remoteShardVersion = shardVersionEntry.second; + + ChunkVersion cachedShardVersion; + + try { + // Throws b/c shard constructor throws + cachedShardVersion = getShardVersion(routingInfo, shardId); + } catch (const DBException& ex) { + warning() << "could not lookup shard " << shardId + << " in local cache, shard metadata may have changed" + << " or be unavailable" << causedBy(ex); + + return CompareResult_Unknown; + } + + // Compare the remote and cached versions + CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion); + + if (result == CompareResult_Unknown) + return result; + + if (result == CompareResult_LT) + finalResult = CompareResult_LT; + + // Note that we keep going after _LT b/c there could be more _Unknowns. + } + + return finalResult; +} + +/** + * Whether or not the manager/primary pair is different from the other manager/primary pair. + */ +bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA, + const std::shared_ptr<Shard>& primaryA, + const std::shared_ptr<ChunkManager>& managerB, + const std::shared_ptr<Shard>& primaryB) { + if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || + (!primaryA && primaryB)) + return true; + + if (managerA) { + return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion()); + } + + dassert(NULL != primaryA.get()); + return primaryA->getId() != primaryB->getId(); +} + +/** +* Whether or not the manager/primary pair was changed or refreshed from a previous version +* of the metadata. 
+*/ +bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA, + const std::shared_ptr<Shard>& primaryA, + const std::shared_ptr<ChunkManager>& managerB, + const std::shared_ptr<Shard>& primaryB) { + if (isMetadataDifferent(managerA, primaryA, managerB, primaryB)) + return true; + + if (managerA) { + dassert(managerB.get()); // otherwise metadata would be different + return managerA->getSequenceNumber() != managerB->getSequenceNumber(); + } + + return false; +} + +} // namespace + +ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterStats* stats) + : _nss(nss), _needsTargetingRefresh(false), _stats(stats) {} + + +Status ChunkManagerTargeter::init(OperationContext* opCtx) { + auto shardDbStatus = createShardDatabase(opCtx, _nss.db()); + if (!shardDbStatus.isOK()) { + return shardDbStatus.getStatus(); + } + + const auto routingInfoStatus = + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss); + if (!routingInfoStatus.isOK()) { + return routingInfoStatus.getStatus(); + } + + _routingInfo = std::move(routingInfoStatus.getValue()); + + return Status::OK(); +} + +const NamespaceString& ChunkManagerTargeter::getNS() const { + return _nss; +} + +Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, + const BSONObj& doc, + ShardEndpoint** endpoint) const { + BSONObj shardKey; + + if (_routingInfo->cm()) { + // + // Sharded collections have the following requirements for targeting: + // + // Inserts must contain the exact shard key. + // + + shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); + + // Check shard key exists + if (shardKey.isEmpty()) { + return {ErrorCodes::ShardKeyNotFound, + str::stream() << "document " << doc + << " does not contain shard key for pattern " + << _routingInfo->cm()->getShardKeyPattern().toString()}; + } + + // Check shard key size on insert + Status status = ShardKeyPattern::checkShardKeySize(shardKey); + if (!status.isOK()) + return status; + } + + // Target the shard key or database primary + if (!shardKey.isEmpty()) { + *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release(); + } else { + if (!_routingInfo->primary()) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target insert in collection " << getNS().ns() + << "; no metadata found"); + } + + *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED()); + } + + return Status::OK(); +} + +Status ChunkManagerTargeter::targetUpdate( + OperationContext* opCtx, + const BatchedUpdateDocument& updateDoc, + std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { + // + // Update targeting may use either the query or the update. This is to support save-style + // updates, of the form: + // + // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true }) + // + // Because drivers do not know the shard key, they can't pull the shard key automatically + // into the query doc, and to correctly support upsert we must target a single shard. + // + // The rule is simple - If the update is replacement style (no '$set'), we target using the + // update. If the update is op style, we target using the query. + // + // If we have the exact shard key in either the query or replacement doc, we target using + // that extracted key. 
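+    //
+    // For example, with shard key { a : 1 }:
+    //   coll.update({ a : 1 }, { $set : { x : 2 } }) targets via the query { a : 1 }, while
+    //   coll.update({ _id : 0 }, { a : 1, x : 2 }, { upsert : true }) targets via the
+    //   replacement doc, whose extracted shard key is also { a : 1 }.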
+ // + + BSONObj query = updateDoc.getQuery(); + BSONObj updateExpr = updateDoc.getUpdateExpr(); + + UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr()); + + if (updateType == UpdateType_Unknown) { + return {ErrorCodes::UnsupportedFormat, + str::stream() << "update document " << updateExpr + << " has mixed $operator and non-$operator style fields"}; + } + + BSONObj shardKey; + + if (_routingInfo->cm()) { + // + // Sharded collections have the following further requirements for targeting: + // + // Upserts must be targeted exactly by shard key. + // Non-multi updates must be targeted exactly by shard key *or* exact _id. + // + + // Get the shard key + if (updateType == UpdateType_OpStyle) { + // Target using the query + StatusWith<BSONObj> status = + _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); + + // Bad query + if (!status.isOK()) + return status.getStatus(); + + shardKey = status.getValue(); + } else { + // Target using the replacement document + shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); + } + + // Check shard key size on upsert. + if (updateDoc.getUpsert()) { + Status status = ShardKeyPattern::checkShardKeySize(shardKey); + if (!status.isOK()) + return status; + } + } + + const BSONObj collation = updateDoc.isCollationSet() ? updateDoc.getCollation() : BSONObj(); + + // Target the shard key, query, or replacement doc + if (!shardKey.isEmpty()) { + try { + endpoints->push_back( + targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize()))); + return Status::OK(); + } catch (const DBException&) { + // This update is potentially not constrained to a single shard + } + } + + // We failed to target a single shard. + + // Upserts are required to target a single shard. + if (_routingInfo->cm() && updateDoc.getUpsert()) { + return Status(ErrorCodes::ShardKeyNotFound, + str::stream() << "An upsert on a sharded collection must contain the shard " + "key and have the simple collation. Update request: " + << updateDoc.toBSON() + << ", shard key pattern: " + << _routingInfo->cm()->getShardKeyPattern().toString()); + } + + // Parse update query. + auto qr = stdx::make_unique<QueryRequest>(getNS()); + qr->setFilter(updateDoc.getQuery()); + if (!collation.isEmpty()) { + qr->setCollation(collation); + } + auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), ExtensionsCallbackNoop()); + if (!cq.isOK()) { + return Status(cq.getStatus().code(), + str::stream() << "Could not parse update query " << updateDoc.getQuery() + << causedBy(cq.getStatus())); + } + + // Single (non-multi) updates must target a single shard or be exact-ID. + if (_routingInfo->cm() && !updateDoc.getMulti() && + !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { + return Status(ErrorCodes::ShardKeyNotFound, + str::stream() + << "A single update on a sharded collection must contain an exact " + "match on _id (and have the collection default collation) or " + "contain the shard key (and have the simple collation). 
Update " + "request: " + << updateDoc.toBSON() + << ", shard key pattern: " + << _routingInfo->cm()->getShardKeyPattern().toString()); + } + + if (updateType == UpdateType_OpStyle) { + return targetQuery(opCtx, query, collation, endpoints); + } else { + return targetDoc(opCtx, updateExpr, collation, endpoints); + } +} + +Status ChunkManagerTargeter::targetDelete( + OperationContext* opCtx, + const BatchedDeleteDocument& deleteDoc, + std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { + BSONObj shardKey; + + if (_routingInfo->cm()) { + // + // Sharded collections have the following further requirements for targeting: + // + // Limit-1 deletes must be targeted exactly by shard key *or* exact _id + // + + // Get the shard key + StatusWith<BSONObj> status = + _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, + deleteDoc.getQuery()); + + // Bad query + if (!status.isOK()) + return status.getStatus(); + + shardKey = status.getValue(); + } + + const BSONObj collation = deleteDoc.isCollationSet() ? deleteDoc.getCollation() : BSONObj(); + + + // Target the shard key or delete query + if (!shardKey.isEmpty()) { + try { + endpoints->push_back(targetShardKey(shardKey, collation, 0)); + return Status::OK(); + } catch (const DBException&) { + // This delete is potentially not constrained to a single shard + } + } + + // We failed to target a single shard. + + // Parse delete query. + auto qr = stdx::make_unique<QueryRequest>(getNS()); + qr->setFilter(deleteDoc.getQuery()); + if (!collation.isEmpty()) { + qr->setCollation(collation); + } + auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), ExtensionsCallbackNoop()); + if (!cq.isOK()) { + return Status(cq.getStatus().code(), + str::stream() << "Could not parse delete query " << deleteDoc.getQuery() + << causedBy(cq.getStatus())); + } + + // Single deletes must target a single shard or be exact-ID. + if (_routingInfo->cm() && deleteDoc.getLimit() == 1 && + !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { + return Status(ErrorCodes::ShardKeyNotFound, + str::stream() + << "A single delete on a sharded collection must contain an exact " + "match on _id (and have the collection default collation) or " + "contain the shard key (and have the simple collation). Delete " + "request: " + << deleteDoc.toBSON() + << ", shard key pattern: " + << _routingInfo->cm()->getShardKeyPattern().toString()); + } + + return targetQuery(opCtx, deleteDoc.getQuery(), collation, endpoints); +} + +Status ChunkManagerTargeter::targetDoc( + OperationContext* opCtx, + const BSONObj& doc, + const BSONObj& collation, + std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { + // NOTE: This is weird and fragile, but it's the way our language works right now - + // documents are either A) invalid or B) valid equality queries over themselves. 
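+    // For example, the replacement document { _id : 5, a : 1 } is targeted exactly as the
+    // equality query { _id : 5, a : 1 } would be.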
+ return targetQuery(opCtx, doc, collation, endpoints); +} + +Status ChunkManagerTargeter::targetQuery( + OperationContext* opCtx, + const BSONObj& query, + const BSONObj& collation, + std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { + if (!_routingInfo->primary() && !_routingInfo->cm()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "could not target query in " << getNS().ns() + << "; no metadata found"}; + } + + std::set<ShardId> shardIds; + if (_routingInfo->cm()) { + try { + _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); + } catch (const DBException& ex) { + return ex.toStatus(); + } + } else { + shardIds.insert(_routingInfo->primary()->getId()); + } + + for (const ShardId& shardId : shardIds) { + endpoints->push_back(stdx::make_unique<ShardEndpoint>( + shardId, + _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED())); + } + + return Status::OK(); +} + +std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, + const BSONObj& collation, + long long estDataSize) const { + auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); + + // Track autosplit stats for sharded collections + // Note: this is only best effort accounting and is not accurate. + if (estDataSize > 0) { + _stats->chunkSizeDelta[chunk->getMin()] += estDataSize; + } + + return stdx::make_unique<ShardEndpoint>(chunk->getShardId(), + _routingInfo->cm()->getVersion(chunk->getShardId())); +} + +Status ChunkManagerTargeter::targetCollection( + std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { + if (!_routingInfo->primary() && !_routingInfo->cm()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "could not target full range of " << getNS().ns() + << "; metadata not found"}; + } + + std::set<ShardId> shardIds; + if (_routingInfo->cm()) { + _routingInfo->cm()->getAllShardIds(&shardIds); + } else { + shardIds.insert(_routingInfo->primary()->getId()); + } + + for (const ShardId& shardId : shardIds) { + endpoints->push_back(stdx::make_unique<ShardEndpoint>( + shardId, + _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED())); + } + + return Status::OK(); +} + +Status ChunkManagerTargeter::targetAllShards( + std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { + if (!_routingInfo->primary() && !_routingInfo->cm()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "could not target every shard with versions for " << getNS().ns() + << "; metadata not found"}; + } + + std::vector<ShardId> shardIds; + grid.shardRegistry()->getAllShardIds(&shardIds); + + for (const ShardId& shardId : shardIds) { + endpoints->push_back(stdx::make_unique<ShardEndpoint>( + shardId, + _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED())); + } + + return Status::OK(); +} + +void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint, + const BSONObj& staleInfo) { + dassert(!_needsTargetingRefresh); + + ChunkVersion remoteShardVersion; + if (staleInfo["vWanted"].eoo()) { + // If we don't have a vWanted sent, assume the version is higher than our current + // version. 
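+        // Bumping the major version of the cached entry is enough for the later
+        // compareAllShardVersions() call to classify our cache as stale (CompareResult_LT)
+        // and trigger a refresh.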
+ remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); + remoteShardVersion.incMajor(); + } else { + remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted"); + } + + ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName); + if (it == _remoteShardVersions.end()) { + _remoteShardVersions.insert(std::make_pair(endpoint.shardName, remoteShardVersion)); + } else { + ChunkVersion& previouslyNotedVersion = it->second; + if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) { + if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) { + previouslyNotedVersion = remoteShardVersion; + } + } else { + // Epoch changed midway while applying the batch, so set the version to something + // unique and non-existent to force a reload when refreshIfNeeded is called. + previouslyNotedVersion = ChunkVersion::IGNORED(); + } + } +} + +void ChunkManagerTargeter::noteCouldNotTarget() { + dassert(_remoteShardVersions.empty()); + _needsTargetingRefresh = true; +} + +Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) { + bool dummy; + if (!wasChanged) { + wasChanged = &dummy; + } + + *wasChanged = false; + + // + // Did we have any stale config or targeting errors at all? + // + + if (!_needsTargetingRefresh && _remoteShardVersions.empty()) { + return Status::OK(); + } + + // + // Get the latest metadata information from the cache if there were issues + // + + auto lastManager = _routingInfo->cm(); + auto lastPrimary = _routingInfo->primary(); + + auto initStatus = init(opCtx); + if (!initStatus.isOK()) { + return initStatus; + } + + // We now have the latest metadata from the cache. + + // + // See if and how we need to do a remote refresh. + // Either we couldn't target at all, or we have stale versions, but not both. + // + + if (_needsTargetingRefresh) { + // Reset the field + _needsTargetingRefresh = false; + + // If we couldn't target, we might need to refresh if we haven't remotely refreshed + // the metadata since we last got it from the cache. 
+ + bool alreadyRefreshed = wasMetadataRefreshed( + lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); + + // If we didn't already refresh the targeting information, refresh it + if (!alreadyRefreshed) { + // To match previous behavior, we just need an incremental refresh here + return refreshNow(opCtx); + } + + *wasChanged = isMetadataDifferent( + lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); + return Status::OK(); + } else if (!_remoteShardVersions.empty()) { + // If we got stale shard versions from remote shards, we may need to refresh + // NOTE: Not sure yet if this can happen simultaneously with targeting issues + + CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); + + // Reset the versions + _remoteShardVersions.clear(); + + if (result == CompareResult_Unknown || result == CompareResult_LT) { + // Our current shard versions are either not comparable to the remote versions (the collection may have been dropped) or are stale, so refresh + return refreshNow(opCtx); + } + + *wasChanged = isMetadataDifferent( + lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); + return Status::OK(); + } + + MONGO_UNREACHABLE; +} + +Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) { + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo)); + + return init(opCtx); +} + +} // namespace mongo diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/commands/chunk_manager_targeter.h index 36fe46a3fe5..97c0f4c1455 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.h +++ b/src/mongo/s/commands/chunk_manager_targeter.h @@ -35,12 +35,12 @@ #include "mongo/bson/bsonobj_comparator_interface.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/namespace_string.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/ns_targeter.h" namespace mongo { class ChunkManager; -class CollatorInterface; class OperationContext; class Shard; struct ChunkVersion; @@ -109,21 +109,12 @@ public: Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged); private: - // Different ways we can refresh metadata - enum RefreshType { - // The version has gone up, but the collection hasn't been dropped - RefreshType_RefreshChunkManager, - // The collection may have been dropped, so we need to reload the db - RefreshType_ReloadDatabase - }; - - typedef std::map<ShardId, ChunkVersion> ShardVersionMap; - + using ShardVersionMap = std::map<ShardId, ChunkVersion>; /** * Performs an actual refresh from the config server. */ - Status refreshNow(OperationContext* opCtx, RefreshType refreshType); + Status refreshNow(OperationContext* opCtx); /** * Returns a vector of ShardEndpoints where a document might need to be placed. @@ -170,10 +161,8 @@ private: // Represents only the view and not really part of the targeter state. This is not owned here. 
TargeterStats* _stats; - // Zero or one of these are filled at all times - // If sharded, _manager, if unsharded, _primary, on error, neither - std::shared_ptr<ChunkManager> _manager; - std::shared_ptr<Shard> _primary; + // The latest loaded routing cache entry + boost::optional<CachedCollectionRoutingInfo> _routingInfo; // Map of shard->remote shard version reported from stale errors ShardVersionMap _remoteShardVersions; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index a6887ea0498..d9a182b2938 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -47,7 +47,7 @@ #include "mongo/db/views/view.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/chunk_manager.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" @@ -55,7 +55,6 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/store_possible_cursor.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -66,20 +65,22 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, BSONObj cmdObj, int options, BSONObjBuilder* result) { - auto scopedShardDbStatus = - ScopedShardDatabase::getExisting(opCtx, namespaces.executionNss.db()); - if (!scopedShardDbStatus.isOK()) { - appendEmptyResultSet( - *result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns()); - return Status::OK(); - } - auto request = AggregationRequest::parseFromBSON(namespaces.executionNss, cmdObj); if (!request.isOK()) { return request.getStatus(); } - const auto conf = scopedShardDbStatus.getValue().db(); + auto const catalogCache = Grid::get(opCtx)->catalogCache(); + + auto executionNsRoutingInfoStatus = + catalogCache->getCollectionRoutingInfo(opCtx, namespaces.executionNss); + if (!executionNsRoutingInfoStatus.isOK()) { + appendEmptyResultSet( + *result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns()); + return Status::OK(); + } + + const auto& executionNsRoutingInfo = executionNsRoutingInfoStatus.getValue(); // Determine the appropriate collation and 'resolve' involved namespaces to make the // ExpressionContext. @@ -91,16 +92,20 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // command on an unsharded collection. 
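    // (Each involved namespace, e.g. the 'from' collection of a $lookup, must be
    // unsharded; the uassert below enforces this.)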
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; LiteParsedPipeline liteParsedPipeline(request.getValue()); - for (auto&& ns : liteParsedPipeline.getInvolvedNamespaces()) { - uassert(28769, str::stream() << ns.ns() << " cannot be sharded", !conf->isSharded(ns.ns())); - resolvedNamespaces[ns.coll()] = {ns, std::vector<BSONObj>{}}; + for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { + const auto resolvedNsRoutingInfo = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassert( + 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm()); + resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); } - if (!conf->isSharded(namespaces.executionNss.ns())) { - return aggPassthrough(opCtx, namespaces, conf, cmdObj, result, options); + if (!executionNsRoutingInfo.cm()) { + return aggPassthrough( + opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), cmdObj, result, options); } - auto chunkMgr = conf->getChunkManager(opCtx, namespaces.executionNss.ns()); + const auto chunkMgr = executionNsRoutingInfo.cm(); std::unique_ptr<CollatorInterface> collation; if (!request.getValue().getCollation().isEmpty()) { @@ -260,9 +265,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Run merging command on random shard, unless a stage needs the primary shard. Need to use // ShardConnection so that the merging mongod is sent the config servers on connection init. auto& prng = opCtx->getClient()->getPrng(); - const auto& mergingShardId = + const auto mergingShardId = (needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard.load()) - ? conf->getPrimaryId() + ? uassertStatusOK(catalogCache->getDatabase(opCtx, namespaces.executionNss.db())) + .primaryId() : shardResults[prng.nextInt32(shardResults.size())].shardTargetId; const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); @@ -426,12 +432,12 @@ BSONObj ClusterAggregate::aggRunCommand(OperationContext* opCtx, Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, - DBConfig* conf, + const ShardId& shardId, BSONObj cmdObj, BSONObjBuilder* out, int queryOptions) { // Temporary hack. See comment on declaration for details. - auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); + auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h index b0fdd5d7375..d8e29744766 100644 --- a/src/mongo/s/commands/cluster_aggregate.h +++ b/src/mongo/s/commands/cluster_aggregate.h @@ -37,11 +37,11 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/s/commands/strategy.h" -#include "mongo/s/config.h" namespace mongo { class OperationContext; +class ShardId; /** * Methods for running aggregation across a sharded cluster. 
@@ -90,7 +90,7 @@ private: static Status aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, - DBConfig* conf, + const ShardId& shardId, BSONObj cmd, BSONObjBuilder* result, int queryOptions); diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp index 3ad8c373b9a..225cef2b6ef 100644 --- a/src/mongo/s/commands/cluster_commands_common.cpp +++ b/src/mongo/s/commands/cluster_commands_common.cpp @@ -40,7 +40,6 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/version_manager.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -54,17 +53,21 @@ namespace { bool forceRemoteCheckShardVersionCB(OperationContext* opCtx, const string& ns) { const NamespaceString nss(ns); + if (!nss.isValid()) { + return false; + } + // This will force the database catalog entry to be reloaded - Grid::get(opCtx)->catalogCache()->invalidate(nss.db()); + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); - auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); - if (!scopedCMStatus.isOK()) { + auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); + if (!routingInfoStatus.isOK()) { return false; } - const auto& scopedCM = scopedCMStatus.getValue(); + auto& routingInfo = routingInfoStatus.getValue(); - return scopedCM.cm() != nullptr; + return routingInfo.cm() != nullptr; } } // namespace @@ -261,4 +264,36 @@ std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opC return collectionsToReturn; } +CachedCollectionRoutingInfo getShardedCollection(OperationContext* opCtx, + const NamespaceString& nss) { + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Collection " << nss.ns() << " is not sharded.", + routingInfo.cm()); + + return routingInfo; +} + +StatusWith<CachedDatabaseInfo> createShardDatabase(OperationContext* opCtx, StringData dbName) { + auto dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + if (dbStatus == ErrorCodes::NamespaceNotFound) { + auto createDbStatus = + Grid::get(opCtx)->catalogClient(opCtx)->createDatabase(opCtx, dbName.toString()); + if (createDbStatus.isOK() || createDbStatus == ErrorCodes::NamespaceExists) { + dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + } else { + dbStatus = createDbStatus; + } + } + + if (dbStatus.isOK()) { + return dbStatus; + } + + return {dbStatus.getStatus().code(), + str::stream() << "Database " << dbName << " not found due to " + << dbStatus.getStatus().reason()}; +} + } // namespace mongo diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h index 7d6465bc400..960eb03e73b 100644 --- a/src/mongo/s/commands/cluster_commands_common.h +++ b/src/mongo/s/commands/cluster_commands_common.h @@ -39,6 +39,8 @@ namespace mongo { class AScopedConnection; +class CachedCollectionRoutingInfo; +class CachedDatabaseInfo; class DBClientBase; class DBClientCursor; class OperationContext; @@ -140,4 +142,17 @@ bool appendEmptyResultSet(BSONObjBuilder& result, Status status, const std::stri std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opCtx, StringData dbName); +/** + * Abstracts the common pattern of refreshing a collection and checking if it is sharded used across + * 
multiple commands. + */ +CachedCollectionRoutingInfo getShardedCollection(OperationContext* opCtx, + const NamespaceString& nss); + +/** + * If the specified database exists already, loads it in the cache (if not already there) and + * returns it. Otherwise, if it does not exist, this call will implicitly create it as non-sharded. + */ +StatusWith<CachedDatabaseInfo> createShardDatabase(OperationContext* opCtx, StringData dbName); + } // namespace mongo diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index 2fcf11086b9..45c46c26185 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -42,73 +42,42 @@ #include "mongo/util/timer.h" namespace mongo { - -using std::string; -using std::vector; - namespace { -long long applySkipLimit(long long num, const BSONObj& cmd) { - BSONElement s = cmd["skip"]; - BSONElement l = cmd["limit"]; - - if (s.isNumber()) { - num = num - s.numberLong(); - if (num < 0) { - num = 0; - } - } - - if (l.isNumber()) { - long long limit = l.numberLong(); - if (limit < 0) { - limit = -limit; - } - - // 0 limit means no limit - if (limit < num && limit != 0) { - num = limit; - } - } - - return num; -} - - class ClusterCountCmd : public Command { public: ClusterCountCmd() : Command("count", false) {} - virtual bool slaveOk() const { + bool slaveOk() const override { return true; } - virtual bool adminOnly() const { + bool adminOnly() const override { return false; } - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - uassert( - ErrorCodes::InvalidNamespace, "count command requires valid namespace", nss.isValid()); + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid namespace specified '" << nss.ns() << "'", + nss.isValid()); long long skip = 0; @@ -167,7 +136,7 @@ public: } } - vector<Strategy::CommandResult> countResult; + std::vector<Strategy::CommandResult> countResult; Strategy::commandOp(opCtx, dbname, countCmdBuilder.done(), @@ -214,20 +183,19 @@ public: long long total = 0; BSONObjBuilder shardSubTotal(result.subobjStart("shards")); - for (vector<Strategy::CommandResult>::const_iterator iter = countResult.begin(); - iter != countResult.end(); - ++iter) { - const ShardId& shardName = iter->shardTargetId; + for (const auto& resultEntry : countResult) { + const ShardId& shardName = resultEntry.shardTargetId; + const auto resultBSON = resultEntry.result; - if (iter->result["ok"].trueValue()) { - long long shardCount = iter->result["n"].numberLong(); + if (resultBSON["ok"].trueValue()) { + long long shardCount = resultBSON["n"].numberLong(); shardSubTotal.appendNumber(shardName.toString(), shardCount); total += 
shardCount; } else { shardSubTotal.doneFast(); errmsg = "failed on : " + shardName.toString(); - result.append("cause", iter->result); + result.append("cause", resultBSON); // Add "code" to the top-level response, if the failure of the sharded command // can be accounted to a single error @@ -247,17 +215,16 @@ public: return true; } - virtual Status explain(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - const rpc::ServerSelectionMetadata& serverSelectionMetadata, - BSONObjBuilder* out) const { + Status explain(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + const rpc::ServerSelectionMetadata& serverSelectionMetadata, + BSONObjBuilder* out) const override { const NamespaceString nss(parseNs(dbname, cmdObj)); - if (!nss.isValid()) { - return Status{ErrorCodes::InvalidNamespace, - str::stream() << "Invalid collection name: " << nss.ns()}; - } + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid namespace specified '" << nss.ns() << "'", + nss.isValid()); // Extract the targeting query. BSONObj targetingQuery; @@ -284,7 +251,7 @@ public: // We will time how long it takes to run the commands on the shards Timer timer; - vector<Strategy::CommandResult> shardResults; + std::vector<Strategy::CommandResult> shardResults; Strategy::commandOp(opCtx, dbname, explainCmdBob.obj(), @@ -329,6 +296,33 @@ public: opCtx, shardResults, mongosStageName, millisElapsed, out); } +private: + static long long applySkipLimit(long long num, const BSONObj& cmd) { + BSONElement s = cmd["skip"]; + BSONElement l = cmd["limit"]; + + if (s.isNumber()) { + num = num - s.numberLong(); + if (num < 0) { + num = 0; + } + } + + if (l.isNumber()) { + long long limit = l.numberLong(); + if (limit < 0) { + limit = -limit; + } + + // 0 limit means no limit + if (limit < num && limit != 0) { + num = limit; + } + } + + return num; + } + } clusterCountCmd; } // namespace diff --git a/src/mongo/s/commands/cluster_drop_cmd.cpp b/src/mongo/s/commands/cluster_drop_cmd.cpp index 2c44a5a1dbc..583c8902cbf 100644 --- a/src/mongo/s/commands/cluster_drop_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_cmd.cpp @@ -41,7 +41,6 @@ #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -80,20 +79,20 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); - if (scopedDbStatus == ErrorCodes::NamespaceNotFound) { + auto const catalogCache = Grid::get(opCtx)->catalogCache(); + + auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss); + if (routingInfoStatus == ErrorCodes::NamespaceNotFound) { return true; } - uassertStatusOK(scopedDbStatus.getStatus()); - - auto const db = scopedDbStatus.getValue().db(); + auto routingInfo = uassertStatusOK(std::move(routingInfoStatus)); - if (!db->isSharded(nss.ns())) { - _dropUnshardedCollectionFromShard(opCtx, db->getPrimaryId(), nss, &result); + if (!routingInfo.cm()) { + _dropUnshardedCollectionFromShard(opCtx, routingInfo.primaryId(), nss, &result); } else { uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->dropCollection(opCtx, nss)); - db->markNSNotSharded(nss.ns()); + catalogCache->invalidateShardedCollection(nss); } 
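        // In the sharded case, the invalidateShardedCollection call above marks the routing
        // table entry as stale, so the next access to this namespace reloads it.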
return true; diff --git a/src/mongo/s/commands/cluster_drop_database_cmd.cpp b/src/mongo/s/commands/cluster_drop_database_cmd.cpp index f86cf073273..178fc5f36bc 100644 --- a/src/mongo/s/commands/cluster_drop_database_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_database_cmd.cpp @@ -40,9 +40,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -93,17 +91,19 @@ public: auto scopedDistLock = uassertStatusOK(catalogClient->getDistLockManager()->lock( opCtx, dbname, "dropDatabase", DistLockManager::kDefaultLockTimeout)); + auto const catalogCache = Grid::get(opCtx)->catalogCache(); + // Refresh the database metadata so it kicks off a full reload - Grid::get(opCtx)->catalogCache()->invalidate(dbname); + catalogCache->purgeDatabase(dbname); - auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); + auto dbInfoStatus = catalogCache->getDatabase(opCtx, dbname); - if (scopedDbStatus == ErrorCodes::NamespaceNotFound) { + if (dbInfoStatus == ErrorCodes::NamespaceNotFound) { result.append("info", "database does not exist"); return true; } - uassertStatusOK(scopedDbStatus.getStatus()); + uassertStatusOK(dbInfoStatus.getStatus()); catalogClient->logChange(opCtx, "dropDatabase.start", @@ -111,16 +111,15 @@ public: BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); - auto const db = scopedDbStatus.getValue().db(); + auto& dbInfo = dbInfoStatus.getValue(); // Drop the database's collections from metadata for (const auto& nss : getAllShardedCollectionsForDb(opCtx, dbname)) { uassertStatusOK(catalogClient->dropCollection(opCtx, nss)); - db->markNSNotSharded(nss.ns()); } // Drop the database from the primary shard first - _dropDatabaseFromShard(opCtx, db->getPrimaryId(), dbname); + _dropDatabaseFromShard(opCtx, dbInfo.primaryId(), dbname); // Drop the database from each of the remaining shards { @@ -146,7 +145,7 @@ public: } // Invalidate the database so the next access will do a full reload - Grid::get(opCtx)->catalogCache()->invalidate(dbname); + catalogCache->purgeDatabase(dbname); catalogClient->logChange( opCtx, "dropDatabase", dbname, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp index 1db7ea7ef03..64537920cb7 100644 --- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp +++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp @@ -41,7 +41,6 @@ #include "mongo/db/commands.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -109,7 +108,7 @@ public: audit::logEnableSharding(Client::getCurrent(), dbname); // Make sure to force update of any stale metadata - Grid::get(opCtx)->catalogCache()->invalidate(dbname); + Grid::get(opCtx)->catalogCache()->purgeDatabase(dbname); return true; } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 578968205af..feae1fab5e2 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -40,7 +40,6 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/s/balancer_configuration.h" 
#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_explain.h" @@ -48,7 +47,6 @@ #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/timer.h" @@ -63,48 +61,42 @@ class FindAndModifyCmd : public Command { public: FindAndModifyCmd() : Command("findAndModify", false, "findandmodify") {} - virtual bool slaveOk() const { + bool slaveOk() const override { return true; } - virtual bool adminOnly() const { + bool adminOnly() const override { return false; } - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { find_and_modify::addPrivilegesRequiredForFindAndModify(this, dbname, cmdObj, out); } - virtual Status explain(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - const rpc::ServerSelectionMetadata& serverSelectionMetadata, - BSONObjBuilder* out) const { - const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj); + Status explain(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + const rpc::ServerSelectionMetadata& serverSelectionMetadata, + BSONObjBuilder* out) const override { + const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto scopedDB = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, dbName)); - const auto conf = scopedDB.db(); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); shared_ptr<ChunkManager> chunkMgr; shared_ptr<Shard> shard; - if (!conf->isSharded(nss.ns())) { - auto shardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - shard = shardStatus.getValue(); + if (!routingInfo.cm()) { + shard = routingInfo.primary(); } else { - chunkMgr = _getChunkManager(opCtx, conf, nss); + chunkMgr = routingInfo.cm(); const BSONObj query = cmdObj.getObjectField("query"); @@ -118,7 +110,7 @@ public: return collationElementStatus; } - StatusWith<BSONObj> status = _getShardKey(opCtx, chunkMgr, query); + StatusWith<BSONObj> status = _getShardKey(opCtx, *chunkMgr, query); if (!status.isOK()) { return status.getStatus(); } @@ -131,6 +123,7 @@ public: if (!shardStatus.isOK()) { return shardStatus.getStatus(); } + shard = shardStatus.getValue(); } @@ -143,7 +136,7 @@ public: Timer timer; BSONObjBuilder result; - bool ok = _runCommand(opCtx, conf, chunkMgr, shard->getId(), nss, explainCmd.obj(), result); + bool ok = _runCommand(opCtx, chunkMgr, shard->getId(), nss, explainCmd.obj(), result); long long millisElapsed = timer.millis(); if (!ok) { @@ -164,24 +157,23 @@ public: opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, out); } - virtual bool run(OperationContext* opCtx, - const std::string& dbName, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) { + bool 
run(OperationContext* opCtx, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj); // findAndModify should only be creating database if upsert is true, but this would require // that the parsing be pulled into this function. - auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, dbName)); - const auto conf = scopedDb.db(); - - if (!conf->isSharded(nss.ns())) { - return _runCommand(opCtx, conf, nullptr, conf->getPrimaryId(), nss, cmdObj, result); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { + return _runCommand(opCtx, nullptr, routingInfo.primaryId(), nss, cmdObj, result); } - shared_ptr<ChunkManager> chunkMgr = _getChunkManager(opCtx, conf, nss); + const auto chunkMgr = routingInfo.cm(); const BSONObj query = cmdObj.getObjectField("query"); @@ -195,17 +187,11 @@ public: return appendCommandStatus(result, collationElementStatus); } - StatusWith<BSONObj> status = _getShardKey(opCtx, chunkMgr, query); - if (!status.isOK()) { - // Bad query - return appendCommandStatus(result, status.getStatus()); - } + BSONObj shardKey = uassertStatusOK(_getShardKey(opCtx, *chunkMgr, query)); - BSONObj shardKey = status.getValue(); auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); - const bool ok = - _runCommand(opCtx, conf, chunkMgr, chunk->getShardId(), nss, cmdObj, result); + const bool ok = _runCommand(opCtx, chunkMgr, chunk->getShardId(), nss, cmdObj, result); if (ok) { updateChunkWriteStatsAndSplitIfNeeded( opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize()); @@ -215,21 +201,12 @@ public: } private: - shared_ptr<ChunkManager> _getChunkManager(OperationContext* opCtx, - DBConfig* conf, - const NamespaceString& nss) const { - shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(opCtx, nss.ns()); - massert(13002, "shard internal error chunk manager should never be null", chunkMgr); - - return chunkMgr; - } - - StatusWith<BSONObj> _getShardKey(OperationContext* opCtx, - shared_ptr<ChunkManager> chunkMgr, - const BSONObj& query) const { + static StatusWith<BSONObj> _getShardKey(OperationContext* opCtx, + const ChunkManager& chunkMgr, + const BSONObj& query) { // Verify that the query has an equality predicate using the shard key StatusWith<BSONObj> status = - chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); + chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); if (!status.isOK()) { return status; @@ -245,20 +222,19 @@ private: return shardKey; } - bool _runCommand(OperationContext* opCtx, - DBConfig* conf, - shared_ptr<ChunkManager> chunkManager, - const ShardId& shardId, - const NamespaceString& nss, - const BSONObj& cmdObj, - BSONObjBuilder& result) const { + static bool _runCommand(OperationContext* opCtx, + shared_ptr<ChunkManager> chunkManager, + const ShardId& shardId, + const NamespaceString& nss, + const BSONObj& cmdObj, + BSONObjBuilder& result) { BSONObj res; const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); ShardConnection conn(shard->getConnString(), nss.ns(), chunkManager); - bool ok = conn->runCommand(conf->name(), cmdObj, res); + bool ok = conn->runCommand(nss.db().toString(), cmdObj, res); conn.done(); // ErrorCodes::RecvStaleConfig is the code for RecvStaleConfigException. 
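The change above is the pattern this commit applies across all of these commands: instead of going through DBConfig/ScopedShardDatabase, each call site asks the CatalogCache for a CachedCollectionRoutingInfo and branches on whether a chunk manager is present. A condensed sketch of that idiom as it appears throughout these files (dispatchToShard is a hypothetical stand-in for each command's actual shard dispatch; nss, shardKey and collation are as in the surrounding code):

    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));

    if (routingInfo.cm()) {
        // Sharded collection: route through the chunk manager to the owning chunk's shard
        auto chunk = routingInfo.cm()->findIntersectingChunk(shardKey, collation);
        dispatchToShard(chunk->getShardId());
    } else {
        // Unsharded collection: everything lives on the database's primary shard
        dispatchToShard(routingInfo.primaryId());
    }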
diff --git a/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp b/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp index 35150ac3aca..8931166863c 100644 --- a/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp +++ b/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp @@ -70,7 +70,7 @@ public: int options, std::string& errmsg, BSONObjBuilder& result) { - Grid::get(opCtx)->catalogCache()->invalidateAll(); + Grid::get(opCtx)->catalogCache()->purgeAllDatabases(); result.appendBool("flushed", true); return true; diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp index 00e104c1e45..2bb23453bba 100644 --- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp @@ -35,8 +35,9 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -86,13 +87,10 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto scopedDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db())); - auto config = scopedDb.db(); + auto routingInfo = getShardedCollection(opCtx, nss); + const auto cm = routingInfo.cm(); - auto cm = config->getChunkManagerIfExists(opCtx, nss.ns()); - uassert(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded.", cm); - - for (const auto& cmEntry : cm->getChunkMap()) { + for (const auto& cmEntry : cm->chunkMap()) { log() << redact(cmEntry.second->toString()); } diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index 088b8d6d4d1..b155d322b3e 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -45,27 +45,17 @@ #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/commands/strategy.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/stdx/chrono.h" #include "mongo/util/log.h" namespace mongo { - -using std::shared_ptr; -using std::map; -using std::set; -using std::string; -using std::vector; - namespace { AtomicUInt32 JOB_NUMBER; @@ -75,7 +65,7 @@ const Milliseconds kNoDistLockTimeout(-1); /** * Generates a unique name for the temporary M/R output collection. */ -string getTmpName(StringData coll) { +std::string getTmpName(StringData coll) { return str::stream() << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER.fetchAndAdd(1); } @@ -85,14 +75,14 @@ string getTmpName(StringData coll) { * be sent to the shards as part of the first phase of map/reduce. 
*/ BSONObj fixForShards(const BSONObj& orig, - const string& output, - string& badShardedField, + const std::string& output, + std::string& badShardedField, int maxChunkSizeBytes) { BSONObjBuilder b; BSONObjIterator i(orig); while (i.more()) { BSONElement e = i.next(); - const string fn = e.fieldName(); + const std::string fn = e.fieldName(); if (fn == bypassDocumentValidationCommandOption() || fn == "map" || fn == "mapreduce" || fn == "mapReduce" || fn == "mapparams" || fn == "reduce" || fn == "query" || @@ -160,47 +150,49 @@ class MRCmd : public Command { public: MRCmd() : Command("mapReduce", false, "mapreduce") {} - virtual bool slaveOk() const { + bool slaveOk() const override { return true; } - virtual bool adminOnly() const { + bool adminOnly() const override { return false; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return parseNsCollectionRequired(dbname, cmdObj).ns(); + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { return mr::mrSupportsWriteConcern(cmd); } - virtual void help(std::stringstream& help) const { + void help(std::stringstream& help) const override { help << "Runs the sharded map/reduce command"; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { Timer t; const NamespaceString nss(parseNs(dbname, cmdObj)); - uassert(ErrorCodes::InvalidNamespace, "Invalid namespace", nss.isValid()); - - const string shardResultCollection = getTmpName(nss.coll()); + const std::string shardResultCollection = getTmpName(nss.coll()); bool shardedOutput = false; - NamespaceString outputCollNss; bool customOutDB = false; + NamespaceString outputCollNss; bool inlineOutput = false; - string outDB = dbname; + std::string outDB = dbname; BSONElement outElmt = cmdObj.getField("out"); if (outElmt.type() == Object) { @@ -218,7 +210,8 @@ public: !customOut.hasField("db")); } else { // Mode must be 1st element - const string finalColShort = customOut.firstElement().str(); + const std::string finalColShort = customOut.firstElement().str(); + if (customOut.hasField("db")) { customOutDB = true; outDB = customOut.getField("db").str(); @@ -231,44 +224,27 @@ public: } } - // Ensure the input database exists - auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbname); - if (!status.isOK()) { - return appendCommandStatus(result, status.getStatus()); - } + auto const catalogCache = Grid::get(opCtx)->catalogCache(); - shared_ptr<DBConfig> confIn = status.getValue(); + // Ensure the input database exists and set up the input collection + auto inputRoutingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - shared_ptr<DBConfig> confOut; - if (customOutDB) { - // Create the output database implicitly, since we have a custom output requested - auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, outDB)); - confOut = 
scopedDb.getSharedDbReference(); - } else { - confOut = confIn; - } + const bool shardedInput = inputRoutingInfo.cm() != nullptr; - if (confOut->getPrimaryId() == "config" && !inlineOutput) { - return appendCommandStatus( - result, - Status(ErrorCodes::CommandNotSupported, - str::stream() << "Can not execute mapReduce with output database " << outDB - << " which lives on config servers")); + // Create the output database implicitly if we have a custom output requested + if (customOutDB) { + uassertStatusOK(createShardDatabase(opCtx, outDB)); } - const bool shardedInput = confIn && confIn->isSharded(nss.ns()); - - if (!shardedOutput) { - uassert(15920, - "Cannot output to a non-sharded collection because " - "sharded collection exists already", - !confOut->isSharded(outputCollNss.ns())); - - // TODO: Should we also prevent going from non-sharded to sharded? During the - // transition client may see partial data. - } + // Ensure that the output database doesn't reside on the config server + auto outputDbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, outDB)); + uassert(ErrorCodes::CommandNotSupported, + str::stream() << "Can not execute mapReduce with output database " << outDB + << " which lives on config servers", + inlineOutput || outputDbInfo.primaryId() != "config"); int64_t maxChunkSizeBytes = 0; + if (shardedOutput) { // Will need to figure out chunks, ask shards for points maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong(); @@ -279,29 +255,40 @@ public: // maxChunkSizeBytes is sent as int BSON field invariant(maxChunkSizeBytes < std::numeric_limits<int>::max()); + } else if (outputCollNss.isValid()) { + auto outputRoutingInfo = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss)); + + uassert(15920, + "Cannot output to a non-sharded collection because " + "sharded collection exists already", + !outputRoutingInfo.cm()); + + // TODO: Should we also prevent going from non-sharded to sharded? During the + // transition client may see partial data. 
} const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); // modify command to run on shards with output to tmp collection - string badShardedField; + std::string badShardedField; BSONObj shardedCommand = fixForShards(cmdObj, shardResultCollection, badShardedField, maxChunkSizeBytes); if (!shardedInput && !shardedOutput && !customOutDB) { LOG(1) << "simple MR, just passthrough"; - const auto shard = - uassertStatusOK(shardRegistry->getShard(opCtx, confIn->getPrimaryId())); + invariant(inputRoutingInfo.primary()); - ShardConnection conn(shard->getConnString(), ""); + ShardConnection conn(inputRoutingInfo.primary()->getConnString(), ""); BSONObj res; bool ok = conn->runCommand(dbname, cmdObj, res); conn.done(); if (auto wcErrorElem = res["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result); + appendWriteConcernErrorToCmdResponse( + inputRoutingInfo.primary()->getId(), wcErrorElem, result); } result.appendElementsUnique(res); @@ -323,12 +310,13 @@ public: collation = cmdObj["collation"].embeddedObjectUserCheck(); } - set<string> servers; - vector<Strategy::CommandResult> mrCommandResults; + std::set<std::string> servers; + std::vector<Strategy::CommandResult> mrCommandResults; BSONObjBuilder shardResultsB; BSONObjBuilder shardCountsB; - map<string, int64_t> countsMap; + std::map<std::string, int64_t> countsMap; + auto splitPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); { @@ -349,12 +337,12 @@ public: for (const auto& mrResult : mrCommandResults) { // Need to gather list of all servers even if an error happened - string server; - { + const auto server = [&]() { const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, mrResult.shardTargetId)); - server = shard->getConnString().toString(); - } + return shard->getConnString().toString(); + }(); + servers.insert(server); if (!ok) { @@ -386,15 +374,14 @@ public: if (singleResult.hasField("splitKeys")) { BSONElement splitKeys = singleResult.getField("splitKeys"); - vector<BSONElement> pts = splitKeys.Array(); - for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) { - splitPts.insert(it->Obj().getOwned()); + for (const auto& splitPt : splitKeys.Array()) { + splitPts.insert(splitPt.Obj().getOwned()); } } } if (!ok) { - _cleanUp(servers, dbname, shardResultCollection); + cleanUp(servers, dbname, shardResultCollection); // Add "code" to the top-level response, if the failure of the sharded command // can be accounted to a single error. 
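
The mapReduce hunks above all follow the same migration this patch applies across the sharding commands: the old DBConfig / ScopedShardDatabase pair is replaced by a single CatalogCache::getCollectionRoutingInfo() call, and the "is this collection sharded?" question becomes a null check on the returned routing info's chunk manager. A minimal sketch of the resulting targeting idiom, assuming only the CachedCollectionRoutingInfo API visible in this diff (the helper name shardsOwningCollection is hypothetical, not part of the patch):

    #include <set>
    #include <vector>

    #include "mongo/s/catalog_cache.h"
    #include "mongo/s/grid.h"

    std::vector<ShardId> shardsOwningCollection(OperationContext* opCtx,
                                                const NamespaceString& nss) {
        // One cache lookup replaces the old getDatabase() + isSharded() +
        // getChunkManager() sequence
        auto routingInfo = uassertStatusOK(
            Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));

        if (auto cm = routingInfo.cm()) {
            // Sharded: collect the owning shards off the chunk map
            std::set<ShardId> shardIds;
            for (const auto& entry : cm->chunkMap()) {
                shardIds.insert(entry.second->getShardId());
            }
            return std::vector<ShardId>(shardIds.begin(), shardIds.end());
        }

        // Unsharded: the whole collection lives on the database's primary shard
        return {routingInfo.primaryId()};
    }

The payoff is that callers no longer race between an isSharded() check and a later getChunkManager() fetch: both answers come from the same cached routing snapshot.
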
@@ -442,16 +429,15 @@ public: bool ok = true; BSONObj singleResult; - bool hasWCError = false; if (!shardedOutput) { - const auto shard = - uassertStatusOK(shardRegistry->getShard(opCtx, confOut->getPrimaryId())); + LOG(1) << "MR with single shard output, NS=" << outputCollNss + << " primary=" << outputDbInfo.primaryId(); - LOG(1) << "MR with single shard output, NS=" << outputCollNss.ns() - << " primary=" << shard->toString(); + const auto outputShard = + uassertStatusOK(shardRegistry->getShard(opCtx, outputDbInfo.primaryId())); - ShardConnection conn(shard->getConnString(), outputCollNss.ns()); + ShardConnection conn(outputShard->getConnString(), outputCollNss.ns()); ok = conn->runCommand(outDB, finalCmd.obj(), singleResult); BSONObj counts = singleResult.getObjectField("counts"); @@ -460,79 +446,19 @@ public: outputCount = counts.getIntField("output"); conn.done(); - if (!hasWCError) { - if (auto wcErrorElem = singleResult["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result); - hasWCError = true; - } + + if (auto wcErrorElem = singleResult["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(outputShard->getId(), wcErrorElem, result); } } else { LOG(1) << "MR with sharded output, NS=" << outputCollNss.ns(); - // Create the sharded collection if needed - if (!confOut->isSharded(outputCollNss.ns())) { - // Enable sharding on the output db - Status status = Grid::get(opCtx)->catalogClient(opCtx)->enableSharding( - opCtx, outputCollNss.db().toString()); - - // If the database has sharding already enabled, we can ignore the error - if (status.isOK()) { - // Invalidate the output database so it gets reloaded on the next fetch attempt - Grid::get(opCtx)->catalogCache()->invalidate(outputCollNss.db()); - } else if (status != ErrorCodes::AlreadyInitialized) { - uassertStatusOK(status); - } - - confOut.reset(); - confOut = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase( - opCtx, outputCollNss.db().toString())); + auto outputRoutingInfo = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss)); - // Shard collection according to split points - vector<BSONObj> sortedSplitPts; - - // Points will be properly sorted using the set - for (const auto& splitPt : splitPts) { - sortedSplitPts.push_back(splitPt); - } - - // Pre-split the collection onto all the shards for this database. Note that - // it's not completely safe to pre-split onto non-primary shards using the - // shardcollection method (a conflict may result if multiple map-reduces are - // writing to the same output collection, for instance). - // - // TODO: pre-split mapReduce output in a safer way. - - const std::set<ShardId> outShardIds = [&]() { - std::vector<ShardId> shardIds; - shardRegistry->getAllShardIds(&shardIds); - uassert(ErrorCodes::ShardNotFound, - str::stream() - << "Unable to find shards on which to place output collection " - << outputCollNss.ns(), - !shardIds.empty()); - - return std::set<ShardId>(shardIds.begin(), shardIds.end()); - }(); - - - BSONObj sortKey = BSON("_id" << 1); - ShardKeyPattern sortKeyPattern(sortKey); - - // The collection default collation for the output collection. This is empty, - // representing the simple binary comparison collation. 
- BSONObj defaultCollation; - - uassertStatusOK( - Grid::get(opCtx)->catalogClient(opCtx)->shardCollection(opCtx, - outputCollNss.ns(), - sortKeyPattern, - defaultCollation, - true, - sortedSplitPts, - outShardIds)); - - // Make sure the cached metadata for the collection knows that we are now sharded - confOut->getChunkManager(opCtx, outputCollNss.ns(), true /* reload */); + // Create the sharded collection if needed + if (!outputRoutingInfo.cm()) { + outputRoutingInfo = createShardedOutputCollection(opCtx, outputCollNss, splitPts); } auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>(); @@ -567,14 +493,16 @@ public: throw; } + bool hasWCError = false; + for (const auto& mrResult : mrCommandResults) { - string server; - { + const auto server = [&]() { const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard( opCtx, mrResult.shardTargetId)); - server = shard->getConnString().toString(); - } + return shard->getConnString().toString(); + }(); + singleResult = mrResult.result; if (!hasWCError) { if (auto wcErrorElem = singleResult["writeConcernError"]) { @@ -597,7 +525,8 @@ public: // get the size inserted for each chunk // split cannot be called here since we already have the distributed lock if (singleResult.hasField("chunkSizes")) { - vector<BSONElement> sizes = singleResult.getField("chunkSizes").Array(); + std::vector<BSONElement> sizes = + singleResult.getField("chunkSizes").Array(); for (unsigned int i = 0; i < sizes.size(); i += 2) { BSONObj key = sizes[i].Obj().getOwned(); const long long size = sizes[i + 1].numberLong(); @@ -610,34 +539,37 @@ public: } // Do the splitting round - shared_ptr<ChunkManager> cm = - confOut->getChunkManagerIfExists(opCtx, outputCollNss.ns()); + catalogCache->onStaleConfigError(std::move(outputRoutingInfo)); + outputRoutingInfo = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss)); uassert(34359, str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns() << "; expected that collection to be sharded, but it was not", - cm); + outputRoutingInfo.cm()); + + const auto outputCM = outputRoutingInfo.cm(); for (const auto& chunkSize : chunkSizes) { BSONObj key = chunkSize.first; const int size = chunkSize.second; invariant(size < std::numeric_limits<int>::max()); - // key reported should be the chunk's minimum - shared_ptr<Chunk> c = cm->findIntersectingChunkWithSimpleCollation(key); + // Key reported should be the chunk's minimum + auto c = outputCM->findIntersectingChunkWithSimpleCollation(key); if (!c) { warning() << "Mongod reported " << size << " bytes inserted for key " << key << " but can't find chunk"; } else { - updateChunkWriteStatsAndSplitIfNeeded(opCtx, cm.get(), c.get(), size); + updateChunkWriteStatsAndSplitIfNeeded(opCtx, outputCM.get(), c.get(), size); } } } - _cleanUp(servers, dbname, shardResultCollection); + cleanUp(servers, dbname, shardResultCollection); if (!ok) { errmsg = str::stream() << "MR post processing failed: " << singleResult.toString(); - return 0; + return false; } // copy some elements from a single result @@ -672,9 +604,69 @@ public: private: /** + * Creates and shards the collection for the output results. 
+ */ + static CachedCollectionRoutingInfo createShardedOutputCollection(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObjSet& splitPts) { + auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx); + auto const catalogCache = Grid::get(opCtx)->catalogCache(); + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + + // Enable sharding on the output db + Status status = catalogClient->enableSharding(opCtx, nss.db().toString()); + + // If the database has sharding already enabled, we can ignore the error + if (status.isOK()) { + // Invalidate the output database so it gets reloaded on the next fetch attempt + catalogCache->purgeDatabase(nss.db()); + } else if (status != ErrorCodes::AlreadyInitialized) { + uassertStatusOK(status); + } + + // Points will be properly sorted using the set + const std::vector<BSONObj> sortedSplitPts(splitPts.begin(), splitPts.end()); + + // Pre-split the collection onto all the shards for this database. Note that + // it's not completely safe to pre-split onto non-primary shards using the + // shardcollection method (a conflict may result if multiple map-reduces are + // writing to the same output collection, for instance). + // + // TODO: pre-split mapReduce output in a safer way. + + const std::set<ShardId> outShardIds = [&]() { + std::vector<ShardId> shardIds; + shardRegistry->getAllShardIds(&shardIds); + uassert(ErrorCodes::ShardNotFound, + str::stream() << "Unable to find shards on which to place output collection " + << nss.ns(), + !shardIds.empty()); + + return std::set<ShardId>(shardIds.begin(), shardIds.end()); + }(); + + + BSONObj sortKey = BSON("_id" << 1); + ShardKeyPattern sortKeyPattern(sortKey); + + // The collection default collation for the output collection. This is empty, + // representing the simple binary comparison collation. + BSONObj defaultCollation; + + uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->shardCollection( + opCtx, nss.ns(), sortKeyPattern, defaultCollation, true, sortedSplitPts, outShardIds)); + + // Make sure the cached metadata for the collection knows that we are now sharded + catalogCache->invalidateShardedCollection(nss); + return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + } + + /** * Drops the temporary results collections from each shard. 
*/ - void _cleanUp(const set<string>& servers, string dbName, string shardResultCollection) { + static void cleanUp(const std::set<std::string>& servers, + const std::string& dbName, + const std::string& shardResultCollection) { try { // drop collections with tmp results on each shard for (const auto& server : servers) { diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index 6b247823381..2a85b645df0 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -37,12 +37,10 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/config.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" namespace mongo { @@ -60,14 +58,14 @@ class ClusterMergeChunksCommand : public Command { public: ClusterMergeChunksCommand() : Command("mergeChunks") {} - virtual void help(stringstream& h) const { + void help(stringstream& h) const override { h << "Merge Chunks command\n" << "usage: { mergeChunks : <ns>, bounds : [ <min key>, <max key> ] }"; } - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::splitChunk)) { @@ -76,17 +74,19 @@ public: return Status::OK(); } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool slaveOk() const { + + bool slaveOk() const override { return false; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -104,10 +104,13 @@ public: BSONObj& cmdObj, int, string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); + auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, + nss)); + const auto cm = routingInfo.cm(); vector<BSONObj> bounds; if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) { @@ -137,8 +140,6 @@ public: return false; } - auto const cm = scopedCM.cm(); - if (!cm->getShardKeyPattern().isShardKey(minKey) || !cm->getShardKeyPattern().isShardKey(maxKey)) { errmsg = stream() << "shard key bounds " @@ -179,6 +180,8 @@ public: bool ok = conn->runCommand("admin", remoteCmdObjB.obj(), remoteResult); conn.done(); + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); + result.appendElements(remoteResult); return ok; } diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index c3cb18ceb15..e597e80eeaa 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ 
b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -40,12 +40,11 @@ #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/migration_secondary_throttle_options.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -60,19 +59,19 @@ class MoveChunkCmd : public Command { public: MoveChunkCmd() : Command("moveChunk", false, "movechunk") {} - virtual bool slaveOk() const { + bool slaveOk() const override { return true; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - virtual void help(std::stringstream& help) const { + void help(std::stringstream& help) const override { help << "Example: move chunk that contains the doc {num : 7} to shard001\n" << " { movechunk : 'test.foo' , find : { num : 7 } , to : 'shard0001' }\n" << "Example: move chunk with lower bound 0 and upper bound 10 to shard001\n" @@ -80,9 +79,9 @@ public: << " , to : 'shard001' }\n"; } - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::moveChunk)) { @@ -92,21 +91,24 @@ public: return Status::OK(); } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { Timer t; const NamespaceString nss(parseNs(dbname, cmdObj)); - auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); + auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, + nss)); + const auto cm = routingInfo.cm(); const auto toElt = cmdObj["to"]; uassert(ErrorCodes::TypeMismatch, @@ -145,8 +147,6 @@ public: return false; } - auto const cm = scopedCM.cm(); - shared_ptr<Chunk> chunk; if (!find.isEmpty()) { @@ -199,9 +199,7 @@ public: secondaryThrottle, cmdObj["_waitForDelete"].trueValue())); - // Proactively refresh the chunk manager. Not strictly necessary, but this way it's - // immediately up-to-date the next time it's used. 
- scopedCM.db()->getChunkManagerIfExists(opCtx, nss.ns(), true); + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); result.append("millis", t.millis()); return true; diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index dc192f1a6a6..e1746bebd72 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -49,7 +49,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -125,9 +124,9 @@ public: auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); // Flush all cached information. This can't be perfect, but it's better than nothing. - catalogCache->invalidate(dbname); + catalogCache->purgeDatabase(dbname); - auto config = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname)); + auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname)); const auto toElt = cmdObj["to"]; uassert(ErrorCodes::TypeMismatch, @@ -139,8 +138,7 @@ public: return false; } - const auto fromShard = - uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId())); + const auto fromShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbInfo.primaryId())); const auto toShard = [&]() { auto toShardStatus = shardRegistry->getShard(opCtx, to); @@ -223,7 +221,7 @@ public: // Ensure the next attempt to retrieve the database or any of its collections will do a full // reload - catalogCache->invalidate(dbname); + catalogCache->purgeDatabase(dbname); const string oldPrimary = fromShard->getConnString().toString(); diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp index b29eff8b0b2..65f149927d0 100644 --- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp +++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp @@ -33,11 +33,11 @@ #include "mongo/db/commands.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/s/commands/strategy.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" namespace mongo { +namespace { using std::string; using std::stringstream; @@ -153,8 +153,6 @@ bool ClusterPlanCacheCmd::run(OperationContext* opCtx, // Register plan cache commands at startup // -namespace { - MONGO_INITIALIZER(RegisterPlanCacheCommands)(InitializerContext* context) { // Leaked intentionally: a Command registers itself when constructed. 
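
The second recurring change, visible in the mergeChunks and moveChunk hunks above (and in the split command further below): commands that mutate routing metadata no longer force an immediate chunk-manager reload. A sketch of the idiom, under the same assumptions as the previous note (runMetadataOp is a hypothetical wrapper, not code from the patch):

    #include "mongo/s/catalog_cache.h"
    #include "mongo/s/grid.h"

    void runMetadataOp(OperationContext* opCtx, const NamespaceString& nss) {
        auto const catalogCache = Grid::get(opCtx)->catalogCache();

        // Refreshing getter: fails rather than returning a null chunk manager
        auto routingInfo = uassertStatusOK(
            catalogCache->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));

        // ... send the split/merge/move to the owning shard via routingInfo.cm() ...

        // Old code called config->getChunkManagerIfExists(opCtx, nss.ns(), true)
        // to eagerly rebuild the routing table. New code hands the consumed
        // routing info back so the cache entry is merely marked stale.
        catalogCache->onStaleConfigError(std::move(routingInfo));
    }

This shifts the refresh cost from the DDL command onto the next operation that actually needs the routing table; purgeDatabase() and invalidateShardedCollection(), used elsewhere in this patch, are the coarser database- and collection-level variants of the same lazy-invalidation strategy.
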
@@ -174,5 +172,4 @@ MONGO_INITIALIZER(RegisterPlanCacheCommands)(InitializerContext* context) { } } // namespace - } // namespace mongo diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index 7692e764e02..a1d99a608e6 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -53,15 +53,12 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_write.h" -#include "mongo/s/config.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/migration_secondary_throttle_options.h" #include "mongo/s/shard_util.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -187,19 +184,21 @@ public: auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx); auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto const catalogCache = Grid::get(opCtx)->catalogCache(); - auto scopedShardedDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db())); - const auto config = scopedShardedDb.db(); + auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, nss.db())); // Ensure sharding is allowed on the database uassert(ErrorCodes::IllegalOperation, str::stream() << "sharding not enabled for db " << nss.db(), - config->isShardingEnabled()); + dbInfo.shardingEnabled()); + + auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); // Ensure that the collection is not sharded already uassert(ErrorCodes::IllegalOperation, str::stream() << "sharding already enabled for collection " << nss.ns(), - !config->isSharded(nss.ns())); + !routingInfo.cm()); // NOTE: We *must* take ownership of the key here - otherwise the shared BSONObj becomes // corrupt as soon as the command ends. @@ -279,13 +278,7 @@ public: } // The rest of the checks require a connection to the primary db - const ConnectionString shardConnString = [&]() { - const auto shard = - uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId())); - return shard->getConnString(); - }(); - - ScopedDbConnection conn(shardConnString); + ScopedDbConnection conn(routingInfo.primary()->getConnString()); // Retrieve the collection metadata in order to verify that it is legal to shard this // collection. @@ -590,17 +583,21 @@ public: initSplits, std::set<ShardId>{})); - // Make sure the cached metadata for the collection knows that we are now sharded - config->getChunkManager(opCtx, nss.ns(), true /* reload */); - result << "collectionsharded" << nss.ns(); + // Make sure the cached metadata for the collection knows that we are now sharded + catalogCache->invalidateShardedCollection(nss); + // Only initially move chunks when using a hashed shard key if (isHashedShardKey && isEmpty) { - // Reload the new config info. If we created more than one initial chunk, then - // we need to move them around to balance. 
- auto chunkManager = config->getChunkManager(opCtx, nss.ns(), true); - ChunkMap chunkMap = chunkManager->getChunkMap(); + routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::ConflictingOperationInProgress, + "Collection was successfully written as sharded but got dropped before it " + "could be evenly distributed", + routingInfo.cm()); + auto chunkManager = routingInfo.cm(); + + const auto chunkMap = chunkManager->chunkMap(); // 2. Move and commit each "big chunk" to a different shard. int i = 0; @@ -646,7 +643,13 @@ public: } // Reload the config info, after all the migrations - chunkManager = config->getChunkManager(opCtx, nss.ns(), true); + catalogCache->invalidateShardedCollection(nss); + routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::ConflictingOperationInProgress, + "Collection was successfully written as sharded but got dropped before it " + "could be evenly distributed", + routingInfo.cm()); + chunkManager = routingInfo.cm(); // 3. Subdivide the big chunks by splitting at each of the points in "allSplits" // that we haven't already split by. @@ -689,10 +692,6 @@ public: subSplits.push_back(splitPoint); } } - - // Proactively refresh the chunk manager. Not really necessary, but this way it's - // immediately up-to-date the next time it's used. - config->getChunkManager(opCtx, nss.ns(), true); } return true; diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp index b63da3b2ee7..80ed1526663 100644 --- a/src/mongo/s/commands/cluster_split_cmd.cpp +++ b/src/mongo/s/commands/cluster_split_cmd.cpp @@ -37,15 +37,13 @@ #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -90,20 +88,19 @@ class SplitCollectionCmd : public Command { public: SplitCollectionCmd() : Command("split", false, "split") {} - virtual bool slaveOk() const { + bool slaveOk() const override { return true; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual void help(std::stringstream& help) const { + void help(std::stringstream& help) const override { help << " example: - split the shard that contains give key\n" << " { split : 'alleyinsider.blog.posts' , find : { ts : 1 } }\n" << " example: - split the shard that contains the key with this as the middle\n" @@ -111,9 +108,9 @@ public: << " NOTE: this does not move the chunks, it just creates a logical separation."; } - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::splitChunk)) { @@ -122,19 +119,22 @@ 
public: return Status::OK(); } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); + auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, + nss)); + const auto cm = routingInfo.cm(); const BSONField<BSONObj> findField("find", BSONObj()); const BSONField<BSONArray> boundsField("bounds", BSONArray()); @@ -190,8 +190,6 @@ public: return false; } - auto const cm = scopedCM.cm(); - std::shared_ptr<Chunk> chunk; if (!find.isEmpty()) { @@ -275,9 +273,7 @@ public: ChunkRange(chunk->getMin(), chunk->getMax()), {splitPoint})); - // Proactively refresh the chunk manager. Not strictly necessary, but this way it's - // immediately up-to-date the next time it's used. - scopedCM.db()->getChunkManagerIfExists(opCtx, nss.ns(), true); + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); return true; } diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 730d5e8a178..8b8a3f2e644 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -39,14 +39,13 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" -#include "mongo/s/chunk.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/chunk_manager_targeter.h" #include "mongo/s/commands/dbclient_multi_command.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -66,11 +65,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { dassert(response->isValid(NULL)); } -void reloadChunkManager(OperationContext* opCtx, const NamespaceString& nss) { - auto config = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db())); - config.db()->getChunkManagerIfExists(opCtx, nss.ns(), true); -} - /** * Given a maxChunkSize configuration and the number of chunks in a particular sharded collection, * returns an optimal chunk size to use in order to achieve a good ratio between number of chunks @@ -176,30 +170,31 @@ BSONObj findExtremeKeyForShard(OperationContext* opCtx, void splitIfNeeded(OperationContext* opCtx, const NamespaceString& nss, const TargeterStats& stats) { - auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); - if (!scopedCMStatus.isOK()) { - warning() << "failed to get collection information for " << nss - << " while checking for auto-split" << causedBy(scopedCMStatus.getStatus()); + auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); + if (!routingInfoStatus.isOK()) { + log() << "failed to get collection information for " << nss + << " while checking for auto-split" << 
causedBy(routingInfoStatus.getStatus()); return; } - const auto& scopedCM = scopedCMStatus.getValue(); + auto& routingInfo = routingInfoStatus.getValue(); - if (!scopedCM.cm()) { + if (!routingInfo.cm()) { return; } for (auto it = stats.chunkSizeDelta.cbegin(); it != stats.chunkSizeDelta.cend(); ++it) { std::shared_ptr<Chunk> chunk; try { - chunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(it->first); + chunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(it->first); } catch (const AssertionException& ex) { warning() << "could not find chunk while checking for auto-split: " << causedBy(redact(ex)); return; } - updateChunkWriteStatsAndSplitIfNeeded(opCtx, scopedCM.cm().get(), chunk.get(), it->second); + updateChunkWriteStatsAndSplitIfNeeded( + opCtx, routingInfo.cm().get(), chunk.get(), it->second); } } @@ -472,21 +467,22 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, << (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" + (shouldBalance ? ")" : ", but no migrations allowed)")); + // Reload the chunk manager after the split + auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, + nss)); + if (!shouldBalance || !suggestedMigrateChunk) { - reloadChunkManager(opCtx, nss); return; } // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot - // spot - // from staying on a single shard. This is based on the assumption that succeeding inserts - // will - // fall on the top chunk. + // spot from staying on a single shard. This is based on the assumption that succeeding + // inserts will fall on the top chunk. // We need to use the latest chunk manager (after the split) in order to have the most // up-to-date view of the chunk we are about to move - auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); - auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation( + auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation( suggestedMigrateChunk->getMin()); ChunkType chunkToMove; @@ -498,7 +494,8 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove)); - reloadChunkManager(opCtx, nss); + // Ensure the collection gets reloaded because of the move + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); } catch (const DBException& ex) { chunk->randomizeBytesWritten(); diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 7d14499e9d8..e219d9396f9 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -54,17 +54,14 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/run_on_all_shards_cmd.h" #include "mongo/s/commands/sharded_command_processing.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/query/store_possible_cursor.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" #include "mongo/util/log.h" @@ -86,20 +83,19 @@ using std::vector; namespace { bool 
cursorCommandPassthrough(OperationContext* opCtx, - shared_ptr<DBConfig> conf, + StringData dbName, + const ShardId& shardId, const BSONObj& cmdObj, const NamespaceString& nss, int options, BSONObjBuilder* out) { - const auto shardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); + const auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); if (!shardStatus.isOK()) { - invariant(shardStatus.getStatus() == ErrorCodes::ShardNotFound); return Command::appendCommandStatus(*out, shardStatus.getStatus()); } const auto shard = shardStatus.getValue(); ScopedDbConnection conn(shard->getConnString()); - auto cursor = conn->query(str::stream() << conf->name() << ".$cmd", + auto cursor = conn->query(str::stream() << dbName << ".$cmd", cmdObj, -1, // nToReturn 0, // nToSkip @@ -155,11 +151,13 @@ StatusWith<BSONObj> getCollation(const BSONObj& cmdObj) { } class PublicGridCommand : public Command { -public: +protected: PublicGridCommand(const char* n, const char* oldname = NULL) : Command(n, false, oldname) {} + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return false; } @@ -170,41 +168,29 @@ public: return false; } - // all grid commands are designed not to lock - -protected: - bool passthrough(OperationContext* opCtx, - DBConfig* conf, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - return _passthrough(opCtx, conf->name(), conf, cmdObj, 0, result); - } - bool adminPassthrough(OperationContext* opCtx, - DBConfig* conf, + const ShardId& shardId, const BSONObj& cmdObj, BSONObjBuilder& result) { - return _passthrough(opCtx, "admin", conf, cmdObj, 0, result); + return passthrough(opCtx, "admin", shardId, cmdObj, result); } bool passthrough(OperationContext* opCtx, - DBConfig* conf, + const std::string& db, + const ShardId& shardId, const BSONObj& cmdObj, - int options, BSONObjBuilder& result) { - return _passthrough(opCtx, conf->name(), conf, cmdObj, options, result); + return passthrough(opCtx, db, shardId, cmdObj, 0, result); } -private: - bool _passthrough(OperationContext* opCtx, - const string& db, - DBConfig* conf, - const BSONObj& cmdObj, - int options, - BSONObjBuilder& result) { - const auto shardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); - const auto shard = uassertStatusOK(shardStatus); + bool passthrough(OperationContext* opCtx, + const std::string& db, + const ShardId& shardId, + const BSONObj& cmdObj, + int options, + BSONObjBuilder& result) { + const auto shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); ShardConnection conn(shard->getConnString(), ""); @@ -223,53 +209,50 @@ private: }; class AllShardsCollectionCommand : public RunOnAllShardsCommand { -public: +protected: AllShardsCollectionCommand(const char* n, const char* oldname = NULL, bool useShardConn = false, bool implicitCreateDb = false) : RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {} - virtual void getShardIds(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - vector<ShardId>& shardIds) { + void getShardIds(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + vector<ShardId>& shardIds) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - uassertStatusOK(status.getStatus()); - - shared_ptr<DBConfig> conf = status.getValue(); - - if (!conf->isSharded(nss.ns())) { - 
shardIds.push_back(conf->getPrimaryId()); - } else { + const auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (routingInfo.cm()) { + // If it's a sharded collection, send it to all shards Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + } else { + // Otherwise just send it to the primary shard for the database + shardIds.push_back(routingInfo.primaryId()); } } }; class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { -public: +protected: NotAllowedOnShardedCollectionCmd(const char* n) : PublicGridCommand(n) {} - virtual bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - if (!conf->isSharded(nss.ns())) { - return passthrough(opCtx, conf.get(), cmdObj, options, result); - } + const auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "can't do command: " << getName() << " on sharded collection", + !routingInfo.cm()); - return appendCommandStatus( - result, - Status(ErrorCodes::IllegalOperation, - str::stream() << "can't do command: " << getName() << " on sharded collection")); + return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, options, result); } }; @@ -407,6 +390,7 @@ public: class ReIndexCmd : public AllShardsCollectionCommand { public: ReIndexCmd() : AllShardsCollectionCommand("reIndex") {} + virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { @@ -418,6 +402,7 @@ public: virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } + } reIndexCmd; class CollectionModCmd : public AllShardsCollectionCommand { @@ -434,12 +419,13 @@ public: virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } -} collectionModCmd; +} collectionModCmd; class ValidateCmd : public PublicGridCommand { public: ValidateCmd() : PublicGridCommand("validate") {} + virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { @@ -460,13 +446,13 @@ public: BSONObjBuilder& output) { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - if (!conf->isSharded(nss.ns())) { - return passthrough(opCtx, conf.get(), cmdObj, output); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { + return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, output); } - shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); - massert(40051, "chunk manager should not be null", cm); + const auto cm = routingInfo.cm(); vector<Strategy::CommandResult> results; const BSONObj query; @@ -512,33 +498,35 @@ public: } return true; } + } validateCmd; class CreateCmd : public PublicGridCommand { public: CreateCmd() : PublicGridCommand("create") {} - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + + Status 
checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } + bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, int, string& errmsg, - BSONObjBuilder& result) { - auto dbStatus = ScopedShardDatabase::getOrCreate(opCtx, dbName); - if (!dbStatus.isOK()) { - return appendCommandStatus(result, dbStatus.getStatus()); - } + BSONObjBuilder& result) override { + uassertStatusOK(createShardDatabase(opCtx, dbName)); - auto scopedDb = std::move(dbStatus.getValue()); - return passthrough(opCtx, scopedDb.db(), cmdObj, result); + const auto dbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + return passthrough(opCtx, dbName, dbInfo.primaryId(), cmdObj, result); } } createCmd; @@ -546,23 +534,27 @@ public: class RenameCollectionCmd : public PublicGridCommand { public: RenameCollectionCmd() : PublicGridCommand("renameCollection") {} + virtual Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) { return rename_collection::checkAuthForRenameCollectionCommand(client, dbname, cmdObj); } + virtual bool adminOnly() const { return true; } + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } + bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int, + int options, string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { const auto fullNsFromElt = cmdObj.firstElement(); uassert(ErrorCodes::InvalidNamespace, "'renameCollection' must be of type String", @@ -571,10 +563,6 @@ public: uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid source namespace: " << fullnsFrom.ns(), fullnsFrom.isValid()); - const string dbNameFrom = fullnsFrom.db().toString(); - - auto confFrom = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbNameFrom)); const auto fullnsToElt = cmdObj["to"]; uassert(ErrorCodes::InvalidNamespace, @@ -584,24 +572,22 @@ public: uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid target namespace: " << fullnsTo.ns(), fullnsTo.isValid()); - const string dbNameTo = fullnsTo.db().toString(); - auto confTo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbNameTo)); - uassert( - 13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom.ns())); - uassert( - 13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo.ns())); + const auto fromRoutingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fullnsFrom)); + uassert(13138, "You can't rename a sharded collection", !fromRoutingInfo.cm()); - auto shardTo = confTo->getPrimaryId(); - auto shardFrom = confFrom->getPrimaryId(); + const auto toRoutingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fullnsTo)); + uassert(13139, "You can't rename to a sharded collection", !toRoutingInfo.cm()); uassert(13137, "Source and destination collections must be on same shard", - shardFrom == shardTo); + fromRoutingInfo.primaryId() == toRoutingInfo.primaryId()); - return adminPassthrough(opCtx, confFrom.get(), cmdObj, result); + return adminPassthrough(opCtx, 
fromRoutingInfo.primaryId(), cmdObj, result); } + } renameCollectionCmd; class CopyDBCmd : public PublicGridCommand { @@ -637,14 +623,14 @@ public: "Invalid todb argument", NamespaceString::validDBName(todb, NamespaceString::DollarInDbNameBehavior::Allow)); - auto scopedToDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, todb)); + auto toDbInfo = uassertStatusOK(createShardDatabase(opCtx, todb)); uassert(ErrorCodes::IllegalOperation, "Cannot copy to a sharded database", - !scopedToDb.db()->isShardingEnabled()); + !toDbInfo.shardingEnabled()); - const string fromhost = cmdObj.getStringField("fromhost"); + const std::string fromhost = cmdObj.getStringField("fromhost"); if (!fromhost.empty()) { - return adminPassthrough(opCtx, scopedToDb.db(), cmdObj, result); + return adminPassthrough(opCtx, toDbInfo.primaryId(), cmdObj, result); } const auto fromDbElt = cmdObj["fromdb"]; @@ -657,10 +643,10 @@ public: "invalid fromdb argument", NamespaceString::validDBName(fromdb, NamespaceString::DollarInDbNameBehavior::Allow)); - auto scopedFromDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, fromdb)); + auto fromDbInfo = uassertStatusOK(createShardDatabase(opCtx, fromdb)); uassert(ErrorCodes::IllegalOperation, "Cannot copy from a sharded database", - !scopedFromDb.db()->isShardingEnabled()); + !fromDbInfo.shardingEnabled()); BSONObjBuilder b; BSONForEach(e, cmdObj) { @@ -670,12 +656,12 @@ public: } { - const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard( - opCtx, scopedFromDb.db()->getPrimaryId())); + const auto shard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromDbInfo.primaryId())); b.append("fromhost", shard->getConnString().toString()); } - return adminPassthrough(opCtx, scopedToDb.db(), b.obj(), result); + return adminPassthrough(opCtx, toDbInfo.primaryId(), b.obj(), result); } } clusterCopyDBCmd; @@ -683,38 +669,38 @@ public: class CollectionStats : public PublicGridCommand { public: CollectionStats() : PublicGridCommand("collStats", "collstats") {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::collStats); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int, + int options, string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - if (!conf->isSharded(nss.ns())) { + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { result.appendBool("sharded", false); - result.append("primary", conf->getPrimaryId().toString()); - - return passthrough(opCtx, conf.get(), cmdObj, result); + result.append("primary", routingInfo.primaryId().toString()); + return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); } - result.appendBool("sharded", true); + const auto cm = routingInfo.cm(); - shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); - 
massert(12594, "how could chunk manager be null!", cm); + result.appendBool("sharded", true); BSONObjBuilder shardStats; map<string, long long> counts; @@ -860,35 +846,38 @@ public: class DataSizeCmd : public PublicGridCommand { public: DataSizeCmd() : PublicGridCommand("dataSize", "datasize") {} - virtual string parseNs(const string& dbname, const BSONObj& cmdObj) const override { + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } + bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int, + int options, string& errmsg, - BSONObjBuilder& result) { - const string fullns = parseNs(dbName, cmdObj); - const string nsDBName = nsToDatabase(fullns); + BSONObjBuilder& result) override { + const NamespaceString nss(parseNs(dbName, cmdObj)); - auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nsDBName)); - if (!conf->isSharded(fullns)) { - return passthrough(opCtx, conf.get(), cmdObj, result); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { + return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); } - shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, fullns); - massert(13407, "how could chunk manager be null!", cm); + const auto cm = routingInfo.cm(); BSONObj min = cmdObj.getObjectField("min"); BSONObj max = cmdObj.getObjectField("max"); @@ -919,13 +908,12 @@ public: for (const ShardId& shardId : shardIds) { const auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); if (!shardStatus.isOK()) { - invariant(shardStatus.getStatus() == ErrorCodes::ShardNotFound); continue; } ScopedDbConnection conn(shardStatus.getValue()->getConnString()); BSONObj res; - bool ok = conn->runCommand(conf->name(), cmdObj, res); + bool ok = conn->runCommand(dbName, cmdObj, res); conn.done(); if (!ok) { @@ -949,19 +937,20 @@ public: class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd { public: ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped") {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::convertToCapped); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return parseNsCollectionRequired(dbname, cmdObj).ns(); } @@ -970,23 +959,24 @@ public: class GroupCmd : public 
NotAllowedOnShardedCollectionCmd { public: GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual bool passOptions() const { + bool passOptions() const override { return true; } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { const auto nsElt = cmdObj.firstElement().embeddedObjectUserCheck()["ns"]; uassert(ErrorCodes::InvalidNamespace, "'ns' must be of type String", @@ -1003,7 +993,7 @@ public: const BSONObj& cmdObj, ExplainCommon::Verbosity verbosity, const rpc::ServerSelectionMetadata& serverSelectionMetadata, - BSONObjBuilder* out) const { + BSONObjBuilder* out) const override { // We will time how long it takes to run the commands on the shards. Timer timer; @@ -1019,36 +1009,17 @@ public: const NamespaceString nss(parseNs(dbname, cmdObj)); - // Note that this implementation will not handle targeting retries and fails when the - // sharding metadata is too stale - auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db()); - if (!status.isOK()) { - return Status(status.getStatus().code(), - str::stream() << "Passthrough command failed: " << command.toString() - << " on ns " - << nss.ns() - << ". Caused by " - << causedBy(status.getStatus())); - } - - shared_ptr<DBConfig> conf = status.getValue(); - if (conf->isSharded(nss.ns())) { - return Status(ErrorCodes::IllegalOperation, - str::stream() << "Passthrough command failed: " << command.toString() - << " on ns " - << nss.ns() - << ". Cannot run on sharded namespace."); - } - - const auto primaryShardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); - if (!primaryShardStatus.isOK()) { - return primaryShardStatus.getStatus(); - } + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Passthrough command failed: " << command.toString() << " on ns " + << nss.ns() + << ". Cannot run on sharded namespace.", + !routingInfo.cm()); BSONObj shardResult; try { - ShardConnection conn(primaryShardStatus.getValue()->getConnString(), ""); + ShardConnection conn(routingInfo.primary()->getConnString(), ""); // TODO: this can throw a stale config when mongos is not up-to-date -- fix. if (!conn->runCommand(nss.db().toString(), command, shardResult, options)) { @@ -1060,6 +1031,7 @@ public: << "; result: " << shardResult); } + conn.done(); } catch (const DBException& ex) { return ex.toStatus(); @@ -1067,9 +1039,9 @@ public: // Fill out the command result. 
Strategy::CommandResult cmdResult; - cmdResult.shardTargetId = conf->getPrimaryId(); + cmdResult.shardTargetId = routingInfo.primaryId(); cmdResult.result = shardResult; - cmdResult.target = primaryShardStatus.getValue()->getConnString(); + cmdResult.target = routingInfo.primary()->getConnString(); return ClusterExplain::buildExplainResult( opCtx, {cmdResult}, ClusterExplain::kSingleShard, timer.millis(), out); @@ -1080,15 +1052,18 @@ public: class SplitVectorCmd : public NotAllowedOnShardedCollectionCmd { public: SplitVectorCmd() : NotAllowedOnShardedCollectionCmd("splitVector") {} - virtual bool passOptions() const { + + bool passOptions() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::splitVector)) { @@ -1096,43 +1071,49 @@ public: } return Status::OK(); } - virtual bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) { - string x = parseNs(dbName, cmdObj); - if (!str::startsWith(x, dbName)) { - errmsg = str::stream() << "doing a splitVector across dbs isn't supported via mongos"; - return false; - } + + std::string parseNs(const string& dbname, const BSONObj& cmdObj) const override { + return parseNsFullyQualified(dbname, cmdObj); + } + + bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) override { + const std::string ns = parseNs(dbName, cmdObj); + uassert(ErrorCodes::IllegalOperation, + "Performing splitVector across dbs isn't supported via mongos", + str::startsWith(ns, dbName)); + return NotAllowedOnShardedCollectionCmd::run( opCtx, dbName, cmdObj, options, errmsg, result); } - virtual std::string parseNs(const string& dbname, const BSONObj& cmdObj) const { - return parseNsFullyQualified(dbname, cmdObj); - } } splitVectorCmd; class DistinctCmd : public PublicGridCommand { public: DistinctCmd() : PublicGridCommand("distinct") {} - virtual void help(stringstream& help) const { + + void help(stringstream& help) const override { help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }"; } - virtual bool passOptions() const { + + bool passOptions() const override { return true; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1141,18 +1122,13 @@ public: BSONObj& cmdObj, int options, string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - 
if (!status.isOK()) { - return appendEmptyResultSet(result, status.getStatus(), nss.ns()); - } - - shared_ptr<DBConfig> conf = status.getValue(); - if (!conf->isSharded(nss.ns())) { - - if (passthrough(opCtx, conf.get(), cmdObj, options, result)) { + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { + if (passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, options, result)) { return true; } @@ -1192,10 +1168,9 @@ public: return false; } - shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); - massert(10420, "how could chunk manager be null!", cm); + const auto cm = routingInfo.cm(); - BSONObj query = getQuery(cmdObj); + auto query = getQuery(cmdObj); auto queryCollation = getCollation(cmdObj); if (!queryCollation.isOK()) { return appendEmptyResultSet(result, queryCollation.getStatus(), nss.ns()); @@ -1230,7 +1205,7 @@ public: ShardConnection conn(shardStatus.getValue()->getConnString(), nss.ns()); BSONObj res; - bool ok = conn->runCommand(conf->name(), cmdObj, res, options); + bool ok = conn->runCommand(nss.db().toString(), cmdObj, res, options); conn.done(); if (!ok) { @@ -1340,16 +1315,18 @@ public: return ClusterExplain::buildExplainResult( opCtx, shardResults, mongosStageName, millisElapsed, out); } + } disinctCmd; class FileMD5Cmd : public PublicGridCommand { public: FileMD5Cmd() : PublicGridCommand("filemd5") {} - virtual void help(stringstream& help) const { + + void help(stringstream& help) const override { help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }"; } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { std::string collectionName; if (const auto rootElt = cmdObj["root"]) { uassert(ErrorCodes::InvalidNamespace, @@ -1363,31 +1340,32 @@ public: return NamespaceString(dbname, collectionName).ns(); } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), ActionType::find)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int, + int options, string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - if (!conf->isSharded(nss.ns())) { - return passthrough(opCtx, conf.get(), cmdObj, result); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { + return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); } - shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); - massert(13091, "how could chunk manager be null!", cm); + const auto cm = routingInfo.cm(); + if (SimpleBSONObjComparator::kInstance.evaluate(cm->getShardKeyPattern().toBSON() == BSON("files_id" << 1))) { BSONObj finder = BSON("files_id" << cmdObj.firstElement()); @@ -1461,13 +1439,15 @@ public: errmsg = string("sharded filemd5 failed because: ") + 
res["errmsg"].valuestrsafe(); + return false; } - uassert(16246, - "Shard " + conf->name() + - " is too old to support GridFS sharded by {files_id:1, n:1}", - res.hasField("md5state")); + uassert( + 16246, + str::stream() << "Shard for database " << nss.db() + << " is too old to support GridFS sharded by {files_id:1, n:1}", + res.hasField("md5state")); lastResult = res; int nNext = res["numChunks"].numberInt(); @@ -1497,20 +1477,24 @@ public: class Geo2dFindNearCmd : public PublicGridCommand { public: Geo2dFindNearCmd() : PublicGridCommand("geoNear") {} - void help(stringstream& h) const { + + void help(stringstream& h) const override { h << "http://dochub.mongodb.org/core/geo#GeospatialIndexing-geoNearCommand"; } - virtual bool passOptions() const { + + bool passOptions() const override { return true; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1519,16 +1503,16 @@ public: BSONObj& cmdObj, int options, string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - if (!conf->isSharded(nss.ns())) { - return passthrough(opCtx, conf.get(), cmdObj, options, result); + auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + if (!routingInfo.cm()) { + return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); } - shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); - massert(13500, "how could chunk manager be null!", cm); + const auto cm = routingInfo.cm(); BSONObj query = getQuery(cmdObj); auto collation = getCollation(cmdObj); @@ -1628,64 +1612,76 @@ public: return true; } + } geo2dFindNearCmd; -class CompactCmd : public PublicGridCommand { +class CompactCmd : public Command { public: - CompactCmd() : PublicGridCommand("compact") {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + CompactCmd() : Command("compact") {} + + bool slaveOk() const override { + return true; + } + + bool adminOnly() const override { + return false; + } + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::compact); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - errmsg = "compact not allowed through mongos"; - return false; + + bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) override { + uasserted(ErrorCodes::CommandNotSupported, "compact not allowed through 
mongos"); } + } compactCmd; class EvalCmd : public PublicGridCommand { public: EvalCmd() : PublicGridCommand("eval", "$eval") {} + virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { // $eval can do pretty much anything, so require all privileges. RoleGraph::generateUniversalPrivileges(out); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { + + bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) override { RARELY { warning() << "the eval command is deprecated" << startupWarningsLog; } - // $eval isn't allowed to access sharded collections, but we need to leave the - // shard to detect that. - auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - if (!status.isOK()) { - return appendCommandStatus(result, status.getStatus()); - } - - shared_ptr<DBConfig> conf = status.getValue(); - return passthrough(opCtx, conf.get(), cmdObj, result); + // $eval isn't allowed to access sharded collections, but we need to leave the shard to + // detect that + const auto dbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + return passthrough(opCtx, dbName, dbInfo.primaryId(), cmdObj, result); } + } evalCmd; class CmdListCollections final : public PublicGridCommand { @@ -1711,7 +1707,7 @@ public: str::stream() << "Not authorized to create users on db: " << dbname); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1723,18 +1719,23 @@ public: BSONObjBuilder& result) final { auto nss = NamespaceString::makeListCollectionsNSS(dbName); - auto conf = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - if (!conf.isOK()) { - return appendEmptyResultSet(result, conf.getStatus(), dbName + ".$cmd.listCollections"); + auto dbInfoStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + if (!dbInfoStatus.isOK()) { + return appendEmptyResultSet(result, dbInfoStatus.getStatus(), nss.ns()); } - return cursorCommandPassthrough(opCtx, conf.getValue(), cmdObj, nss, options, &result); + const auto& dbInfo = dbInfoStatus.getValue(); + + return cursorCommandPassthrough( + opCtx, dbName, dbInfo.primaryId(), cmdObj, nss, options, &result); } + } cmdListCollections; class CmdListIndexes final : public PublicGridCommand { public: CmdListIndexes() : PublicGridCommand("listIndexes") {} + virtual Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) { @@ -1757,7 +1758,7 @@ public: << ns.coll()); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1767,18 +1768,15 @@ public: int options, string& errmsg, BSONObjBuilder& result) final { - auto conf = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - if (!conf.isOK()) { - return appendCommandStatus(result, conf.getStatus()); - } + const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); + + const auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - const NamespaceString 
targetNss(parseNsCollectionRequired(dbName, cmdObj)); - const NamespaceString commandNss = - NamespaceString::makeListIndexesNSS(targetNss.db(), targetNss.coll()); - dassert(targetNss == commandNss.getTargetNSForListIndexes()); + const auto commandNss = NamespaceString::makeListIndexesNSS(nss.db(), nss.coll()); return cursorCommandPassthrough( - opCtx, conf.getValue(), cmdObj, commandNss, options, &result); + opCtx, nss.db(), routingInfo.primaryId(), cmdObj, commandNss, options, &result); } } cmdListIndexes; diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp index b534bf0628a..881b7d654ab 100644 --- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp +++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp @@ -36,12 +36,12 @@ #include <set> #include "mongo/db/jsobj.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -80,7 +80,7 @@ bool RunOnAllShardsCommand::run(OperationContext* opCtx, LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << redact(cmdObj); if (_implicitCreateDb) { - uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, dbName)); + uassertStatusOK(createShardDatabase(opCtx, dbName)); } std::vector<ShardId> shardIds; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 0182a091ab7..647e6601dfa 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -54,8 +54,6 @@ #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/tracking_metadata.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" @@ -365,11 +363,10 @@ void Strategy::clientCommandOp(OperationContext* opCtx, ShardConnection::checkMyConnectionVersions(opCtx, staleNS); if (loops < 4) { - // This throws out the entire database cache entry in response to - // StaleConfigException instead of just the collection which encountered it. There - // is no good reason for it other than the lack of lower-granularity cache - // invalidation. - Grid::get(opCtx)->catalogCache()->invalidate(NamespaceString(staleNS).db()); + const NamespaceString nss(staleNS); + if (nss.isValid()) { + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); + } } } catch (const DBException& e) { OpQueryReplyBuilder reply; diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp deleted file mode 100644 index f5aec193923..00000000000 --- a/src/mongo/s/config.cpp +++ /dev/null @@ -1,366 +0,0 @@ -/** - * Copyright (C) 2008-2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. 
If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/config.h" - -#include <vector> - -#include "mongo/db/lasterror.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/query/collation/collator_factory_interface.h" -#include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_database.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_version.h" -#include "mongo/s/grid.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/log.h" - -namespace mongo { - -struct CollectionInfo { - // The config server opTime at which the chunk manager below was loaded - const repl::OpTime configOpTime; - - // The chunk manager - const std::shared_ptr<ChunkManager> cm; -}; - -DBConfig::DBConfig(const DatabaseType& dbt, repl::OpTime configOpTime) - : _name(dbt.getName()), - _shardingEnabled(dbt.getSharded()), - _primaryId(dbt.getPrimary()), - _configOpTime(std::move(configOpTime)) {} - -DBConfig::~DBConfig() = default; - -bool DBConfig::isSharded(const std::string& ns) { - stdx::lock_guard<stdx::mutex> lk(_lock); - - return _collections.count(ns) > 0; -} - -void DBConfig::markNSNotSharded(const std::string& ns) { - stdx::lock_guard<stdx::mutex> lk(_lock); - - CollectionInfoMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - _collections.erase(it); - } -} - -std::shared_ptr<ChunkManager> DBConfig::getChunkManagerIfExists(OperationContext* opCtx, - const std::string& ns, - bool shouldReload, - bool forceReload) { - // Don't report exceptions here as errors in GetLastError - LastError::Disabled ignoreForGLE(&LastError::get(cc())); - - try { - return getChunkManager(opCtx, ns, shouldReload, forceReload); - } catch (const DBException&) { - return nullptr; - } -} - -std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* opCtx, - const std::string& ns, - bool shouldReload, - bool forceReload) { - ChunkVersion oldVersion; - std::shared_ptr<ChunkManager> oldManager; - - { - stdx::lock_guard<stdx::mutex> lk(_lock); - - auto it = _collections.find(ns); - - const bool earlyReload = (it == _collections.end()) && (shouldReload || forceReload); - if (earlyReload) { - // This is to catch cases where this is a new sharded collection. - // Note: read the _reloadCount inside the _lock mutex, so _loadIfNeeded will always - be forced to perform a reload. 
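The note above refers to the counter read on the next deleted line. A standalone sketch of the scheme, with standard C++ stand-ins for AtomicUInt64 and stdx::mutex: the iteration counter lets a caller detect that another thread already finished a full reload after the caller last sampled it, so duplicate work can be skipped:

#include <atomic>
#include <cstdint>
#include <mutex>

class ReloadableCache {
public:
    uint64_t currentIteration() const {
        return _reloadCount.load();
    }

    // Returns early if another thread completed a reload after
    // 'observedIteration' was sampled; otherwise performs the reload and
    // publishes a new iteration number.
    void loadIfNeeded(uint64_t observedIteration) {
        std::lock_guard<std::mutex> lk(_lock);
        if (observedIteration != _reloadCount.load()) {
            return;  // someone else already did the work
        }
        _reloadFromConfigServers();  // stand-in for the expensive fetch
        _reloadCount.fetch_add(1);
    }

private:
    void _reloadFromConfigServers() {}

    std::mutex _lock;
    std::atomic<uint64_t> _reloadCount{0};
};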
- const auto currentReloadIteration = _reloadCount.load(); - _loadIfNeeded(opCtx, currentReloadIteration); - - it = _collections.find(ns); - } - - uassert(ErrorCodes::NamespaceNotSharded, - str::stream() << "Collection is not sharded: " << ns, - it != _collections.end()); - - const auto& ci = it->second; - - if (!(shouldReload || forceReload) || earlyReload) { - return ci.cm; - } - - if (ci.cm) { - oldManager = ci.cm; - oldVersion = ci.cm->getVersion(); - } - } - - // TODO: We need to keep this first one-chunk check in until we have a more efficient way of - // creating/reusing a chunk manager, as doing so requires copying the full set of chunks - // currently - std::vector<ChunkType> newestChunk; - if (oldVersion.isSet() && !forceReload) { - uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks( - opCtx, - BSON(ChunkType::ns(ns)), - BSON(ChunkType::DEPRECATED_lastmod() << -1), - 1, - &newestChunk, - nullptr, - repl::ReadConcernLevel::kMajorityReadConcern)); - - if (!newestChunk.empty()) { - invariant(newestChunk.size() == 1); - ChunkVersion v = newestChunk[0].getVersion(); - if (v.equals(oldVersion)) { - stdx::lock_guard<stdx::mutex> lk(_lock); - - auto it = _collections.find(ns); - uassert(15885, - str::stream() << "not sharded after reloading from chunks : " << ns, - it != _collections.end()); - - const auto& ci = it->second; - return ci.cm; - } - } - } else if (!oldVersion.isSet()) { - warning() << "version 0 found when " << (forceReload ? "reloading" : "checking") - << " chunk manager; collection '" << ns << "' initially detected as sharded"; - } - - std::unique_ptr<ChunkManager> tempChunkManager; - - { - stdx::lock_guard<stdx::mutex> lll(_hitConfigServerLock); - - if (!newestChunk.empty() && !forceReload) { - // If we have a target we're going for see if we've hit already - stdx::lock_guard<stdx::mutex> lk(_lock); - - auto it = _collections.find(ns); - - if (it != _collections.end()) { - const auto& ci = it->second; - - ChunkVersion currentVersion = newestChunk[0].getVersion(); - - // Only reload if the version we found is newer than our own in the same epoch - if (currentVersion <= ci.cm->getVersion() && - ci.cm->getVersion().hasEqualEpoch(currentVersion)) { - return ci.cm; - } - } - } - - // Reload the chunk manager outside of the DBConfig's mutex so as to not block operations - // for different collections on the same database - tempChunkManager.reset(new ChunkManager( - NamespaceString(oldManager->getns()), - oldManager->getVersion().epoch(), - oldManager->getShardKeyPattern(), - oldManager->getDefaultCollator() ? oldManager->getDefaultCollator()->clone() : nullptr, - oldManager->isUnique())); - tempChunkManager->loadExistingRanges(opCtx, oldManager.get()); - - if (!tempChunkManager->numChunks()) { - // Maybe we're not sharded any more, so do a full reload - const auto currentReloadIteration = _reloadCount.load(); - - const bool successful = [&]() { - stdx::lock_guard<stdx::mutex> lk(_lock); - return _loadIfNeeded(opCtx, currentReloadIteration); - }(); - - // If we aren't successful loading the database entry, we don't want to keep the stale - // object around which has invalid data. 
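The 'successful' flag above is produced by an immediately invoked lambda, which confines the lock_guard to exactly one call while letting the result escape the critical section. A standalone reduction of that idiom, with standard library types in place of stdx:

#include <mutex>

std::mutex cacheLock;

bool loadIfNeeded() {
    return true;  // stand-in for the real cache load
}

bool reloadUnderLock() {
    // The mutex is held only for the lambda body; it is already released by
    // the time 'successful' is consumed below.
    const bool successful = [&] {
        std::lock_guard<std::mutex> lk(cacheLock);
        return loadIfNeeded();
    }();

    // Slow follow-up work can happen here without holding the mutex.
    return successful;
}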
- if (!successful) { - Grid::get(opCtx)->catalogCache()->invalidate(_name); - } - - return getChunkManager(opCtx, ns); - } - } - - stdx::lock_guard<stdx::mutex> lk(_lock); - - auto it = _collections.find(ns); - uassert(14822, - str::stream() << "Collection " << ns << " became unsharded in the middle.", - it != _collections.end()); - - const auto& ci = it->second; - - // Reset if our versions aren't the same - bool shouldReset = !tempChunkManager->getVersion().equals(ci.cm->getVersion()); - - // Also reset if we're forced to do so - if (!shouldReset && forceReload) { - shouldReset = true; - warning() << "chunk manager reload forced for collection '" << ns << "', config version is " - << tempChunkManager->getVersion(); - } - - // - // LEGACY BEHAVIOR - // - // It's possible, when dropping collections, to get into a state where our new version is - // less than our previous version. Behave identically to legacy mongos, for now, and warn to - // draw attention to the problem. - // - // TODO: Assert in next version, to allow smooth upgrades - // - - if (shouldReset && tempChunkManager->getVersion() < ci.cm->getVersion()) { - shouldReset = false; - - warning() << "not resetting chunk manager for collection '" << ns << "', config version is " - << tempChunkManager->getVersion() << " and " - << "old version is " << ci.cm->getVersion(); - } - - // end legacy behavior - - if (shouldReset) { - const auto cmOpTime = tempChunkManager->getConfigOpTime(); - - // The existing ChunkManager could have been updated since we last checked, so replace the - // existing chunk manager only if it is strictly newer. - if (cmOpTime > ci.cm->getConfigOpTime()) { - _collections.erase(ns); - auto emplacedEntryIt = - _collections.emplace(ns, CollectionInfo{cmOpTime, std::move(tempChunkManager)}) - .first; - return emplacedEntryIt->second.cm; - } - } - - return ci.cm; -} - -bool DBConfig::load(OperationContext* opCtx) { - const auto currentReloadIteration = _reloadCount.load(); - stdx::lock_guard<stdx::mutex> lk(_lock); - return _loadIfNeeded(opCtx, currentReloadIteration); -} - -bool DBConfig::_loadIfNeeded(OperationContext* opCtx, Counter reloadIteration) { - if (reloadIteration != _reloadCount.load()) { - return true; - } - - const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx); - - auto status = catalogClient->getDatabase(opCtx, _name); - if (status == ErrorCodes::NamespaceNotFound) { - return false; - } - - // All other errors are connectivity, etc., so throw an exception. 
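The comment above pairs with the uassertStatusOK on the next deleted line: _loadIfNeeded maps NamespaceNotFound to a clean 'false' (the database genuinely does not exist) and rethrows everything else. The same triage, sketched in isolation with the catalog-client call as it appears in the deleted code:

bool loadDatabaseEntry(OperationContext* opCtx, const std::string& dbName) {
    const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);

    auto status = catalogClient->getDatabase(opCtx, dbName);
    if (status == ErrorCodes::NamespaceNotFound) {
        return false;  // definitively absent: a valid, non-error outcome
    }

    // Connectivity and other transient failures become exceptions.
    uassertStatusOK(status.getStatus());

    // ... consume status.getValue() ...
    return true;
}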
- uassertStatusOK(status.getStatus()); - - const auto& dbOpTimePair = status.getValue(); - const auto& dbt = dbOpTimePair.value; - invariant(_name == dbt.getName()); - _primaryId = dbt.getPrimary(); - - invariant(dbOpTimePair.opTime >= _configOpTime); - _configOpTime = dbOpTimePair.opTime; - - // Load all collections - std::vector<CollectionType> collections; - repl::OpTime configOpTimeWhenLoadingColl; - uassertStatusOK( - catalogClient->getCollections(opCtx, &_name, &collections, &configOpTimeWhenLoadingColl)); - - invariant(configOpTimeWhenLoadingColl >= _configOpTime); - - for (const auto& coll : collections) { - auto collIter = _collections.find(coll.getNs().ns()); - if (collIter != _collections.end()) { - invariant(configOpTimeWhenLoadingColl >= collIter->second.configOpTime); - } - - _collections.erase(coll.getNs().ns()); - - if (!coll.getDropped()) { - std::unique_ptr<CollatorInterface> defaultCollator; - if (!coll.getDefaultCollation().isEmpty()) { - auto statusWithCollator = CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(coll.getDefaultCollation()); - - // The collation was validated upon collection creation. - invariantOK(statusWithCollator.getStatus()); - - defaultCollator = std::move(statusWithCollator.getValue()); - } - - std::unique_ptr<ChunkManager> manager( - stdx::make_unique<ChunkManager>(coll.getNs(), - coll.getEpoch(), - ShardKeyPattern(coll.getKeyPattern()), - std::move(defaultCollator), - coll.getUnique())); - - // Do the blocking collection load - manager->loadExistingRanges(opCtx, nullptr); - - // Collections with no chunks are unsharded, no matter what the collections entry says - if (manager->numChunks()) { - _collections.emplace( - coll.getNs().ns(), - CollectionInfo{configOpTimeWhenLoadingColl, std::move(manager)}); - } - } - } - - _reloadCount.fetchAndAdd(1); - - return true; -} - -ShardId DBConfig::getPrimaryId() { - stdx::lock_guard<stdx::mutex> lk(_lock); - return _primaryId; -} - -} // namespace mongo diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h deleted file mode 100644 index bbd63cf3b3b..00000000000 --- a/src/mongo/s/config.h +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2008-2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. 
If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <string> - -#include "mongo/db/repl/optime.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/s/shard_id.h" -#include "mongo/stdx/mutex.h" - -namespace mongo { - -class ChunkManager; -struct CollectionInfo; -class DatabaseType; -class OperationContext; - -/** - * Represents the cache entry for a database. - */ -class DBConfig { -public: - DBConfig(const DatabaseType& dbt, repl::OpTime configOpTime); - ~DBConfig(); - - /** - * The name of the database which this entry caches. - */ - const std::string& name() const { - return _name; - } - - ShardId getPrimaryId(); - - /** - * Returns whether 'enableSharding' has been called for this database. - */ - bool isShardingEnabled() const { - return _shardingEnabled; - } - - /** - * Removes the specified namespace from the set of collections under this database entry so that - * from then onwards it will be treated as unsharded. - * - * Note that this method doesn't do any writes to the config metadata, but simply drops the - * specified namespace from the cache. - */ - void markNSNotSharded(const std::string& ns); - - /** - * @return whether or not the 'ns' collection is partitioned - */ - bool isSharded(const std::string& ns); - - std::shared_ptr<ChunkManager> getChunkManager(OperationContext* opCtx, - const std::string& ns, - bool reload = false, - bool forceReload = false); - std::shared_ptr<ChunkManager> getChunkManagerIfExists(OperationContext* opCtx, - const std::string& ns, - bool reload = false, - bool forceReload = false); - - /** - * Returns true if it is successful at loading the DBConfig, false if the database is not found, - * and throws on all other errors. - */ - bool load(OperationContext* opCtx); - -protected: - typedef std::map<std::string, CollectionInfo> CollectionInfoMap; - typedef AtomicUInt64::WordType Counter; - - /** - * Returns true if it is successful at loading the DBConfig, false if the database is not found, - * and throws on all other errors. - * Also returns true without reloading if reloadIteration is not equal to the _reloadCount. - * This is to avoid multiple threads attempting to reload from doing duplicate work. - */ - bool _loadIfNeeded(OperationContext* opCtx, Counter reloadIteration); - - // All member variables are labeled with one of the following codes indicating the - // synchronization rules for accessing them. - // - // (L) Must hold _lock for access. - // (S) Self synchronizing, no explicit locking needed. - // - // Mutex lock order: - // _hitConfigServerLock -> _lock - // - - // Name of the database which this entry caches - const std::string _name; - - // Whether sharding is enabled for this database - const bool _shardingEnabled; - - // Primary shard id - ShardId _primaryId; // (L) - - // Set of collections and lock to protect access - stdx::mutex _lock; - CollectionInfoMap _collections; // (L) - - // OpTime of config server when the database definition was loaded. - repl::OpTime _configOpTime; // (L) - - // Ensures that only one thread at a time loads collection configuration data from - // the config server - stdx::mutex _hitConfigServerLock; - - // Increments every time this performs a full reload. Since a full reload can take a very - // long time for very large clusters, this can be used to minimize duplicate work when multiple - // threads try to perform a full reload at roughly the same time. 
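This header's synchronization legend, including the "Mutex lock order: _hitConfigServerLock -> _lock" rule above, is worth a standalone illustration: as long as every code path acquires the outer mutex before the inner one, no lock-order cycle (and hence no deadlock between these two mutexes) is possible. A minimal sketch with standard types:

#include <mutex>

std::mutex hitConfigServerLock;  // outer: serializes config server fetches
std::mutex cacheLock;            // inner: guards the cached collection map

void refreshCache() {
    // Acquire in the documented order, outer first.
    std::lock_guard<std::mutex> fetchGuard(hitConfigServerLock);

    // ... long config server round trip; cache readers are not blocked ...

    std::lock_guard<std::mutex> cacheGuard(cacheLock);
    // ... install the freshly loaded data ...
}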
- AtomicUInt64 _reloadCount; // (S) -}; - -} // namespace mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index d944954635a..3d4c384c506 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -52,7 +52,6 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/store_possible_cursor.h" -#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" @@ -319,31 +318,34 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx, << query.getQueryRequest().getProj()}; } + auto const catalogCache = Grid::get(opCtx)->catalogCache(); + // Re-target and re-send the initial find command to the shards until we have established the // shard version. for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) { - auto scopedCMStatus = ScopedChunkManager::get(opCtx, query.nss()); - if (scopedCMStatus == ErrorCodes::NamespaceNotFound) { + auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, query.nss()); + if (routingInfoStatus == ErrorCodes::NamespaceNotFound) { // If the database doesn't exist, we successfully return an empty result set without // creating a cursor. return CursorId(0); - } else if (!scopedCMStatus.isOK()) { - return scopedCMStatus.getStatus(); + } else if (!routingInfoStatus.isOK()) { + return routingInfoStatus.getStatus(); } - const auto& scopedCM = scopedCMStatus.getValue(); + auto& routingInfo = routingInfoStatus.getValue(); auto cursorId = runQueryWithoutRetrying(opCtx, query, readPref, - scopedCM.cm().get(), - scopedCM.primary(), + routingInfo.cm().get(), + routingInfo.primary(), results, viewDefinition); if (cursorId.isOK()) { return cursorId; } - auto status = std::move(cursorId.getStatus()); + + const auto& status = cursorId.getStatus(); if (!ErrorCodes::isStaleShardingError(status.code()) && status != ErrorCodes::ShardNotFound) { @@ -357,11 +359,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx, << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": " << redact(status); - if (status == ErrorCodes::StaleEpoch) { - Grid::get(opCtx)->catalogCache()->invalidate(query.nss().db().toString()); - } else { - scopedCM.db()->getChunkManagerIfExists(opCtx, query.nss().ns(), true); - } + catalogCache->onStaleConfigError(std::move(routingInfo)); } return {ErrorCodes::StaleShardVersion, diff --git a/src/mongo/s/sharding_raii.cpp b/src/mongo/s/sharding_raii.cpp deleted file mode 100644 index b90f975ed35..00000000000 --- a/src/mongo/s/sharding_raii.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. 
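The ClusterFind::runQuery hunk above keeps its bounded retry loop but delegates invalidation to the cache itself: instead of choosing between a whole-database invalidate and a chunk-manager reload, every stale-routing failure now funnels through onStaleConfigError. A sketch of the resulting shape, assuming the CatalogCache calls visible in this diff; runOnce is a hypothetical stand-in for runQueryWithoutRetrying:

StatusWith<CursorId> runWithStaleRetries(OperationContext* opCtx,
                                         const NamespaceString& nss) {
    auto const catalogCache = Grid::get(opCtx)->catalogCache();

    for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) {
        auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
        if (!routingInfoStatus.isOK()) {
            return routingInfoStatus.getStatus();
        }
        auto& routingInfo = routingInfoStatus.getValue();

        auto result = runOnce(opCtx, routingInfo);  // hypothetical attempt
        if (result.isOK()) {
            return result;
        }

        const auto& status = result.getStatus();
        if (!ErrorCodes::isStaleShardingError(status.code())) {
            return status;  // not retryable
            // (the hunk above additionally retries ErrorCodes::ShardNotFound)
        }

        // Mark the cached entry stale; the next iteration refetches it.
        catalogCache->onStaleConfigError(std::move(routingInfo));
    }

    return {ErrorCodes::StaleShardVersion, "exhausted routing retries"};
}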
- * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/sharding_raii.h" - -#include "mongo/base/status_with.h" -#include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" - -namespace mongo { - -using std::shared_ptr; - -ScopedShardDatabase::ScopedShardDatabase(std::shared_ptr<DBConfig> db) : _db(db) { - invariant(_db); -} - -ScopedShardDatabase::~ScopedShardDatabase() = default; - -StatusWith<ScopedShardDatabase> ScopedShardDatabase::getExisting(OperationContext* opCtx, - StringData dbName) { - auto dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName.toString()); - if (!dbStatus.isOK()) { - return {dbStatus.getStatus().code(), - str::stream() << "Database " << dbName << " was not found due to " - << dbStatus.getStatus().toString()}; - } - - return {ScopedShardDatabase(std::move(dbStatus.getValue()))}; -} - -StatusWith<ScopedShardDatabase> ScopedShardDatabase::getOrCreate(OperationContext* opCtx, - StringData dbName) { - auto dbStatus = getExisting(opCtx, dbName); - if (dbStatus.isOK()) { - return dbStatus; - } - - if (dbStatus == ErrorCodes::NamespaceNotFound) { - auto statusCreateDb = - Grid::get(opCtx)->catalogClient(opCtx)->createDatabase(opCtx, dbName.toString()); - if (statusCreateDb.isOK() || statusCreateDb == ErrorCodes::NamespaceExists) { - return getExisting(opCtx, dbName); - } - - return statusCreateDb; - } - - return dbStatus.getStatus(); -} - -ScopedChunkManager::ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<ChunkManager> cm) - : _db(std::move(db)), _cm(std::move(cm)) {} - -ScopedChunkManager::ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<Shard> primary) - : _db(std::move(db)), _primary(std::move(primary)) {} - -ScopedChunkManager::~ScopedChunkManager() = default; - -StatusWith<ScopedChunkManager> ScopedChunkManager::get(OperationContext* opCtx, - const NamespaceString& nss) { - auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, nss.db()); - if (!scopedDbStatus.isOK()) { - return scopedDbStatus.getStatus(); - } - - auto scopedDb = std::move(scopedDbStatus.getValue()); - - auto cm = scopedDb.db()->getChunkManagerIfExists(opCtx, nss.ns()); - if (cm) { - return {ScopedChunkManager(std::move(scopedDb), std::move(cm))}; - } - - auto shardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, scopedDb.db()->getPrimaryId()); - if (!shardStatus.isOK()) { - return {ErrorCodes::fromInt(40371), - str::stream() << "The primary shard for collection " << nss.ns() - << " could not be loaded due to error " - << shardStatus.getStatus().toString()}; - } - - return 
{ScopedChunkManager(std::move(scopedDb), std::move(shardStatus.getValue()))}; -} - -StatusWith<ScopedChunkManager> ScopedChunkManager::getOrCreate(OperationContext* opCtx, - const NamespaceString& nss) { - auto scopedDbStatus = ScopedShardDatabase::getOrCreate(opCtx, nss.db()); - if (!scopedDbStatus.isOK()) { - return scopedDbStatus.getStatus(); - } - - return ScopedChunkManager::get(opCtx, nss); -} - -StatusWith<ScopedChunkManager> ScopedChunkManager::refreshAndGet(OperationContext* opCtx, - const NamespaceString& nss) { - auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, nss.db()); - if (!scopedDbStatus.isOK()) { - return scopedDbStatus.getStatus(); - } - - auto scopedDb = std::move(scopedDbStatus.getValue()); - - try { - std::shared_ptr<ChunkManager> cm = - scopedDb.db()->getChunkManager(opCtx, nss.ns(), true, false); - - if (!cm) { - return {ErrorCodes::NamespaceNotSharded, - str::stream() << "Collection " << nss.ns() - << " does not exist or is not sharded."}; - } - - if (cm->getChunkMap().empty()) { - return {ErrorCodes::NamespaceNotSharded, - str::stream() << "Collection " << nss.ns() - << " is marked as sharded, but does not have any chunks. This " - "most likely indicates a corrupted metadata or " - "partially completed 'shardCollection' command."}; - } - - return {ScopedChunkManager(std::move(scopedDb), std::move(cm))}; - } catch (const AssertionException& e) { - return e.toStatus(); - } -} - -} // namespace mongo diff --git a/src/mongo/s/sharding_raii.h b/src/mongo/s/sharding_raii.h deleted file mode 100644 index 0c54f281985..00000000000 --- a/src/mongo/s/sharding_raii.h +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#pragma once - -#include "mongo/base/disallow_copying.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/config.h" - -namespace mongo { - -class OperationContext; - -class ScopedShardDatabase { - MONGO_DISALLOW_COPYING(ScopedShardDatabase); - -public: - ScopedShardDatabase(ScopedShardDatabase&&) = default; - ~ScopedShardDatabase(); - - /** - * Ensures that the specified database exists in the cache and if it does, returns it. - * Otherwise, either returns NamespaceNotFound if the database does not exist, or any other - * error code indicating why the database could not be loaded. - */ - static StatusWith<ScopedShardDatabase> getExisting(OperationContext* opCtx, StringData dbName); - - /** - * If the specified database exists already, loads it in the cache (if not already there) and - * returns it. Otherwise, if it does not exist, this call will implicitly create it as - * non-sharded. - */ - static StatusWith<ScopedShardDatabase> getOrCreate(OperationContext* opCtx, StringData dbName); - - /** - * Returns the underlying database cache entry. - */ - DBConfig* db() const { - return _db.get(); - } - - /** - * This method is here only for compatibility with the legacy M/R code, which requires a shared - * reference to the underlying database. It should not be used in new code. - */ - std::shared_ptr<DBConfig> getSharedDbReference() const { - return _db; - } - -private: - explicit ScopedShardDatabase(std::shared_ptr<DBConfig> db); - - // Reference to the corresponding database. Never null. - std::shared_ptr<DBConfig> _db; -}; - -class ScopedChunkManager { - MONGO_DISALLOW_COPYING(ScopedChunkManager); - -public: - ScopedChunkManager(ScopedChunkManager&&) = default; - ~ScopedChunkManager(); - - /** - * If the specified namespace is sharded, returns a ScopedChunkManager initialized with that - * collection's routing information. If it is not, the object returned is initialized with the - * database primary node on which the unsharded collection must reside. - * - * Returns NamespaceNotFound if the database does not exist, or any other error indicating a - * problem communicating with the config server. - */ - static StatusWith<ScopedChunkManager> get(OperationContext* opCtx, const NamespaceString& nss); - - /** - * If the database holding the specified namespace does not exist, creates it and then behaves - * like the 'get' method above. - */ - static StatusWith<ScopedChunkManager> getOrCreate(OperationContext* opCtx, - const NamespaceString& nss); - - /** - * If the specified database and collection do not exist in the cache, tries to load them from - * the config server and returns a reference. If they are already in the cache, makes a call to - * the config server to check if there are any incremental updates to the collection chunk - * metadata and if so incorporates those. Otherwise, if it does not exist or any other error - * occurs, passes that error back. - */ - static StatusWith<ScopedChunkManager> refreshAndGet(OperationContext* opCtx, - const NamespaceString& nss); - - /** - * Returns the underlying database for which we hold a reference. - */ - DBConfig* db() const { - return _db.db(); - } - - /** - * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr. - */ - std::shared_ptr<ChunkManager> cm() const { - return _cm; - } - - /** - * If the collection is not sharded, returns its primary shard. Otherwise, nullptr. 
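The accessor pair documented above encodes the invariant that survives this refactor: exactly one of cm() and primary() is non-null, and callers branch on it. The routing-info object used throughout the rewritten commands is consumed the same way; a sketch, where targetAllShards and targetPrimaryOnly are hypothetical helpers:

void dispatchByShardingState(OperationContext* opCtx, const NamespaceString& nss) {
    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));

    if (auto cm = routingInfo.cm()) {
        // Sharded: the chunk manager knows the shard key and chunk placement.
        targetAllShards(opCtx, *cm);
    } else {
        // Unsharded: everything lives on the database's primary shard.
        targetPrimaryOnly(opCtx, routingInfo.primary());
    }
}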
- */ - std::shared_ptr<Shard> primary() const { - return _primary; - } - -private: - ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<ChunkManager> cm); - - ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<Shard> primary); - - // Scoped reference to the owning database. - ScopedShardDatabase _db; - - // Reference to the corresponding chunk manager (if sharded) or null - std::shared_ptr<ChunkManager> _cm; - - // Reference to the primary of the database (if not sharded) or null - std::shared_ptr<Shard> _primary; -}; - -} // namespace mongo
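With sharding_raii.h deleted, its two factories map onto the CatalogCache API used across this change: getExisting/get become getCollectionRoutingInfo, and getOrCreate becomes an explicit createShardDatabase call (as in the run_on_all_shards_cmd.cpp hunk) followed by the same lookup. A closing sketch of the replacement flow, hedged to the signatures visible in this diff:

void ensureDatabaseAndRoute(OperationContext* opCtx, const NamespaceString& nss) {
    // Replaces ScopedShardDatabase::getOrCreate(): implicitly creates the
    // database as unsharded if it is missing.
    uassertStatusOK(createShardDatabase(opCtx, nss.db().toString()));

    // Replaces ScopedChunkManager::get(): one object now carries either the
    // chunk manager or the primary shard.
    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));

    // From here, routingInfo.cm() / routingInfo.primary() dispatch exactly as
    // in the rewritten commands elsewhere in this diff.
}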