diff options
-rw-r--r-- | jstests/sharding/database_and_shard_versioning_all_commands.js | 10 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 1 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.cpp | 42 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.h | 49 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_test_fixture.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/client/parallel.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_helpers.cpp | 27 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 8 |
8 files changed, 84 insertions, 61 deletions
diff --git a/jstests/sharding/database_and_shard_versioning_all_commands.js b/jstests/sharding/database_and_shard_versioning_all_commands.js index 2f01bf66f73..559874cb353 100644 --- a/jstests/sharding/database_and_shard_versioning_all_commands.js +++ b/jstests/sharding/database_and_shard_versioning_all_commands.js @@ -62,7 +62,7 @@ buildInfo: {skip: "executes locally on mongos (not sent to any remote node)"}, clearLog: {skip: "executes locally on mongos (not sent to any remote node)"}, collMod: { - sendsDbVersion: false, + sendsDbVersion: true, sendsShardVersion: true, setUp: function(mongosConn) { // Expects the collection to exist, and doesn't implicitly create it. @@ -107,7 +107,7 @@ copydb: {skip: "Not captured by the profiler; will be tested separately (TODO SERVER-33429)"}, count: { - sendsDbVersion: false, + sendsDbVersion: true, sendsShardVersion: true, command: {count: collName, query: {x: 1}}, }, @@ -154,7 +154,7 @@ command: {delete: collName, deletes: [{q: {_id: 1}, limit: 1}]} }, distinct: { - sendsDbVersion: false, + sendsDbVersion: true, sendsShardVersion: true, command: {distinct: collName, key: "x"}, }, @@ -164,7 +164,7 @@ dropDatabase: {skip: "Not captured by the profiler; will be tested separately (TODO SERVER-33429)"}, dropIndexes: { - sendsDbVersion: false, + sendsDbVersion: true, sendsShardVersion: true, setUp: function(mongosConn) { // Expects the collection to exist, and doesn't implicitly create it. @@ -366,7 +366,7 @@ // Expects the collection to exist, and doesn't implicitly create it. assert.commandWorked(mongosConn.getDB(dbName).runCommand({create: collName})); }, - sendsDbVersion: false, + sendsDbVersion: true, sendsShardVersion: true, command: {reIndex: collName}, cleanUp: function(mongosConn) { diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 805879b08d2..e9516c75f29 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -128,6 +128,7 @@ struct CommandHelpers { arg == "$clusterTime" || // arg == "maxTimeMS" || // arg == "readConcern" || // + arg == "databaseVersion" || // arg == "shardVersion" || // arg == "tracking_info" || // arg == "writeConcern" || // diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 34e346af58b..85d454b702e 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -120,7 +120,10 @@ CatalogCache::~CatalogCache() = default; StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx, StringData dbName) { try { - return {CachedDatabaseInfo(_getDatabase(opCtx, dbName))}; + auto dbEntry = _getDatabase(opCtx, dbName); + auto primaryShard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId)); + return {CachedDatabaseInfo(std::move(dbEntry), std::move(primaryShard))}; } catch (const DBException& ex) { return ex.toStatus(); } @@ -148,17 +151,12 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( auto it = collections.find(nss.ns()); if (it == collections.end()) { - auto shardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId); - if (!shardStatus.isOK()) { - return {ErrorCodes::Error(40371), - str::stream() << "The primary shard for collection " << nss.ns() - << " could not be loaded due to error " - << shardStatus.getStatus().toString()}; - } - return {CachedCollectionRoutingInfo( - dbEntry->primaryShardId, nss, std::move(shardStatus.getValue()))}; + nss, + {dbEntry, + uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId))}, + nullptr)}; } auto& collEntry = it->second; @@ -199,7 +197,12 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( continue; } - return {CachedCollectionRoutingInfo(dbEntry->primaryShardId, collEntry.routingInfo)}; + return {CachedCollectionRoutingInfo( + nss, + {dbEntry, + uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId))}, + collEntry.routingInfo)}; } } @@ -476,8 +479,9 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const { builder->append("countFailedRefreshes", countFailedRefreshes.load()); } -CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db) - : _db(std::move(db)) {} +CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db, + std::shared_ptr<Shard> primaryShard) + : _db(std::move(db)), _primaryShard(std::move(primaryShard)) {} const ShardId& CachedDatabaseInfo::primaryId() const { return _db->primaryShardId; @@ -491,13 +495,9 @@ boost::optional<DatabaseVersion> CachedDatabaseInfo::databaseVersion() const { return _db->databaseVersion; } -CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId, +CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(NamespaceString nss, + CachedDatabaseInfo db, std::shared_ptr<ChunkManager> cm) - : _primaryId(std::move(primaryId)), _cm(std::move(cm)) {} - -CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId, - NamespaceString nss, - std::shared_ptr<Shard> primary) - : _primaryId(std::move(primaryId)), _nss(std::move(nss)), _primary(std::move(primary)) {} + : _nss(std::move(nss)), _db(std::move(db)), _cm(std::move(cm)) {} } // namespace mongo diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index b4b93eced47..bd857a8b4b8 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -246,6 +246,9 @@ private: class CachedDatabaseInfo { public: const ShardId& primaryId() const; + std::shared_ptr<Shard> primary() const { + return _primaryShard; + }; bool shardingEnabled() const; @@ -254,9 +257,11 @@ public: private: friend class CatalogCache; - CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db); + CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db, + std::shared_ptr<Shard> primaryShard); std::shared_ptr<CatalogCache::DatabaseInfoEntry> _db; + std::shared_ptr<Shard> _primaryShard; }; /** @@ -266,12 +271,19 @@ private: class CachedCollectionRoutingInfo { public: /** - * Returns the ID of the primary shard for the database owining this collection, regardless of - * whether it is sharded or not. + * These serve the same purpose: to route to the primary shard for the collection's database. + * Paths that have been updated to attach a databaseVersion use db(). Once all paths have been + * updated, primaryId() and primary() can be deleted. */ const ShardId& primaryId() const { - return _primaryId; - } + return _db.primaryId(); + }; + std::shared_ptr<Shard> primary() const { + return _db.primary(); + }; + CachedDatabaseInfo db() const { + return _db; + }; /** * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr. @@ -280,31 +292,22 @@ public: return _cm; } - /** - * If the collection is not sharded, returns its primary shard. Otherwise, nullptr. - */ - std::shared_ptr<Shard> primary() const { - return _primary; - } - private: friend class CatalogCache; + friend class CachedDatabaseInfo; - CachedCollectionRoutingInfo(ShardId primaryId, std::shared_ptr<ChunkManager> cm); + CachedCollectionRoutingInfo(NamespaceString nss, + CachedDatabaseInfo db, + std::shared_ptr<ChunkManager> cm); - CachedCollectionRoutingInfo(ShardId primaryId, - NamespaceString nss, - std::shared_ptr<Shard> primary); + NamespaceString _nss; - // The id of the primary shard containing the database - ShardId _primaryId; + // Copy of the database's cached info. + CachedDatabaseInfo _db; - // Reference to the corresponding chunk manager (if sharded) or null + // Shared reference to the collection's cached chunk distribution if sharded, otherwise null. + // This is a shared reference rather than a copy because the chunk distribution can be large. std::shared_ptr<ChunkManager> _cm; - - // Reference to the primary of the database (if not sharded) or null - NamespaceString _nss; - std::shared_ptr<Shard> _primary; }; } // namespace mongo diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index 4fcb9803907..63598b1d7dc 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -145,7 +145,7 @@ std::shared_ptr<ChunkManager> CatalogCacheTestFixture::makeChunkManager( auto routingInfo = future.timed_get(kFutureTimeout); ASSERT(routingInfo->cm()); - ASSERT(!routingInfo->primary()); + ASSERT(routingInfo->primary()); return routingInfo->cm(); } diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index 0c35d6f5c68..04fee51c514 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -436,7 +436,11 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { if (routingInfoStatus != ErrorCodes::NamespaceNotFound) { auto routingInfo = uassertStatusOK(std::move(routingInfoStatus)); manager = routingInfo.cm(); - primary = routingInfo.primary(); + // ParallelSortClusteredCursor has two states - either !cm && primary, which means + // unsharded collection, or cm && !primary, which means sharded collection. + if (!manager) { + primary = routingInfo.primary(); + } } } diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp index 50e170fec57..6fe459226eb 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.cpp +++ b/src/mongo/s/commands/cluster_commands_helpers.cpp @@ -70,6 +70,12 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, namespace { +BSONObj appendDbVersion(BSONObj cmdObj, DatabaseVersion version) { + BSONObjBuilder cmdWithVersionBob(std::move(cmdObj)); + cmdWithVersionBob.append("databaseVersion", version.toBSON()); + return cmdWithVersionBob.obj(); +} + std::vector<AsyncRequestsSender::Request> buildUnversionedRequestsForAllShards( OperationContext* opCtx, const BSONObj& cmdObj) { std::vector<AsyncRequestsSender::Request> requests; @@ -98,11 +104,22 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard } } else { // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo.primaryId(), - !routingInfo.primary()->isConfig() - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj); + + // Attach shardVersion "UNSHARDED", unless targeting the config server. + const auto cmdObjWithShardVersion = (routingInfo.db().primaryId() != "config") + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj; + + // Attach the databaseVersion if we have one cached for the database. + // TODO: After 4.0 is released, require the routingInfo to have a databaseVersion for all + // databases besides "config" and "admin" (whose primary shard cannot be changed). + // (In v4.0, if the cluster is in fcv=3.6, we may not have a databaseVersion cached for any + // database). + const auto cmdObjWithShardVersionAndDbVersion = routingInfo.db().databaseVersion() + ? appendDbVersion(cmdObjWithShardVersion, *routingInfo.db().databaseVersion()) + : cmdObjWithShardVersion; + + requests.emplace_back(routingInfo.db().primaryId(), cmdObjWithShardVersionAndDbVersion); } return requests; } diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 2a2521ef4fa..91c58187615 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -174,11 +174,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // Get the set of shards on which we will run the query. std::vector<std::shared_ptr<Shard>> shards; - if (primary) { - shards.emplace_back(std::move(primary)); - } else { - invariant(chunkManager); - + if (chunkManager) { std::set<ShardId> shardIds; chunkManager->getShardIdsForQuery(opCtx, query.getQueryRequest().getFilter(), @@ -188,6 +184,8 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, for (auto id : shardIds) { shards.emplace_back(uassertStatusOK(shardRegistry->getShard(opCtx, id))); } + } else { + shards.emplace_back(std::move(primary)); } // Construct the query and parameters. |