author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2017-03-12 17:27:34 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2017-03-12 17:27:34 -0400
commit     8125c55a251805899552d0af4776930216223703
tree       3e1a43aa762a2136422b4fa66f70ee89222d29a1
parent     ae2518adace4ba7ed6a16eba6943bff6ea4ade10
Revert "SERVER-22611 Sharding catalog cache refactor"
This reverts commit ae2518adace4ba7ed6a16eba6943bff6ea4ade10.
62 files changed, 2601 insertions(+), 2833 deletions(-)
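At the call sites, the revert swaps one lookup pattern for another: the refactor had callers fetch a CachedCollectionRoutingInfo from the CatalogCache and null-check cm(), while the restored code asks the database entry (DBConfig) whether the collection is sharded and then fetches its ChunkManager. The following is a minimal, self-contained sketch of the two call shapes; the stub types below are illustrations, not the real MongoDB classes — only the names and shapes visible in the diff are taken from the source.

    #include <iostream>
    #include <memory>
    #include <string>

    // Stub standing in for mongo::ChunkManager (hypothetical, for illustration only).
    struct ChunkManager {
        std::string shardKeyPattern = "{shardKey: 1}";
    };

    // Post-refactor (reverted) shape: one cache entry that is either sharded
    // (cm() != nullptr) or unsharded (primary shard only).
    struct CachedCollectionRoutingInfo {
        std::shared_ptr<ChunkManager> cm() const { return _cm; }
        std::shared_ptr<ChunkManager> _cm;
    };

    // Pre-refactor (restored) shape: ask the database entry whether the
    // collection is sharded, then fetch its ChunkManager.
    struct DBConfig {
        bool isSharded(const std::string&) const { return _sharded; }
        std::shared_ptr<ChunkManager> getChunkManager(const std::string&) const {
            return std::make_shared<ChunkManager>();
        }
        bool _sharded = true;
    };

    int main() {
        // Reverted pattern: null-check the chunk manager on the routing info.
        CachedCollectionRoutingInfo routingInfo{std::make_shared<ChunkManager>()};
        if (auto cm = routingInfo.cm()) {
            std::cout << "refactored API: sharded, key " << cm->shardKeyPattern << '\n';
        }

        // Restored pattern: explicit isSharded() check against the DBConfig.
        DBConfig confOut;
        if (confOut.isSharded("foo.bar")) {
            auto cm = confOut.getChunkManager("foo.bar");
            std::cout << "restored API: sharded, key " << cm->shardKeyPattern << '\n';
        }
    }

Both shapes appear repeatedly in the diff below (mr.cpp, balancer.cpp, migration_manager.cpp), always as the same mechanical substitution.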
diff --git a/buildscripts/resmokeconfig/suites/sharding.yml b/buildscripts/resmokeconfig/suites/sharding.yml
index df402c2569e..632c5a4c3fc 100644
--- a/buildscripts/resmokeconfig/suites/sharding.yml
+++ b/buildscripts/resmokeconfig/suites/sharding.yml
@@ -3,6 +3,8 @@ selector:
   roots:
   - jstests/sharding/*.js
   exclude_files:
+  # TODO: Enable when SERVER-22672 is complete
+  - jstests/sharding/printShardingStatus.js
 
 executor:
   js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index 530e8442962..915e64ae523 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -26,6 +26,8 @@ selector:
   - jstests/sharding/migration_with_source_ops.js # SERVER-21713
   - jstests/sharding/movechunk_interrupt_at_primary_stepdown.js # SERVER-21713
   - jstests/sharding/movechunk_parallel.js # SERVER-21713
+  # TODO: Enable when SERVER-22672 is complete
+  - jstests/sharding/printShardingStatus.js
 
 executor:
   js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index 4e8deecb1d1..d3a9c3abba0 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -26,6 +26,8 @@ selector:
   - jstests/sharding/movechunk_parallel.js # SERVER-21713
   - jstests/sharding/migration_server_status.js # SERVER-21713
   - jstests/sharding/migration_move_chunk_after_receive.js # SERVER-21713
+  # TODO: Enable when SERVER-22672 is complete
+  - jstests/sharding/printShardingStatus.js
 
 executor:
   js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_ese.yml b/buildscripts/resmokeconfig/suites/sharding_ese.yml
index 90b493d0e02..085aba317f2 100644
--- a/buildscripts/resmokeconfig/suites/sharding_ese.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_ese.yml
@@ -7,6 +7,8 @@ selector:
   roots:
   - jstests/sharding/*.js
   exclude_files:
+  # TODO: Enable when SERVER-22672 is complete
+  - jstests/sharding/printShardingStatus.js
 
 executor:
   js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 149ea021dc3..e7426070dfc 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -5,6 +5,8 @@ selector:
   exclude_files:
   # Doesn't use ShardingTest so won't actually be run in a mixed version configuration
   - jstests/sharding/config_version_rollback.js
+  # TODO: Enable when SERVER-22672 is complete
+  - jstests/sharding/printShardingStatus.js
   # Behavior change to addShard
   - jstests/sharding/addshard_idempotent.js
   # SERVER-27438: Preserves $comment in OP_QUERY and tests for this change.
diff --git a/jstests/core/counta.js b/jstests/core/counta.js
index a554943483b..26ce4d2269e 100644
--- a/jstests/core/counta.js
+++ b/jstests/core/counta.js
@@ -23,5 +23,7 @@
     });
 
     // count must return error if collection name is absent
-    assert.commandFailedWithCode(db.runCommand("count"), ErrorCodes.InvalidNamespace);
+    var res = assert.commandFailed(db.runCommand("count"));
+    assert.eq(ErrorCodes.InvalidNamespace, res.code);
+
 })();
diff --git a/jstests/sharding/coll_epoch_test1.js b/jstests/sharding/coll_epoch_test1.js
index 2cc7c26c60b..b3cca85d495 100644
--- a/jstests/sharding/coll_epoch_test1.js
+++ b/jstests/sharding/coll_epoch_test1.js
@@ -21,7 +21,7 @@
 
     //
     // Test that inserts and queries go to the correct shard even when the collection has been
-    // sharded from another mongos
+    // sharded in the background
     //
 
     jsTest.log("Enabling sharding for the first time...");
@@ -44,7 +44,7 @@
 
     //
     // Test that inserts and queries go to the correct shard even when the collection has been
-    // resharded from another mongos, with a different key
+    // re-sharded in the background
     //
 
    jsTest.log("Re-enabling sharding with a different key...");
@@ -65,10 +65,24 @@
 
     //
     // Test that inserts and queries go to the correct shard even when the collection has been
-    // unsharded from another mongos
+    // unsharded and moved to a different primary
     //
 
-    jsTest.log("Re-creating unsharded collection from a sharded collection...");
+    jsTest.log(
+        "Re-creating unsharded collection from a sharded collection on different primary...");
+
+    var getOtherShard = function(shard) {
+        for (var id in shards) {
+            if (shards[id] != shard)
+                return shards[id];
+        }
+    };
+
+    var otherShard = getOtherShard(config.databases.findOne({_id: coll.getDB() + ""}).primary);
+    assert.commandWorked(admin.runCommand({movePrimary: coll.getDB() + "", to: otherShard}));
+    st.configRS.awaitLastOpCommitted();  // TODO: Remove after collection lifecyle project (PM-85)
+
+    jsTest.log("moved primary...");
 
     bulk = insertMongos.getCollection(coll + "").initializeUnorderedBulkOp();
     for (var i = 0; i < 100; i++) {
@@ -82,19 +96,13 @@
     assert(coll.drop());
 
     //
-    // Test that inserts and queries go to correct shard even when the collection has been unsharded
-    // and resharded from another mongos on a different primary
+    // Test that inserts and queries go to correct shard even when the collection has been
+    // unsharded,
+    // resharded, and moved to a different primary
     //
 
     jsTest.log("Re-creating sharded collection with different primary...");
 
-    var getOtherShard = function(shard) {
-        for (var id in shards) {
-            if (shards[id] != shard)
-                return shards[id];
-        }
-    };
-
     assert.commandWorked(admin.runCommand({
         movePrimary: coll.getDB() + "",
         to: getOtherShard(config.databases.findOne({_id: coll.getDB() + ""}).primary)
diff --git a/jstests/sharding/find_and_modify_after_multi_write.js b/jstests/sharding/find_and_modify_after_multi_write.js
index 15f54120706..004fe8d8ead 100644
--- a/jstests/sharding/find_and_modify_after_multi_write.js
+++ b/jstests/sharding/find_and_modify_after_multi_write.js
@@ -9,6 +9,7 @@
     var st = new ShardingTest({shards: 2, mongos: 2});
 
     var testDB = st.s.getDB('test');
+    testDB.dropDatabase();
 
     assert.commandWorked(testDB.adminCommand({enableSharding: 'test'}));
     st.ensurePrimaryShard('test', 'shard0000');
diff --git a/jstests/sharding/mongos_validate_backoff.js b/jstests/sharding/mongos_validate_backoff.js
new file mode 100644
index 00000000000..f78dae0677e
--- /dev/null
+++ b/jstests/sharding/mongos_validate_backoff.js
@@ -0,0 +1,60 @@
+// Ensures that single mongos shard-key errors are fast, but slow down when many are triggered
+(function() {
+    'use strict';
+
+    var st = new ShardingTest({shards: 1, mongos: 1});
+
+    var mongos = st.s0;
+    var admin = mongos.getDB("admin");
+    var coll = mongos.getCollection("foo.bar");
+
+    assert.commandWorked(admin.runCommand({enableSharding: coll.getDB() + ""}));
+
+    coll.ensureIndex({shardKey: 1});
+    assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {shardKey: 1}}));
+
+    var timeBadInsert = function() {
+        var start = new Date().getTime();
+
+        // Bad insert, no shard key
+        assert.writeError(coll.insert({hello: "world"}));
+
+        var end = new Date().getTime();
+
+        return end - start;
+    };
+
+    // We need to work at least twice in order to check resetting the counter
+    var successNeeded = 2;
+    var success = 0;
+
+    // Loop over this test a few times, to ensure that the error counters get reset if we don't
+    // have bad inserts over a long enough time.
+    for (var test = 0; test < 5; test++) {
+        var firstWait = timeBadInsert();
+        var lastWait = 0;
+
+        for (var i = 0; i < 20; i++) {
+            printjson(lastWait = timeBadInsert());
+        }
+
+        // As a heuristic test, we want to make sure that the error wait after sleeping is much
+        // less than the error wait after a lot of errors.
+        if (lastWait > firstWait * 2 * 2) {
+            success++;
+        }
+
+        if (success >= successNeeded) {
+            break;
+        }
+
+        // Abort if we've failed too many times
+        assert.lt(test, 4);
+
+        // Sleeping for long enough to reset our exponential counter
+        sleep(3000);
+    }
+
+    st.stop();
+
+})();
diff --git a/jstests/sharding/printShardingStatus.js b/jstests/sharding/printShardingStatus.js
index 85330311fd4..798338c39c0 100644
--- a/jstests/sharding/printShardingStatus.js
+++ b/jstests/sharding/printShardingStatus.js
@@ -181,7 +181,6 @@
 
         var output = grabStatusOutput(st.config);
 
         assertPresentInOutput(output, collName, "collection");
-
         // If any of the previous collection names are present, then their optional indicators
         // might also be present. This might taint the results when we go searching through
         // the output.
@@ -191,7 +190,6 @@
         }
 
         assertPresentInOutput(output, "unique: " + (!!args.unique), "unique shard key indicator");
-
         if (args.hasOwnProperty("unique") && typeof(args.unique) != "boolean") {
             // non-bool: actual value must be shown
             assertPresentInOutput(
@@ -206,12 +204,7 @@
             assertPresentInOutput(output, tojson(args.noBalance), "noBalance indicator (non bool)");
         }
 
-        try {
-            mongos.getCollection(collName).drop();
-        } catch (e) {
-            // Ignore drop errors because they are from the illegal values in the collection entry
-            assert.writeOK(mongos.getDB("config").collections.remove({_id: collName}));
-        }
+        assert(mongos.getCollection(collName).drop());
 
         testCollDetailsNum++;
     }
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index af0c740fc70..d7b8816891f 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -68,11 +68,14 @@
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/chunk.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/client/parallel.h"
 #include "mongo/s/client/shard_connection.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/sharding_raii.h"
 #include "mongo/s/stale_exception.h"
 #include "mongo/scripting/engine.h"
 #include "mongo/stdx/mutex.h"
@@ -1793,6 +1796,7 @@ public:
         state.prepTempCollection();
         ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);
 
+        BSONList values;
         if (!config.outputOptions.outDB.empty()) {
             BSONObjBuilder loc;
             if (!config.outputOptions.outDB.empty())
@@ -1800,29 +1804,33 @@ public:
             if (!config.outputOptions.collectionName.empty())
                 loc.append("collection", config.outputOptions.collectionName);
             result.append("result", loc.obj());
-        } else if (!config.outputOptions.collectionName.empty()) {
-            result.append("result", config.outputOptions.collectionName);
+        } else {
+            if (!config.outputOptions.collectionName.empty())
+                result.append("result", config.outputOptions.collectionName);
         }
 
-        std::vector<std::shared_ptr<Chunk>> chunks;
+        auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname);
+        if (!scopedDbStatus.isOK()) {
+            return appendCommandStatus(result, scopedDbStatus.getStatus());
+        }
 
-        if (config.outputOptions.outType != Config::OutputType::INMEMORY) {
-            auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
-                opCtx, config.outputOptions.finalNamespace);
-            if (!outRoutingInfoStatus.isOK()) {
-                return appendCommandStatus(result, outRoutingInfoStatus.getStatus());
-            }
+        auto confOut = scopedDbStatus.getValue().db();
 
-            if (auto cm = outRoutingInfoStatus.getValue().cm()) {
-                // Fetch result from other shards 1 chunk at a time. It would be better to do just
-                // one big $or query, but then the sorting would not be efficient.
-                const string shardName = ShardingState::get(opCtx)->getShardName();
+        vector<shared_ptr<Chunk>> chunks;
 
-                for (const auto& chunkEntry : cm->chunkMap()) {
-                    const auto& chunk = chunkEntry.second;
-                    if (chunk->getShardId() == shardName) {
-                        chunks.push_back(chunk);
-                    }
+        if (confOut->isSharded(config.outputOptions.finalNamespace.ns())) {
+            shared_ptr<ChunkManager> cm =
+                confOut->getChunkManager(opCtx, config.outputOptions.finalNamespace.ns());
+
+            // Fetch result from other shards 1 chunk at a time. It would be better to do just one
+            // big $or query, but then the sorting would not be efficient.
+            const string shardName = ShardingState::get(opCtx)->getShardName();
+            const ChunkMap& chunkMap = cm->getChunkMap();
+
+            for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
+                shared_ptr<Chunk> chunk = it->second;
+                if (chunk->getShardId() == shardName) {
+                    chunks.push_back(chunk);
                 }
             }
         }
@@ -1831,8 +1839,6 @@ public:
         unsigned int index = 0;
         BSONObj query;
         BSONArrayBuilder chunkSizes;
-        BSONList values;
-
         while (true) {
             shared_ptr<Chunk> chunk;
             if (chunks.size() > 0) {
@@ -1849,7 +1855,6 @@ public:
             ParallelSortClusteredCursor cursor(
                 servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
             cursor.init(opCtx);
-
             int chunkSize = 0;
 
             while (cursor.more() || !values.empty()) {
@@ -1875,9 +1880,7 @@ public:
                     state.insert(config.tempNamespace, res);
                 else
                     state.emit(res);
-
                 values.clear();
-
                 if (!t.isEmpty())
                     values.push_back(t);
             }
@@ -1886,7 +1889,6 @@ public:
                 chunkSizes.append(chunk->getMin());
                 chunkSizes.append(chunkSize);
             }
-
             if (++index >= chunks.size())
                 break;
         }
@@ -1905,10 +1907,8 @@ public:
         countsB.append("output", outputCount);
         result.append("counts", countsB.obj());
 
-        return true;
+        return 1;
     }
-
 } mapReduceFinishCommand;
-
-}  // namespace
-}  // namespace mongo
+}
+}
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index b85f08b7189..60a765cf5d6 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -46,11 +46,12 @@
 #include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/cluster_identity_loader.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_util.h"
+#include "mongo/s/sharding_raii.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/exit.h"
 #include "mongo/util/log.h"
@@ -533,20 +534,18 @@ Status Balancer::_enforceTagRanges(OperationContext* opCtx) {
     }
 
     for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
-        auto routingInfoStatus =
-            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
-                opCtx, splitInfo.nss);
-        if (!routingInfoStatus.isOK()) {
-            return routingInfoStatus.getStatus();
+        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, splitInfo.nss);
+        if (!scopedCMStatus.isOK()) {
+            return scopedCMStatus.getStatus();
         }
 
-        auto cm = routingInfoStatus.getValue().cm();
+        const auto& scopedCM = scopedCMStatus.getValue();
 
         auto splitStatus =
             shardutil::splitChunkAtMultiplePoints(opCtx,
                                                   splitInfo.shardId,
                                                   splitInfo.nss,
-                                                  cm->getShardKeyPattern(),
+                                                  scopedCM.cm()->getShardKeyPattern(),
                                                   splitInfo.collectionVersion,
                                                   ChunkRange(splitInfo.minKey, splitInfo.maxKey),
                                                   splitInfo.splitKeys);
@@ -614,9 +613,8 @@ int Balancer::_moveChunks(OperationContext* opCtx,
 void Balancer::_splitOrMarkJumbo(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  const BSONObj& minKey) {
-    auto routingInfo = uassertStatusOK(
-        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
-    const auto cm = routingInfo.cm().get();
+    auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
+    const auto cm = scopedCM.cm().get();
 
     auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 2fe6ce10746..a4574dfc676 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -43,6 +43,7 @@
 #include "mongo/s/catalog/type_tags.h"
 #include "mongo/s/catalog_cache.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/sharding_raii.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/log.h"
 #include "mongo/util/mongoutils/str.h"
@@ -71,7 +72,7 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus(
         shardToChunksMap[stat.shardId];
     }
 
-    for (const auto& entry : chunkMgr->chunkMap()) {
+    for (const auto& entry : chunkMgr->getChunkMap()) {
         const auto& chunkEntry = entry.second;
 
         ChunkType chunk;
@@ -298,16 +299,17 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* op
         return shardStatsStatus.getStatus();
     }
 
-    auto& shardStats = std::move(shardStatsStatus.getValue());
+    const auto shardStats = std::move(shardStatsStatus.getValue());
+
+    const NamespaceString nss(chunk.getNS());
 
-    auto routingInfoStatus =
-        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
-                                                                                     chunk.getNS());
-    if (!routingInfoStatus.isOK()) {
-        return routingInfoStatus.getStatus();
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
+    if (!scopedCMStatus.isOK()) {
+        return scopedCMStatus.getStatus();
     }
 
-    const auto cm = routingInfoStatus.getValue().cm().get();
+    const auto& scopedCM = scopedCMStatus.getValue();
+    const auto cm = scopedCM.cm().get();
 
     const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
@@ -329,14 +331,15 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt
 
     auto shardStats = std::move(shardStatsStatus.getValue());
 
-    auto routingInfoStatus =
-        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
-                                                                                     chunk.getNS());
-    if (!routingInfoStatus.isOK()) {
-        return routingInfoStatus.getStatus();
+    const NamespaceString nss(chunk.getNS());
+
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
+    if (!scopedCMStatus.isOK()) {
+        return scopedCMStatus.getStatus();
     }
 
-    const auto cm = routingInfoStatus.getValue().cm().get();
+    const auto& scopedCM = scopedCMStatus.getValue();
+    const auto cm = scopedCM.cm().get();
 
     const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
@@ -363,13 +366,13 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt
 StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
     OperationContext* opCtx, const NamespaceString& nss, const ShardStatisticsVector& shardStats) {
-    auto routingInfoStatus =
-        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
-    if (!routingInfoStatus.isOK()) {
-        return routingInfoStatus.getStatus();
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
+    if (!scopedCMStatus.isOK()) {
+        return scopedCMStatus.getStatus();
     }
 
-    const auto cm = routingInfoStatus.getValue().cm().get();
+    const auto& scopedCM = scopedCMStatus.getValue();
+    const auto cm = scopedCM.cm().get();
 
     const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern();
@@ -417,13 +420,13 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi
     const NamespaceString& nss,
     const ShardStatisticsVector& shardStats,
     bool aggressiveBalanceHint) {
-    auto routingInfoStatus =
-        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
-    if (!routingInfoStatus.isOK()) {
-        return routingInfoStatus.getStatus();
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
+    if (!scopedCMStatus.isOK()) {
+        return scopedCMStatus.getStatus();
     }
 
-    const auto cm = routingInfoStatus.getValue().cm().get();
+    const auto& scopedCM = scopedCMStatus.getValue();
+    const auto cm = scopedCM.cm().get();
 
     const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern();
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 9c70e8458ba..7f267b97e67 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -45,10 +45,10 @@
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog_cache.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/move_chunk_request.h"
+#include "mongo/s/sharding_raii.h"
 #include "mongo/util/log.h"
 #include "mongo/util/net/hostandport.h"
 #include "mongo/util/scopeguard.h"
@@ -180,16 +180,14 @@ Status MigrationManager::executeManualMigration(
     RemoteCommandResponse remoteCommandResponse =
         _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();
 
-    auto routingInfoStatus =
-        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
-            opCtx, migrateInfo.ns);
-    if (!routingInfoStatus.isOK()) {
-        return routingInfoStatus.getStatus();
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, NamespaceString(migrateInfo.ns));
+    if (!scopedCMStatus.isOK()) {
+        return scopedCMStatus.getStatus();
     }
 
-    auto& routingInfo = routingInfoStatus.getValue();
+    const auto& scopedCM = scopedCMStatus.getValue();
 
-    auto chunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey);
+    auto chunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey);
     invariant(chunk);
 
     Status commandStatus = _processRemoteCommandResponse(
@@ -312,20 +310,18 @@ void MigrationManager::finishRecovery(OperationContext* opCtx,
         auto& migrateInfos = nssAndMigrateInfos.second;
         invariant(!migrateInfos.empty());
 
-        auto routingInfoStatus =
-            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
-                                                                                         nss);
-        if (!routingInfoStatus.isOK()) {
+        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
+        if (!scopedCMStatus.isOK()) {
            // This shouldn't happen because the collection was intact and sharded when the previous
             // config primary was active and the dist locks have been held by the balancer
             // throughout. Abort migration recovery.
             log() << "Unable to reload chunk metadata for collection '" << nss
                   << "' during balancer recovery. Abandoning recovery."
-                  << causedBy(redact(routingInfoStatus.getStatus()));
+                  << causedBy(redact(scopedCMStatus.getStatus()));
             return;
         }
 
-        auto& routingInfo = routingInfoStatus.getValue();
+        const auto& scopedCM = scopedCMStatus.getValue();
 
         int scheduledMigrations = 0;
 
@@ -336,7 +332,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx,
             migrateInfos.pop_front();
 
             auto chunk =
-                routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey);
+                scopedCM.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey);
             invariant(chunk);
 
             if (chunk->getShardId() != migrationInfo.from) {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index fda5fffdcfb..9354f60b8e1 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -48,6 +48,7 @@
 #include "mongo/executor/task_executor.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/chunk.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/elapsed_tracker.h"
diff --git a/src/mongo/db/s/split_vector_command.cpp b/src/mongo/db/s/split_vector_command.cpp
index b2dc34a6396..73a424e5d27 100644
--- a/src/mongo/db/s/split_vector_command.cpp
+++ b/src/mongo/db/s/split_vector_command.cpp
@@ -50,6 +50,7 @@
 #include "mongo/db/keypattern.h"
 #include "mongo/db/query/internal_plans.h"
 #include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/chunk.h"
 #include "mongo/util/log.h"
 #include "mongo/util/timer.h"
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index eede0298c6d..f6ef30c691d 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -243,10 +243,12 @@ env.Library(
         'chunk.cpp',
         'chunk_manager.cpp',
         'cluster_identity_loader.cpp',
+        'config.cpp',
         'config_server_client.cpp',
         'grid.cpp',
         'shard_util.cpp',
         'sharding_egress_metadata_hook.cpp',
+        'sharding_raii.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/db/audit',
@@ -263,10 +265,8 @@ env.Library(
 env.CppUnitTest(
     target='catalog_cache_test',
     source=[
+        'chunk_manager_test.cpp',
         'chunk_manager_index_bounds_test.cpp',
-        'chunk_manager_query_test.cpp',
-        'chunk_manager_refresh_test.cpp',
-        'chunk_manager_test_fixture.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/s/catalog/sharding_catalog_test_fixture',
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index e5091f16954..ff6df77d6f5 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -132,15 +132,6 @@ bool ChunkRange::operator!=(const ChunkRange& other) const {
     return !(*this == other);
 }
 
-ChunkType::ChunkType() = default;
-
-ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId)
-    : _ns(nss.ns()),
-      _min(range.getMin()),
-      _max(range.getMax()),
-      _version(version),
-      _shard(std::move(shardId)) {}
-
 StatusWith<ChunkType> ChunkType::fromConfigBSON(const BSONObj& source) {
     ChunkType chunk;
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 6484f97b03b..06a26db34be 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -32,7 +32,6 @@
 #include <string>
 
 #include "mongo/bson/bsonobj.h"
-#include "mongo/db/namespace_string.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/shard_id.h"
 
@@ -144,9 +143,6 @@ public:
     static const BSONField<Date_t> DEPRECATED_lastmod;
     static const BSONField<OID> DEPRECATED_epoch;
-    ChunkType();
-    ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId);
-
     /**
      * Constructs a new ChunkType object from BSON that has the config server's config.chunks
      * collection format.
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index c3607d6053f..d2c8eaf5504 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -26,391 +26,64 @@
  * it in the license file.
  */
 
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
 #include "mongo/platform/basic.h"
 
 #include "mongo/s/catalog_cache.h"
 
-#include "mongo/base/status.h"
 #include "mongo/base/status_with.h"
-#include "mongo/db/query/collation/collator_factory_interface.h"
-#include "mongo/db/repl/optime_with.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/catalog/type_database.h"
-#include "mongo/s/chunk_diff.h"
-#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/config.h"
 #include "mongo/s/grid.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-#include "mongo/util/timer.h"
 
 namespace mongo {
-namespace {
 
-// How many times to try refreshing the routing info if the set of chunks loaded from the config
-// server is found to be inconsistent.
-const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
+using std::shared_ptr;
+using std::string;
 
-/**
- * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
- *
- * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
- */
-class CMConfigDiffTracker : public ConfigDiffTracker<std::shared_ptr<Chunk>> {
-public:
-    CMConfigDiffTracker(const NamespaceString& nss,
-                        RangeMap* currMap,
-                        ChunkVersion* maxVersion,
-                        MaxChunkVersionMap* maxShardVersions)
-        : ConfigDiffTracker<std::shared_ptr<Chunk>>(
-              nss.ns(), currMap, maxVersion, maxShardVersions) {}
+CatalogCache::CatalogCache() = default;
 
-    bool isTracked(const ChunkType& chunk) const final {
-        // Mongos tracks all shards
-        return true;
-    }
+CatalogCache::~CatalogCache() = default;
 
-    bool isMinKeyIndexed() const final {
-        return false;
-    }
+StatusWith<std::shared_ptr<DBConfig>> CatalogCache::getDatabase(OperationContext* opCtx,
+                                                                StringData dbName) {
+    stdx::lock_guard<stdx::mutex> guard(_mutex);
 
-    std::pair<BSONObj, std::shared_ptr<Chunk>> rangeFor(OperationContext* opCtx,
-                                                        const ChunkType& chunk) const final {
-        return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(chunk));
+    auto it = _databases.find(dbName);
+    if (it != _databases.end()) {
+        return it->second;
     }
 
-    ShardId shardFor(OperationContext* opCtx, const ShardId& shardId) const final {
-        const auto shard =
-            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
-        return shard->getId();
+    // Need to load from the store
+    auto status = Grid::get(opCtx)->catalogClient(opCtx)->getDatabase(opCtx, dbName.toString());
+    if (!status.isOK()) {
+        return status.getStatus();
     }
-};
-
-}  // namespace
-
-CatalogCache::CatalogCache() = default;
-
-CatalogCache::~CatalogCache() = default;
 
-StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
-                                                         StringData dbName) {
-    stdx::lock_guard<stdx::mutex> lg(_mutex);
+    const auto& dbOpTimePair = status.getValue();
+    auto db = std::make_shared<DBConfig>(dbOpTimePair.value, dbOpTimePair.opTime);
 
     try {
-        return {CachedDatabaseInfo(_getDatabase_inlock(opCtx, dbName))};
+        db->load(opCtx);
+        auto emplaceResult = _databases.try_emplace(dbName, std::move(db));
+        return emplaceResult.first->second;
     } catch (const DBException& ex) {
         return ex.toStatus();
     }
 }
 
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
-    OperationContext* opCtx, const NamespaceString& nss) {
-    int numRefreshAttempts = 0;
-
-    while (true) {
-        stdx::unique_lock<stdx::mutex> ul(_mutex);
-
-        std::shared_ptr<DatabaseInfoEntry> dbEntry;
-        try {
-            dbEntry = _getDatabase_inlock(opCtx, nss.db());
-        } catch (const DBException& ex) {
-            return ex.toStatus();
-        }
-
-        auto& collections = dbEntry->collections;
-
-        auto it = collections.find(nss.ns());
-        if (it == collections.end()) {
-            auto shardStatus =
-                Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId);
-            if (!shardStatus.isOK()) {
-                return {ErrorCodes::fromInt(40371),
-                        str::stream() << "The primary shard for collection " << nss.ns()
-                                      << " could not be loaded due to error "
-                                      << shardStatus.getStatus().toString()};
-            }
-
-            return {CachedCollectionRoutingInfo(
-                dbEntry->primaryShardId, nss, std::move(shardStatus.getValue()))};
-        }
-
-        auto& collEntry = it->second;
-
-        if (collEntry.needsRefresh) {
-            numRefreshAttempts++;
-
-            try {
-                auto newRoutingInfo =
-                    refreshCollectionRoutingInfo(opCtx, nss, std::move(collEntry.routingInfo));
-                if (newRoutingInfo == nullptr) {
-                    collections.erase(it);
-
-                    // Loop around so we can return an "unsharded" routing info
-                    continue;
-                }
-
-                collEntry.routingInfo = std::move(newRoutingInfo);
-                collEntry.needsRefresh = false;
-            } catch (const DBException& ex) {
-                // It is possible that the metadata is being changed concurrently, so retry the
-                // refresh with a wait
-                if (ex.getCode() == ErrorCodes::ConflictingOperationInProgress &&
-                    numRefreshAttempts < kMaxInconsistentRoutingInfoRefreshAttempts) {
-                    ul.unlock();
-
-                    log() << "Metadata refresh for " << nss.ns() << " failed and will be retried"
-                          << causedBy(redact(ex));
-
-                    // Do the sleep outside of the mutex
-                    sleepFor(Milliseconds(10) * numRefreshAttempts);
-                    continue;
-                }
-
-                return ex.toStatus();
-            }
-        }
-
-        return {CachedCollectionRoutingInfo(dbEntry->primaryShardId, collEntry.routingInfo)};
-    }
-}
-
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
-    OperationContext* opCtx, StringData ns) {
-    return getCollectionRoutingInfo(opCtx, NamespaceString(ns));
-}
-
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
-    OperationContext* opCtx, const NamespaceString& nss) {
-    invalidateShardedCollection(nss);
-
-    auto routingInfoStatus = getCollectionRoutingInfo(opCtx, nss);
-    if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().cm()) {
-        return {ErrorCodes::NamespaceNotSharded,
-                str::stream() << "Collection " << nss.ns() << " is not sharded."};
-    }
-
-    return routingInfoStatus;
-}
-
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
-    OperationContext* opCtx, StringData ns) {
-    return getShardedCollectionRoutingInfoWithRefresh(opCtx, NamespaceString(ns));
-}
-
-void CatalogCache::onStaleConfigError(CachedCollectionRoutingInfo&& ccrt) {
-    if (!ccrt._cm) {
-        // Here we received a stale config error for a collection which we previously thought was
-        // unsharded.
-        invalidateShardedCollection(ccrt._nss);
-        return;
-    }
-
-    // Here we received a stale config error for a collection which we previously though was sharded
-    stdx::lock_guard<stdx::mutex> lg(_mutex);
-
-    auto it = _databases.find(NamespaceString(ccrt._cm->getns()).db());
-    if (it == _databases.end()) {
-        // If the database does not exist, the collection must have been dropped so there is
-        // nothing to invalidate. The getCollectionRoutingInfo will handle the reload of the
-        // entire database and its collections.
-        return;
-    }
-
-    auto& collections = it->second->collections;
-
-    auto itColl = collections.find(ccrt._cm->getns());
-    if (itColl == collections.end()) {
-        // If the collection does not exist, this means it must have been dropped since the last
-        // time we retrieved a cache entry for it. Doing nothing in this case will cause the
-        // next call to getCollectionRoutingInfo to return an unsharded collection.
-        return;
-    } else if (itColl->second.routingInfo->getVersion() == ccrt._cm->getVersion()) {
-        // If the versions match, the last version of the routing information that we used is no
-        // longer valid, so trigger a refresh.
-        itColl->second.needsRefresh = true;
-    }
-}
-
-void CatalogCache::invalidateShardedCollection(const NamespaceString& nss) {
-    stdx::lock_guard<stdx::mutex> lg(_mutex);
-
-    auto it = _databases.find(nss.db());
-    if (it == _databases.end()) {
-        return;
-    }
+void CatalogCache::invalidate(StringData dbName) {
+    stdx::lock_guard<stdx::mutex> guard(_mutex);
 
-    it->second->collections[nss.ns()].needsRefresh = true;
-}
-
-void CatalogCache::invalidateShardedCollection(StringData ns) {
-    invalidateShardedCollection(NamespaceString(ns));
-}
-
-void CatalogCache::purgeDatabase(StringData dbName) {
-    stdx::lock_guard<stdx::mutex> lg(_mutex);
-
-    auto it = _databases.find(dbName);
-    if (it == _databases.end()) {
-        return;
-    }
-
-    _databases.erase(it);
-}
-
-void CatalogCache::purgeAllDatabases() {
-    stdx::lock_guard<stdx::mutex> lg(_mutex);
-    _databases.clear();
-}
-
-std::shared_ptr<ChunkManager> CatalogCache::refreshCollectionRoutingInfo(
-    OperationContext* opCtx,
-    const NamespaceString& nss,
-    std::shared_ptr<ChunkManager> existingRoutingInfo) {
-    Timer t;
-
-    const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
-
-    // Decide whether to do a full or partial load based on the state of the collection
-    auto collStatus = catalogClient->getCollection(opCtx, nss.ns());
-    if (collStatus == ErrorCodes::NamespaceNotFound) {
-        return nullptr;
-    }
-
-    const auto coll = uassertStatusOK(std::move(collStatus)).value;
-    if (coll.getDropped()) {
-        return nullptr;
-    }
-
-    ChunkVersion startingCollectionVersion;
-    ChunkMap chunkMap =
-        SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
-
-    if (!existingRoutingInfo) {
-        // If we don't have a basis chunk manager, do a full refresh
-        startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
-    } else if (existingRoutingInfo->getVersion().epoch() != coll.getEpoch()) {
-        // If the collection's epoch has changed, do a full refresh
-        startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
-    } else {
-        // Otherwise do a partial refresh
-        startingCollectionVersion = existingRoutingInfo->getVersion();
-        chunkMap = existingRoutingInfo->chunkMap();
-    }
-
-    log() << "Refreshing chunks based on version " << startingCollectionVersion;
-
-    // Diff tracker should *always* find at least one chunk if collection exists
-    const auto diffQuery =
-        CMConfigDiffTracker::createConfigDiffQuery(nss, startingCollectionVersion);
-
-    // Query the chunks which have changed
-    std::vector<ChunkType> newChunks;
-    repl::OpTime opTime;
-    uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
-        opCtx,
-        diffQuery.query,
-        diffQuery.sort,
-        boost::none,
-        &newChunks,
-        &opTime,
-        repl::ReadConcernLevel::kMajorityReadConcern));
-
-    ChunkVersion collectionVersion = startingCollectionVersion;
-
-    ShardVersionMap unusedShardVersions;
-    CMConfigDiffTracker differ(nss, &chunkMap, &collectionVersion, &unusedShardVersions);
-
-    const int diffsApplied = differ.calculateConfigDiff(opCtx, newChunks);
-
-    if (diffsApplied < 1) {
-        log() << "Refresh took " << t.millis() << " ms and failed because the collection's "
-                 "sharding metadata either changed in between or "
-                 "became corrupted";
-
-        uasserted(ErrorCodes::ConflictingOperationInProgress,
-                  "Collection sharding status changed during refresh or became corrupted");
-    }
-
-    // If at least one diff was applied, the metadata is correct, but it might not have changed so
-    // in this case there is no need to recreate the chunk manager.
-    //
-    // NOTE: In addition to the above statement, it is also important that we return the same chunk
-    // manager object, because the write commands' code relies on changes of the chunk manager's
-    // sequence number to detect batch writes not making progress because of chunks moving across
-    // shards too frequently.
-    if (collectionVersion == startingCollectionVersion) {
-        log() << "Refresh took " << t.millis() << " ms and didn't find any metadata changes";
-
-        return existingRoutingInfo;
-    }
-
-    std::unique_ptr<CollatorInterface> defaultCollator;
-    if (!coll.getDefaultCollation().isEmpty()) {
-        // The collation should have been validated upon collection creation
-        defaultCollator = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
-                                              ->makeFromBSON(coll.getDefaultCollation()));
-    }
-
-    log() << "Refresh took " << t.millis() << " ms and found version " << collectionVersion;
-
-    return stdx::make_unique<ChunkManager>(nss,
-                                           coll.getKeyPattern(),
-                                           std::move(defaultCollator),
-                                           coll.getUnique(),
-                                           std::move(chunkMap),
-                                           collectionVersion);
-}
-
-std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlock(
-    OperationContext* opCtx, StringData dbName) {
-    auto it = _databases.find(dbName);
+    ShardedDatabasesMap::iterator it = _databases.find(dbName);
     if (it != _databases.end()) {
-        return it->second;
-    }
-
-    const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
-
-    const auto dbNameCopy = dbName.toString();
-
-    // Load the database entry
-    const auto opTimeWithDb = uassertStatusOK(catalogClient->getDatabase(opCtx, dbNameCopy));
-    const auto& dbDesc = opTimeWithDb.value;
-
-    // Load the sharded collections entries
-    std::vector<CollectionType> collections;
-    repl::OpTime collLoadConfigOptime;
-    uassertStatusOK(
-        catalogClient->getCollections(opCtx, &dbNameCopy, &collections, &collLoadConfigOptime));
-
-    StringMap<CollectionRoutingInfoEntry> collectionEntries;
-    for (const auto& coll : collections) {
-        collectionEntries[coll.getNs().ns()].needsRefresh = true;
+        _databases.erase(it);
     }
-
-    return _databases[dbName] = std::shared_ptr<DatabaseInfoEntry>(new DatabaseInfoEntry{
-               dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)});
 }
 
-CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db)
-    : _db(std::move(db)) {}
-
-const ShardId& CachedDatabaseInfo::primaryId() const {
-    return _db->primaryShardId;
-}
+void CatalogCache::invalidateAll() {
+    stdx::lock_guard<stdx::mutex> guard(_mutex);
 
-bool CachedDatabaseInfo::shardingEnabled() const {
-    return _db->shardingEnabled;
+    _databases.clear();
 }
 
-CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId,
-                                                         std::shared_ptr<ChunkManager> cm)
-    : _primaryId(std::move(primaryId)), _cm(std::move(cm)) {}
-
-CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId,
-                                                         NamespaceString nss,
-                                                         std::shared_ptr<Shard> primary)
-    : _primaryId(std::move(primaryId)), _nss(std::move(nss)), _primary(std::move(primary)) {}
-
 }  // namespace mongo
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 528b2df4673..0e63f94b52a 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -28,20 +28,19 @@
 
 #pragma once
 
+#include <memory>
+
 #include "mongo/base/disallow_copying.h"
 #include "mongo/base/string_data.h"
-#include "mongo/s/chunk_manager.h"
-#include "mongo/s/chunk_version.h"
-#include "mongo/s/client/shard.h"
 #include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/notification.h"
 #include "mongo/util/string_map.h"
 
 namespace mongo {
 
-class CachedDatabaseInfo;
-class CachedCollectionRoutingInfo;
+class DBConfig;
 class OperationContext;
+template <typename T>
+class StatusWith;
 
 /**
  * This is the root of the "read-only" hierarchy of cached catalog metadata. It is read only
@@ -63,184 +62,26 @@ public:
      *
     * Returns the database cache entry if the database exists or a failed status otherwise.
     */
-    StatusWith<CachedDatabaseInfo> getDatabase(OperationContext* opCtx, StringData dbName);
-
-    /**
-     * Blocking shortcut method to get a specific sharded collection from a given database using the
-     * complete namespace. If the collection is sharded returns a ScopedChunkManager initialized
-     * with ChunkManager. If the collection is not sharded, returns a ScopedChunkManager initialized
-     * with the primary shard for the specified database. If an error occurs loading the metadata
-     * returns a failed status.
-     */
-    StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx,
-                                                                     const NamespaceString& nss);
-    StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx,
-                                                                     StringData ns);
-
-    /**
-     * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed
-     * and returns a NamespaceNotSharded error if the collection is not sharded.
-     */
-    StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
-        OperationContext* opCtx, const NamespaceString& nss);
-    StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
-        OperationContext* opCtx, StringData ns);
-
-    /**
-     * Non-blocking method to be called whenever using the specified routing table has encountered a
-     * stale config exception. Returns immediately and causes the routing table to be refreshed the
-     * next time getCollectionRoutingInfo is called. Does nothing if the routing table has been
-     * refreshed already.
-     */
-    void onStaleConfigError(CachedCollectionRoutingInfo&&);
-
-    /**
-     * Non-blocking method, which indiscriminately causes the routing table for the specified
-     * namespace to be refreshed the next time getCollectionRoutingInfo is called.
-     */
-    void invalidateShardedCollection(const NamespaceString& nss);
-    void invalidateShardedCollection(StringData ns);
-
-    /**
-     * Blocking method, which removes the entire specified database (including its collections) from
-     * the cache.
-     */
-    void purgeDatabase(StringData dbName);
+    StatusWith<std::shared_ptr<DBConfig>> getDatabase(OperationContext* opCtx, StringData dbName);
 
     /**
-     * Blocking method, which removes all databases (including their collections) from the cache.
+     * Removes the database information for the specified name from the cache, so that the
+     * next time getDatabase is called, it will be reloaded.
     */
-    void purgeAllDatabases();
+    void invalidate(StringData dbName);
 
     /**
-     * Blocking method, which refreshes the routing information for the specified collection. If
-     * 'existingRoutingInfo' has been specified uses this as a basis to perform an 'incremental'
-     * refresh, which only fetches the chunks which changed. Otherwise does a full refresh, fetching
-     * all the chunks for the collection.
-     *
-     * Returns the refreshed routing information if the collection is still sharded or nullptr if it
-     * is not. If refresh fails for any reason, throws a DBException.
-     *
-     * With the exception of ConflictingOperationInProgress, error codes thrown from this method are
-     * final in that there is nothing that can be done to remedy them other than pass the error to
-     * the user.
-     *
-     * ConflictingOperationInProgress indicates that the chunk metadata was found to be
-     * inconsistent. Since this may be transient, due to the collection being dropped or recreated,
-     * the caller must retry the reload up to some configurable number of attempts.
-     *
-     * NOTE: Should never be called directly and is exposed as public for testing purposes only.
+     * Purges all cached database information, which will cause the data to be reloaded again.
     */
-    static std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
-        OperationContext* opCtx,
-        const NamespaceString& nss,
-        std::shared_ptr<ChunkManager> existingRoutingInfo);
+    void invalidateAll();
 
 private:
-    // Make the cache entries friends so they can access the private classes below
-    friend class CachedDatabaseInfo;
-    friend class CachedCollectionRoutingInfo;
-
-    /**
-     * Cache entry describing a collection.
-     */
-    struct CollectionRoutingInfoEntry {
-        std::shared_ptr<ChunkManager> routingInfo;
-
-        bool needsRefresh{true};
-    };
-
-    /**
-     * Cache entry describing a database.
-     */
-    struct DatabaseInfoEntry {
-        ShardId primaryShardId;
-
-        bool shardingEnabled;
-
-        StringMap<CollectionRoutingInfoEntry> collections;
-    };
-
-    using DatabaseInfoMap = StringMap<std::shared_ptr<DatabaseInfoEntry>>;
-
-    /**
-     * Ensures that the specified database is in the cache, loading it if necessary. If the database
-     * was not in cache, all the sharded collections will be in the 'needsRefresh' state.
-     */
-    std::shared_ptr<DatabaseInfoEntry> _getDatabase_inlock(OperationContext* opCtx,
-                                                           StringData dbName);
+    using ShardedDatabasesMap = StringMap<std::shared_ptr<DBConfig>>;
 
     // Mutex to serialize access to the structures below
     stdx::mutex _mutex;
 
-    // Map from DB name to the info for that database
-    DatabaseInfoMap _databases;
-};
-
-/**
- * Constructed exclusively by the CatalogCache, contains a reference to the cached information for
- * the specified database.
- */
-class CachedDatabaseInfo {
-public:
-    const ShardId& primaryId() const;
-
-    bool shardingEnabled() const;
-
-private:
-    friend class CatalogCache;
-
-    CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db);
-
-    std::shared_ptr<CatalogCache::DatabaseInfoEntry> _db;
-};
-
-/**
- * Constructed exclusively by the CatalogCache contains a reference to the routing information for
- * the specified collection.
- */
-class CachedCollectionRoutingInfo {
-public:
-    /**
-     * Returns the ID of the primary shard for the database owining this collection, regardless of
-     * whether it is sharded or not.
-     */
-    const ShardId& primaryId() const {
-        return _primaryId;
-    }
-
-    /**
-     * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr.
-     */
-    std::shared_ptr<ChunkManager> cm() const {
-        return _cm;
-    }
-
-    /**
-     * If the collection is not sharded, returns its primary shard. Otherwise, nullptr.
-     */
-    std::shared_ptr<Shard> primary() const {
-        return _primary;
-    }
-
-private:
-    friend class CatalogCache;
-
-    CachedCollectionRoutingInfo(ShardId primaryId, std::shared_ptr<ChunkManager> cm);
-
-    CachedCollectionRoutingInfo(ShardId primaryId,
-                                NamespaceString nss,
-                                std::shared_ptr<Shard> primary);
-
-    // The id of the primary shard containing the database
-    ShardId _primaryId;
-
-    // Reference to the corresponding chunk manager (if sharded) or null
-    std::shared_ptr<ChunkManager> _cm;
-
-    // Reference to the primary of the database (if not sharded) or null
-    NamespaceString _nss;
-    std::shared_ptr<Shard> _primary;
+    ShardedDatabasesMap _databases;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/chunk_diff.cpp b/src/mongo/s/chunk_diff.cpp
index 07d569a503d..f21555043ad 100644
--- a/src/mongo/s/chunk_diff.cpp
+++ b/src/mongo/s/chunk_diff.cpp
@@ -105,7 +105,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
     // Store epoch now so it doesn't change when we change max
     OID currEpoch = _maxVersion->epoch();
 
-    int validDiffs = 0;
+    _validDiffs = 0;
 
     for (const ChunkType& chunk : chunks) {
         const ChunkVersion& chunkVersion = chunk.getVersion();
@@ -121,7 +121,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
             return -1;
         }
 
-        validDiffs++;
+        _validDiffs++;
 
         // Get max changed version and chunk version
         if (chunkVersion > *_maxVersion) {
@@ -151,7 +151,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
         }
     }
 
-    LOG(3) << "found " << validDiffs << " new chunks for collection " << _ns << " (tracking "
+    LOG(3) << "found " << _validDiffs << " new chunks for collection " << _ns << " (tracking "
            << newTracked.size() << "), new version is " << *_maxVersion;
 
     for (const ChunkType& chunk : newTracked) {
@@ -167,7 +167,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
         _currMap->insert(rangeFor(opCtx, chunk));
     }
 
-    return validDiffs;
+    return _validDiffs;
 }
 
 ConfigDiffTrackerBase::QueryAndSort ConfigDiffTrackerBase::createConfigDiffQuery(
diff --git a/src/mongo/s/chunk_diff.h b/src/mongo/s/chunk_diff.h
index 5910cc93eed..0cea9fa678a 100644
--- a/src/mongo/s/chunk_diff.h
+++ b/src/mongo/s/chunk_diff.h
@@ -93,9 +93,13 @@ public:
                       RangeMap* currMap,
                       ChunkVersion* maxVersion,
                       MaxChunkVersionMap* maxShardVersions);
-    virtual ~ConfigDiffTracker();
+    // Call after load for more information
+    int numValidDiffs() const {
+        return _validDiffs;
+    }
+
     // Applies changes to the config data from a vector of chunks passed in. Also includes minor
     // version changes for particular major-version chunks if explicitly specified.
     // Returns the number of diffs processed, or -1 if the diffs were inconsistent.
@@ -131,6 +135,9 @@ private:
     RangeMap* const _currMap;
     ChunkVersion* const _maxVersion;
     MaxChunkVersionMap* const _maxShardVersions;
+
+    // Store for later use
+    int _validDiffs{0};
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 047bfa1d696..20cfd7e098f 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -32,50 +32,296 @@
 
 #include "mongo/s/chunk_manager.h"
 
+#include <boost/next_prior.hpp>
 #include <vector>
 
 #include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/client/read_preference.h"
 #include "mongo/db/matcher/extensions_callback_noop.h"
 #include "mongo/db/query/collation/collation_index_key.h"
 #include "mongo/db/query/index_bounds_builder.h"
 #include "mongo/db/query/query_planner.h"
 #include "mongo/db/query/query_planner_common.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/chunk_diff.h"
+#include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/log.h"
+#include "mongo/util/timer.h"
 
 namespace mongo {
+
+using std::map;
+using std::pair;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+
 namespace {
 
 // Used to generate sequence numbers to assign to each newly created ChunkManager
 AtomicUInt32 nextCMSequenceNumber(0);
 
-void checkAllElementsAreOfType(BSONType type, const BSONObj& o) {
-    for (const auto&& element : o) {
-        uassert(ErrorCodes::ConflictingOperationInProgress,
-                str::stream() << "Not all elements of " << o << " are of type " << typeName(type),
-                element.type() == type);
+/**
+ * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
+ *
+ * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
+ */
+class CMConfigDiffTracker : public ConfigDiffTracker<shared_ptr<Chunk>> {
+public:
+    CMConfigDiffTracker(const std::string& ns,
+                        RangeMap* currMap,
+                        ChunkVersion* maxVersion,
+                        MaxChunkVersionMap* maxShardVersions,
+                        ChunkManager* manager)
+        : ConfigDiffTracker<shared_ptr<Chunk>>(ns, currMap, maxVersion, maxShardVersions),
+          _manager(manager) {}
+
+    bool isTracked(const ChunkType& chunk) const final {
+        // Mongos tracks all shards
+        return true;
+    }
+
+    bool isMinKeyIndexed() const final {
+        return false;
+    }
+
+    pair<BSONObj, shared_ptr<Chunk>> rangeFor(OperationContext* opCtx,
+                                              const ChunkType& chunk) const final {
+        return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(chunk));
+    }
+
+    ShardId shardFor(OperationContext* opCtx, const ShardId& shardId) const final {
+        const auto shard =
+            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+        return shard->getId();
+    }
+
+private:
+    ChunkManager* const _manager;
+};
+
+bool allOfType(BSONType type, const BSONObj& o) {
+    BSONObjIterator it(o);
+    while (it.more()) {
+        if (it.next().type() != type) {
+            return false;
+        }
+    }
+    return true;
+}
+
+bool isChunkMapValid(const ChunkMap& chunkMap) {
+#define ENSURE(x)                                          \
+    do {                                                   \
+        if (!(x)) {                                        \
+            log() << "ChunkManager::_isValid failed: " #x; \
+            return false;                                  \
+        }                                                  \
+    } while (0)
+
+    if (chunkMap.empty()) {
+        return true;
+    }
+
+    // Check endpoints
+    ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin()));
+    ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax()));
+
+    // Make sure there are no gaps or overlaps
+    for (ChunkMap::const_iterator it = boost::next(chunkMap.begin()), end = chunkMap.end();
+         it != end;
+         ++it) {
+        ChunkMap::const_iterator last = boost::prior(it);
+
+        if (SimpleBSONObjComparator::kInstance.evaluate(it->second->getMin() !=
+                                                        last->second->getMax())) {
+            log() << last->second->toString();
+            log() << it->second->toString();
+            log() << it->second->getMin();
+            log() << last->second->getMax();
+        }
+
+        ENSURE(SimpleBSONObjComparator::kInstance.evaluate(it->second->getMin() ==
+                                                           last->second->getMax()));
    }
+
+    return true;
+
+#undef ENSURE
 }
 
 }  // namespace
 
 ChunkManager::ChunkManager(NamespaceString nss,
-                           KeyPattern shardKeyPattern,
+                           const OID& epoch,
+                           const ShardKeyPattern& shardKeyPattern,
                            std::unique_ptr<CollatorInterface> defaultCollator,
-                           bool unique,
-                           ChunkMap chunkMap,
-                           ChunkVersion collectionVersion)
+                           bool unique)
     : _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
       _nss(std::move(nss)),
-      _shardKeyPattern(shardKeyPattern),
+      _keyPattern(shardKeyPattern.getKeyPattern()),
      _defaultCollator(std::move(defaultCollator)),
      _unique(unique),
-      _chunkMap(std::move(chunkMap)),
-      _chunkMapViews(_constructChunkMapViews(collectionVersion.epoch(), _chunkMap)),
-      _collectionVersion(collectionVersion) {}
+      _chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()),
+      _chunkRangeMap(
+          SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()),
+      _version(0, 0, epoch) {}
 
 ChunkManager::~ChunkManager() = default;
 
+void ChunkManager::loadExistingRanges(OperationContext* opCtx, const ChunkManager* oldManager) {
+    invariant(!_version.isSet());
+
+    int tries = 3;
+
+    while (tries--) {
+        ChunkMap chunkMap =
+            SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
+        set<ShardId> shardIds;
+        ShardVersionMap shardVersions;
+
+        Timer t;
+
+        log() << "ChunkManager loading chunks for " << _nss
+              << " sequenceNumber: " << _sequenceNumber
+              << " based on: " << (oldManager ? oldManager->getVersion().toString() : "(empty)");
+
+        if (_load(opCtx, chunkMap, shardIds, &shardVersions, oldManager)) {
+            // TODO: Merge into diff code above, so we validate in one place
+            if (isChunkMapValid(chunkMap)) {
+                _chunkMap = std::move(chunkMap);
+                _shardVersions = std::move(shardVersions);
+                _chunkRangeMap = _constructRanges(_chunkMap);
+
+                log() << "ChunkManager load took " << t.millis() << " ms and found version "
+                      << _version;
+
+                return;
+            }
+        }
+
+        warning() << "ChunkManager load failed after " << t.millis()
+                  << " ms and will be retried up to " << tries << " more times";
+
+        sleepmillis(10 * (3 - tries));
+    }
+
+    // This will abort construction so we should never have a reference to an invalid config
+    msgasserted(13282,
+                str::stream() << "Couldn't load a valid config for " << _nss.ns()
+                              << " after 3 attempts. Please try again.");
+}
+
+bool ChunkManager::_load(OperationContext* opCtx,
+                         ChunkMap& chunkMap,
+                         set<ShardId>& shardIds,
+                         ShardVersionMap* shardVersions,
+                         const ChunkManager* oldManager) {
+    // Reset the max version, but not the epoch, when we aren't loading from the oldManager
+    _version = ChunkVersion(0, 0, _version.epoch());
+
+    // If we have a previous version of the ChunkManager to work from, use that info to reduce
+    // our config query
+    if (oldManager && oldManager->getVersion().isSet()) {
+        // Get the old max version
+        _version = oldManager->getVersion();
+
+        // Load a copy of the old versions
+        *shardVersions = oldManager->_shardVersions;
+
+        // Load a copy of the chunk map, replacing the chunk manager with our own
+        const ChunkMap& oldChunkMap = oldManager->getChunkMap();
+
+        for (const auto& oldChunkMapEntry : oldChunkMap) {
+            const auto& oldC = oldChunkMapEntry.second;
+            chunkMap.emplace(oldC->getMax(), std::make_shared<Chunk>(*oldC));
+        }
+
+        LOG(2) << "loading chunk manager for collection " << _nss
+               << " using old chunk manager w/ version " << _version.toString() << " and "
+               << oldChunkMap.size() << " chunks";
+    }
+
+    // Get the diff query required
+    const auto diffQuery = CMConfigDiffTracker::createConfigDiffQuery(_nss, _version);
+
+    // Attach a diff tracker for the versioned chunk data
+    CMConfigDiffTracker differ(_nss.ns(), &chunkMap, &_version, shardVersions, this);
+
+    // Diff tracker should *always* find at least one chunk if collection exists
+    repl::OpTime opTime;
+    std::vector<ChunkType> chunks;
+    uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
+        opCtx,
+        diffQuery.query,
+        diffQuery.sort,
+        boost::none,
+        &chunks,
+        &opTime,
+        repl::ReadConcernLevel::kMajorityReadConcern));
+
+    invariant(opTime >= _configOpTime);
+    _configOpTime = opTime;
+
+    int diffsApplied = differ.calculateConfigDiff(opCtx, chunks);
+    if (diffsApplied > 0) {
+        LOG(2) << "loaded " << diffsApplied << " chunks into new chunk manager for " << _nss
+               << " with version " << _version;
+
+        // Add all existing shards we find to the shards set
+        for (ShardVersionMap::iterator it = shardVersions->begin(); it != shardVersions->end();) {
+            auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, it->first);
+            if (shardStatus.isOK()) {
+                shardIds.insert(it->first);
+                ++it;
+            } else {
+                invariant(shardStatus == ErrorCodes::ShardNotFound);
+                shardVersions->erase(it++);
+            }
+        }
+
+        _configOpTime = opTime;
+
+        return true;
+    } else if (diffsApplied == 0) {
+        // No chunks were found for the ns
+        warning() << "no chunks found when reloading " << _nss << ", previous version was "
+                  << _version;
+
to empty + chunkMap.clear(); + shardVersions->clear(); + + _version = ChunkVersion(0, 0, OID()); + _configOpTime = opTime; + + return true; + } else { // diffsApplied < 0 + + bool allInconsistent = (differ.numValidDiffs() == 0); + if (allInconsistent) { + // All versions are different, this can be normal + warning() << "major change in chunk information found when reloading " << _nss + << ", previous version was " << _version; + } else { + // Inconsistent load halfway through (due to yielding cursor during load) + // should be rare + warning() << "inconsistent chunks found when reloading " << _nss + << ", previous version was " << _version << ", this should be rare"; + } + + // Set all our data to empty to be extra safe + chunkMap.clear(); + shardVersions->clear(); + + _version = ChunkVersion(0, 0, OID()); + + return allInconsistent; + } +} + std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj& collation) const { const bool hasSimpleCollation = (collation.isEmpty() && !_defaultCollator) || @@ -105,7 +351,7 @@ std::shared_ptr<Chunk> ChunkManager::findIntersectingChunkWithSimpleCollation( void ChunkManager::getShardIdsForQuery(OperationContext* opCtx, const BSONObj& query, const BSONObj& collation, - std::set<ShardId>* shardIds) const { + set<ShardId>* shardIds) const { auto qr = stdx::make_unique<QueryRequest>(_nss); qr->setFilter(query); @@ -124,7 +370,7 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx, } // Fast path for targeting equalities on the shard key. - auto shardKeyToFind = _shardKeyPattern.extractShardKeyFromQuery(*cq); + auto shardKeyToFind = _keyPattern.extractShardKeyFromQuery(*cq); if (!shardKeyToFind.isEmpty()) { try { auto chunk = findIntersectingChunk(shardKeyToFind, collation); @@ -141,20 +387,20 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx, // Query { a : { $gte : 1, $lt : 2 }, // b : { $gte : 3, $lt : 4 } } // => Bounds { a : [1, 2), b : [3, 4) } - IndexBounds bounds = getIndexBoundsForQuery(_shardKeyPattern.toBSON(), *cq); + IndexBounds bounds = getIndexBoundsForQuery(_keyPattern.toBSON(), *cq); // Transforms bounds for each shard key field into full shard key ranges // for example : // Key { a : 1, b : 1 } // Bounds { a : [1, 2), b : [3, 4) } // => Ranges { a : 1, b : 3 } => { a : 2, b : 4 } - BoundList ranges = _shardKeyPattern.flattenBounds(bounds); + BoundList ranges = _keyPattern.flattenBounds(bounds); for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) { getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds); // once we know we need to visit all shards no need to keep looping - if (shardIds->size() == _chunkMapViews.shardVersions.size()) { + if (shardIds->size() == _shardVersions.size()) { break; } } @@ -163,38 +409,38 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx, // For now, we satisfy that assumption by adding a shard with no matches rather than returning // an empty set of shards. 
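// Illustrative sketch, not part of this commit: a self-contained model of the
// range-map targeting idea that getShardIdsForRange() below relies on. Ranges
// are indexed by their exclusive max key, so upper_bound(min) yields the first
// range that can contain 'min', and advancing 'end' once includes the range
// holding 'max'. Toy int keys and std::string ids stand in for BSONObj shard
// keys and ShardId; all names here are invented for the example.
#include <iostream>
#include <map>
#include <set>
#include <string>

int main() {
    // Range max -> owning shard; each range is [previous max, max).
    const std::map<int, std::string> rangeByMax{
        {50, "shard0"}, {100, "shard1"}, {150, "shard0"}};

    const int min = 10;
    const int max = 60;

    std::set<std::string> shardIds;
    auto it = rangeByMax.upper_bound(min);  // first range whose max exceeds 'min'
    auto end = rangeByMax.upper_bound(max);
    if (end != rangeByMax.cend()) {
        ++end;  // include the range that holds 'max'
    }
    for (; it != end; ++it) {
        shardIds.insert(it->second);
    }

    for (const auto& shard : shardIds) {
        std::cout << shard << '\n';  // prints shard0, then shard1
    }
    return 0;
}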
if (shardIds->empty()) { - shardIds->insert(_chunkMapViews.chunkRangeMap.begin()->second.shardId); + shardIds->insert(_chunkRangeMap.begin()->second.getShardId()); } } void ChunkManager::getShardIdsForRange(const BSONObj& min, const BSONObj& max, std::set<ShardId>* shardIds) const { - auto it = _chunkMapViews.chunkRangeMap.upper_bound(min); - auto end = _chunkMapViews.chunkRangeMap.upper_bound(max); + auto it = _chunkRangeMap.upper_bound(min); + auto end = _chunkRangeMap.upper_bound(max); // The chunk range map must always cover the entire key space - invariant(it != _chunkMapViews.chunkRangeMap.end()); + invariant(it != _chunkRangeMap.end()); // We need to include the last chunk - if (end != _chunkMapViews.chunkRangeMap.cend()) { + if (end != _chunkRangeMap.cend()) { ++end; } for (; it != end; ++it) { - shardIds->insert(it->second.shardId); + shardIds->insert(it->second.getShardId()); // No need to iterate through the rest of the ranges, because we already know we need to use // all shards. - if (shardIds->size() == _chunkMapViews.shardVersions.size()) { + if (shardIds->size() == _shardVersions.size()) { break; } } } -void ChunkManager::getAllShardIds(std::set<ShardId>* all) const { - std::transform(_chunkMapViews.shardVersions.begin(), - _chunkMapViews.shardVersions.end(), +void ChunkManager::getAllShardIds(set<ShardId>* all) const { + std::transform(_shardVersions.begin(), + _shardVersions.end(), std::inserter(*all, all->begin()), [](const ShardVersionMap::value_type& pair) { return pair.first; }); } @@ -211,7 +457,7 @@ IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key, } // Consider shard key as an index - std::string accessMethod = IndexNames::findPluginName(key); + string accessMethod = IndexNames::findPluginName(key); dassert(accessMethod == IndexNames::BTREE || accessMethod == IndexNames::HASHED); // Use query framework to generate index bounds @@ -318,19 +564,19 @@ bool ChunkManager::compatibleWith(const ChunkManager& other, const ShardId& shar } ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const { - auto it = _chunkMapViews.shardVersions.find(shardName); - if (it == _chunkMapViews.shardVersions.end()) { + auto it = _shardVersions.find(shardName); + if (it == _shardVersions.end()) { // Shards without explicitly tracked shard versions (meaning they have no chunks) always // have a version of (0, 0, epoch) - return ChunkVersion(0, 0, _collectionVersion.epoch()); + return ChunkVersion(0, 0, _version.epoch()); } return it->second; } -std::string ChunkManager::toString() const { +string ChunkManager::toString() const { StringBuilder sb; - sb << "ChunkManager: " << _nss.ns() << " key:" << _shardKeyPattern.toString() << '\n'; + sb << "ChunkManager: " << _nss.ns() << " key:" << _keyPattern.toString() << '\n'; for (const auto& entry : _chunkMap) { sb << "\t" << entry.second->toString() << '\n'; @@ -339,82 +585,47 @@ std::string ChunkManager::toString() const { return sb.str(); } -ChunkManager::ChunkMapViews ChunkManager::_constructChunkMapViews(const OID& epoch, - const ChunkMap& chunkMap) { - invariant(!chunkMap.empty()); - +ChunkManager::ChunkRangeMap ChunkManager::_constructRanges(const ChunkMap& chunkMap) { ChunkRangeMap chunkRangeMap = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>(); - ShardVersionMap shardVersions; + if (chunkMap.empty()) { + return chunkRangeMap; + } ChunkMap::const_iterator current = chunkMap.cbegin(); while (current != chunkMap.cend()) { - const auto& firstChunkInRange = current->second; - - // 
Tracks the max shard version for the shard on which the current range will reside - auto shardVersionIt = shardVersions.find(firstChunkInRange->getShardId()); - if (shardVersionIt == shardVersions.end()) { - shardVersionIt = - shardVersions.emplace(firstChunkInRange->getShardId(), ChunkVersion(0, 0, epoch)) - .first; - } - - auto& maxShardVersion = shardVersionIt->second; - + const auto rangeFirst = current; current = std::find_if( - current, - chunkMap.cend(), - [&firstChunkInRange, &maxShardVersion](const ChunkMap::value_type& chunkMapEntry) { - const auto& currentChunk = chunkMapEntry.second; - - if (currentChunk->getShardId() != firstChunkInRange->getShardId()) - return true; - - if (currentChunk->getLastmod() > maxShardVersion) - maxShardVersion = currentChunk->getLastmod(); - - return false; + current, chunkMap.cend(), [&rangeFirst](const ChunkMap::value_type& chunkMapEntry) { + return chunkMapEntry.second->getShardId() != rangeFirst->second->getShardId(); }); - const auto rangeLast = std::prev(current); - const BSONObj rangeMin = firstChunkInRange->getMin(); + const BSONObj rangeMin = rangeFirst->second->getMin(); const BSONObj rangeMax = rangeLast->second->getMax(); - const auto insertResult = chunkRangeMap.insert(std::make_pair( - rangeMax, ShardAndChunkRange{{rangeMin, rangeMax}, firstChunkInRange->getShardId()})); - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Metadata contains two chunks with the same max value " - << rangeMax, - insertResult.second); - - const auto& insertIterator = insertResult.first; - - if (insertIterator != chunkRangeMap.begin()) { + auto insertResult = chunkRangeMap.insert(std::make_pair( + rangeMax, ShardAndChunkRange(rangeMin, rangeMax, rangeFirst->second->getShardId()))); + invariant(insertResult.second); + if (insertResult.first != chunkRangeMap.begin()) { // Make sure there are no gaps in the ranges - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Gap or an overlap between ranges " - << insertIterator->second.range.toString() - << " and " - << std::prev(insertIterator)->second.range.toString(), - SimpleBSONObjComparator::kInstance.evaluate(std::prev(insertIterator)->first == - rangeMin)); + insertResult.first--; + invariant( + SimpleBSONObjComparator::kInstance.evaluate(insertResult.first->first == rangeMin)); } - - // If a shard has chunks it must have a shard version, otherwise we have an invalid chunk - // somewhere, which should have been caught at chunk load time - invariant(maxShardVersion.isSet()); } invariant(!chunkRangeMap.empty()); - invariant(!shardVersions.empty()); + invariant(allOfType(MinKey, chunkRangeMap.begin()->second.getMin())); + invariant(allOfType(MaxKey, chunkRangeMap.rbegin()->first)); - checkAllElementsAreOfType(MinKey, chunkRangeMap.begin()->second.min()); - checkAllElementsAreOfType(MaxKey, chunkRangeMap.rbegin()->first); + return chunkRangeMap; +} - return {std::move(chunkRangeMap), std::move(shardVersions)}; +repl::OpTime ChunkManager::getConfigOpTime() const { + return _configOpTime; } } // namespace mongo diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index f1edeefc668..365d4d5df62 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -35,6 +35,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/collation/collator_interface.h" +#include "mongo/db/repl/optime.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" 
#include "mongo/s/client/shard.h" @@ -58,11 +60,10 @@ class ChunkManager { public: ChunkManager(NamespaceString nss, - KeyPattern shardKeyPattern, + const OID& epoch, + const ShardKeyPattern& shardKeyPattern, std::unique_ptr<CollatorInterface> defaultCollator, - bool unique, - ChunkMap chunkMap, - ChunkVersion collectionVersion); + bool unique); ~ChunkManager(); @@ -78,7 +79,7 @@ public: } const ShardKeyPattern& getShardKeyPattern() const { - return _shardKeyPattern; + return _keyPattern; } const CollatorInterface* getDefaultCollator() const { @@ -90,12 +91,10 @@ public: } ChunkVersion getVersion() const { - return _collectionVersion; + return _version; } - ChunkVersion getVersion(const ShardId& shardId) const; - - const ChunkMap& chunkMap() const { + const ChunkMap& getChunkMap() const { return _chunkMap; } @@ -103,9 +102,12 @@ public: return _chunkMap.size(); } - const ShardVersionMap& shardVersions() const { - return _chunkMapViews.shardVersions; - } + // Loads existing ranges based on info in chunk manager + void loadExistingRanges(OperationContext* opCtx, const ChunkManager* oldManager); + + // + // Methods to use once loaded / created + // /** * Given a shard key (or a prefix) that has been extracted from a document, returns the chunk @@ -175,46 +177,57 @@ public: std::string toString() const; -private: - friend class CollectionRoutingDataLoader; + ChunkVersion getVersion(const ShardId& shardName) const; + + /** + * Returns the opTime of config server the last time chunks were loaded. + */ + repl::OpTime getConfigOpTime() const; +private: /** * Represents a range of chunk keys [getMin(), getMax()) and the id of the shard on which they * reside according to the metadata. */ - struct ShardAndChunkRange { - const BSONObj& min() const { - return range.getMin(); + class ShardAndChunkRange { + public: + ShardAndChunkRange(const BSONObj& min, const BSONObj& max, ShardId inShardId) + : _range(min, max), _shardId(std::move(inShardId)) {} + + const BSONObj& getMin() const { + return _range.getMin(); + } + + const BSONObj& getMax() const { + return _range.getMax(); } - const BSONObj& max() const { - return range.getMax(); + const ShardId& getShardId() const { + return _shardId; } - ChunkRange range; - ShardId shardId; + private: + ChunkRange _range; + ShardId _shardId; }; using ChunkRangeMap = BSONObjIndexedMap<ShardAndChunkRange>; /** - * Contains different transformations of the chunk map for efficient querying + * If load was successful, returns true and it is guaranteed that the _chunkMap and + * _chunkRangeMap are consistent with each other. If false is returned, it is not safe to use + * the chunk manager anymore. */ - struct ChunkMapViews { - // Transformation of the chunk map containing what range of keys reside on which shard. The - // index is the max key of the respective range and the union of all ranges in a such - // constructed map must cover the complete space from [MinKey, MaxKey). - const ChunkRangeMap chunkRangeMap; - - // Map from shard id to the maximum chunk version for that shard. If a shard contains no - // chunks, it won't be present in this map. - const ShardVersionMap shardVersions; - }; + bool _load(OperationContext* opCtx, + ChunkMap& chunks, + std::set<ShardId>& shardIds, + ShardVersionMap* shardVersions, + const ChunkManager* oldManager); /** - * Does a single pass over the chunkMap and constructs the ChunkMapViews object. + * Merges consecutive chunks, which reside on the same shard into a single range. 
*/ - static ChunkMapViews _constructChunkMapViews(const OID& epoch, const ChunkMap& chunkMap); + static ChunkRangeMap _constructRanges(const ChunkMap& chunkMap); // The shard versioning mechanism hinges on keeping track of the number of times we reload // ChunkManagers. @@ -224,7 +237,7 @@ private: const NamespaceString _nss; // The key pattern used to shard the collection - const ShardKeyPattern _shardKeyPattern; + const ShardKeyPattern _keyPattern; // Default collation to use for routing data queries for this collection const std::unique_ptr<CollatorInterface> _defaultCollator; @@ -234,15 +247,23 @@ private: // Map from the max for each chunk to an entry describing the chunk. The union of all chunks' // ranges must cover the complete space from [MinKey, MaxKey). - const ChunkMap _chunkMap; + ChunkMap _chunkMap; + + // Transformation of the chunk map containing what range of keys reside on which shard. The + // index is the max key of the respective range and the union of all ranges in a such + // constructed map must cover the complete space from [MinKey, MaxKey). + ChunkRangeMap _chunkRangeMap; - // Different transformations of the chunk map for efficient querying - const ChunkMapViews _chunkMapViews; + // Max known version per shard + ShardVersionMap _shardVersions; // Max version across all chunks - const ChunkVersion _collectionVersion; + ChunkVersion _version; - // Auto-split throttling state (state mutable by write commands) + // OpTime of config server the last time chunks were loaded. + repl::OpTime _configOpTime; + + // Auto-split throttling state struct AutoSplitThrottle { public: AutoSplitThrottle() : _splitTickets(maxParallelSplits) {} @@ -259,6 +280,8 @@ private: ChunkManager*, Chunk*, long); + + friend class TestableChunkManager; }; } // namespace mongo diff --git a/src/mongo/s/chunk_manager_refresh_test.cpp b/src/mongo/s/chunk_manager_refresh_test.cpp deleted file mode 100644 index 504893acf3c..00000000000 --- a/src/mongo/s/chunk_manager_refresh_test.cpp +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include <set> - -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager_test_fixture.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace { - -using ChunkManagerLoadTest = ChunkManagerTestFixture; - -TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterSplit) { - const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); - - auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {})); - ASSERT_EQ(1, initialRoutingInfo->numChunks()); - - auto future = launchAsync([&] { - auto client = serviceContext()->makeClient("Test"); - auto opCtx = client->makeOperationContext(); - return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo); - }); - - ChunkVersion version = initialRoutingInfo->getVersion(); - - expectFindOnConfigSendBSONObjVector([&]() { - CollectionType collType; - collType.setNs(kNss); - collType.setEpoch(version.epoch()); - collType.setKeyPattern(shardKeyPattern.toBSON()); - collType.setUnique(false); - - return std::vector<BSONObj>{collType.toBSON()}; - }()); - - // Return set of chunks, which represent a split - expectFindOnConfigSendBSONObjVector([&]() { - version.incMajor(); - ChunkType chunk1( - kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"0"}); - - version.incMinor(); - ChunkType chunk2( - kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); - - return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; - }()); - - auto newRoutingInfo(future.timed_get(kFutureTimeout)); - ASSERT_EQ(2, newRoutingInfo->numChunks()); - ASSERT_EQ(version, newRoutingInfo->getVersion()); - ASSERT_EQ(version, newRoutingInfo->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), newRoutingInfo->getVersion({"1"})); -} - -TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterMove) { - const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); - - auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {BSON("_id" << 0)})); - ASSERT_EQ(2, initialRoutingInfo->numChunks()); - - auto future = launchAsync([&] { - auto client = serviceContext()->makeClient("Test"); - auto opCtx = client->makeOperationContext(); - return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo); - }); - - ChunkVersion version = initialRoutingInfo->getVersion(); - - expectFindOnConfigSendBSONObjVector([&]() { - CollectionType collType; - collType.setNs(kNss); - collType.setEpoch(version.epoch()); - collType.setKeyPattern(shardKeyPattern.toBSON()); - collType.setUnique(false); - - return std::vector<BSONObj>{collType.toBSON()}; - }()); - - ChunkVersion expectedDestShardVersion; - - // Return set of chunks, which represent a move - expectFindOnConfigSendBSONObjVector([&]() { - version.incMajor(); - expectedDestShardVersion = version; - ChunkType chunk1( - kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"}); - - version.incMinor(); - ChunkType chunk2( - kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); - - return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; - }()); - - auto newRoutingInfo(future.timed_get(kFutureTimeout)); - ASSERT_EQ(2, newRoutingInfo->numChunks()); - 
ASSERT_EQ(version, newRoutingInfo->getVersion()); - ASSERT_EQ(version, newRoutingInfo->getVersion({"0"})); - ASSERT_EQ(expectedDestShardVersion, newRoutingInfo->getVersion({"1"})); -} - -TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterMoveLastChunk) { - const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); - - auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {})); - ASSERT_EQ(1, initialRoutingInfo->numChunks()); - - auto future = launchAsync([&] { - auto client = serviceContext()->makeClient("Test"); - auto opCtx = client->makeOperationContext(); - return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo); - }); - - ChunkVersion version = initialRoutingInfo->getVersion(); - - expectFindOnConfigSendBSONObjVector([&]() { - CollectionType collType; - collType.setNs(kNss); - collType.setEpoch(version.epoch()); - collType.setKeyPattern(shardKeyPattern.toBSON()); - collType.setUnique(false); - - return std::vector<BSONObj>{collType.toBSON()}; - }()); - - // Return set of chunks, which represent a move - expectFindOnConfigSendBSONObjVector([&]() { - version.incMajor(); - ChunkType chunk1(kNss, - {shardKeyPattern.getKeyPattern().globalMin(), - shardKeyPattern.getKeyPattern().globalMax()}, - version, - {"1"}); - - return std::vector<BSONObj>{chunk1.toConfigBSON()}; - }()); - - expectFindOnConfigSendBSONObjVector([&]() { - ShardType shard1; - shard1.setName("0"); - shard1.setHost(str::stream() << "Host0:12345"); - - ShardType shard2; - shard2.setName("1"); - shard2.setHost(str::stream() << "Host1:12345"); - - return std::vector<BSONObj>{shard1.toBSON(), shard2.toBSON()}; - }()); - - auto newRoutingInfo(future.timed_get(kFutureTimeout)); - ASSERT_EQ(1, newRoutingInfo->numChunks()); - ASSERT_EQ(version, newRoutingInfo->getVersion()); - ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), newRoutingInfo->getVersion({"0"})); - ASSERT_EQ(version, newRoutingInfo->getVersion({"1"})); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/s/chunk_manager_query_test.cpp b/src/mongo/s/chunk_manager_test.cpp index 013c4618d6c..08f8357b776 100644 --- a/src/mongo/s/chunk_manager_query_test.cpp +++ b/src/mongo/s/chunk_manager_test.cpp @@ -32,13 +32,143 @@ #include <set> +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/client.h" #include "mongo/db/query/collation/collator_interface_mock.h" +#include "mongo/s/catalog/sharding_catalog_test_fixture.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_manager_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace { +using executor::RemoteCommandResponse; +using executor::RemoteCommandRequest; + +const NamespaceString kNss("TestDB", "TestColl"); + +class ChunkManagerTestFixture : public ShardingCatalogTestFixture { +protected: + void setUp() override { + ShardingCatalogTestFixture::setUp(); + setRemote(HostAndPort("FakeRemoteClient:34567")); + configTargeter()->setFindHostReturnValue(HostAndPort{CONFIG_HOST_PORT}); + } + + /** + * Returns a chunk manager with chunks at the specified split points. Each individual chunk is + * placed on a separate shard with id ranging from "0" to the number of chunks. 
+ */ + std::unique_ptr<ChunkManager> makeChunkManager( + const ShardKeyPattern& shardKeyPattern, + std::unique_ptr<CollatorInterface> defaultCollator, + bool unique, + const std::vector<BSONObj>& splitPoints) { + ChunkVersion version(1, 0, OID::gen()); + + std::vector<BSONObj> shards; + std::vector<BSONObj> initialChunks; + + auto splitPointsIncludingEnds(splitPoints); + splitPointsIncludingEnds.insert(splitPointsIncludingEnds.begin(), + shardKeyPattern.getKeyPattern().globalMin()); + splitPointsIncludingEnds.push_back(shardKeyPattern.getKeyPattern().globalMax()); + + for (size_t i = 1; i < splitPointsIncludingEnds.size(); ++i) { + ShardType shard; + shard.setName(str::stream() << (i - 1)); + shard.setHost(str::stream() << "Host" << (i - 1) << ":12345"); + + shards.push_back(shard.toBSON()); + + ChunkType chunk; + chunk.setNS(kNss.ns()); + chunk.setMin(shardKeyPattern.getKeyPattern().extendRangeBound( + splitPointsIncludingEnds[i - 1], false)); + chunk.setMax(shardKeyPattern.getKeyPattern().extendRangeBound( + splitPointsIncludingEnds[i], false)); + chunk.setShard(shard.getName()); + chunk.setVersion(version); + + initialChunks.push_back(chunk.toConfigBSON()); + + version.incMajor(); + } + + // Load the initial manager + auto manager = stdx::make_unique<ChunkManager>( + kNss, version.epoch(), shardKeyPattern, std::move(defaultCollator), unique); + + auto future = launchAsync([&manager] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThread("Test"); + auto opCtx = cc().makeOperationContext(); + manager->loadExistingRanges(opCtx.get(), nullptr); + }); + + expectFindOnConfigSendBSONObjVector(initialChunks); + expectFindOnConfigSendBSONObjVector(shards); + + future.timed_get(kFutureTimeout); + + return manager; + } +}; + +using ChunkManagerLoadTest = ChunkManagerTestFixture; + +TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterSplit) { + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + + auto initialManager(makeChunkManager(shardKeyPattern, nullptr, true, {})); + + ChunkVersion version = initialManager->getVersion(); + + CollectionType collType; + collType.setNs(kNss); + collType.setEpoch(version.epoch()); + collType.setUpdatedAt(jsTime()); + collType.setKeyPattern(shardKeyPattern.toBSON()); + collType.setUnique(false); + + ChunkManager manager(kNss, version.epoch(), shardKeyPattern, nullptr, true); + + auto future = + launchAsync([&] { manager.loadExistingRanges(operationContext(), initialManager.get()); }); + + // Return set of chunks, which represent a split + expectFindOnConfigSendBSONObjVector([&]() { + version.incMajor(); + + ChunkType chunk1; + chunk1.setNS(kNss.ns()); + chunk1.setMin(shardKeyPattern.getKeyPattern().globalMin()); + chunk1.setMax(BSON("_id" << 0)); + chunk1.setShard({"0"}); + chunk1.setVersion(version); + + version.incMinor(); + + ChunkType chunk2; + chunk2.setNS(kNss.ns()); + chunk2.setMin(BSON("_id" << 0)); + chunk2.setMax(shardKeyPattern.getKeyPattern().globalMax()); + chunk2.setShard({"0"}); + chunk2.setVersion(version); + + return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + }()); + + future.timed_get(kFutureTimeout); +} + +/** + * Fixture to be used as a shortcut for tests which exercise the getShardIdsForQuery routing logic + */ class ChunkManagerQueryTest : public ChunkManagerTestFixture { protected: void runQueryTest(const BSONObj& shardKey, diff --git a/src/mongo/s/chunk_manager_test_fixture.cpp b/src/mongo/s/chunk_manager_test_fixture.cpp deleted file mode 100644 index ada08673d70..00000000000 --- 
a/src/mongo/s/chunk_manager_test_fixture.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include <set> -#include <vector> - -#include "mongo/s/chunk_manager_test_fixture.h" - -#include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/db/client.h" -#include "mongo/db/query/collation/collator_factory_mock.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { - -const NamespaceString ChunkManagerTestFixture::kNss("TestDB", "TestColl"); - -void ChunkManagerTestFixture::setUp() { - ShardingCatalogTestFixture::setUp(); - setRemote(HostAndPort("FakeRemoteClient:34567")); - configTargeter()->setFindHostReturnValue(HostAndPort{CONFIG_HOST_PORT}); - - CollatorFactoryInterface::set(serviceContext(), stdx::make_unique<CollatorFactoryMock>()); -} - -std::shared_ptr<ChunkManager> ChunkManagerTestFixture::makeChunkManager( - const ShardKeyPattern& shardKeyPattern, - std::unique_ptr<CollatorInterface> defaultCollator, - bool unique, - const std::vector<BSONObj>& splitPoints) { - ChunkVersion version(1, 0, OID::gen()); - - const BSONObj collectionBSON = [&]() { - CollectionType coll; - coll.setNs(kNss); - coll.setEpoch(version.epoch()); - coll.setKeyPattern(shardKeyPattern.getKeyPattern()); - coll.setUnique(unique); - - if (defaultCollator) { - coll.setDefaultCollation(defaultCollator->getSpec().toBSON()); - } - - return coll.toBSON(); - }(); - - std::vector<BSONObj> shards; - std::vector<BSONObj> initialChunks; - - auto splitPointsIncludingEnds(splitPoints); - splitPointsIncludingEnds.insert(splitPointsIncludingEnds.begin(), - shardKeyPattern.getKeyPattern().globalMin()); - splitPointsIncludingEnds.push_back(shardKeyPattern.getKeyPattern().globalMax()); - - for (size_t i = 1; i < splitPointsIncludingEnds.size(); ++i) { - ShardType shard; - 
shard.setName(str::stream() << (i - 1)); - shard.setHost(str::stream() << "Host" << (i - 1) << ":12345"); - - shards.push_back(shard.toBSON()); - - ChunkType chunk( - kNss, - {shardKeyPattern.getKeyPattern().extendRangeBound(splitPointsIncludingEnds[i - 1], - false), - shardKeyPattern.getKeyPattern().extendRangeBound(splitPointsIncludingEnds[i], false)}, - version, - shard.getName()); - - initialChunks.push_back(chunk.toConfigBSON()); - - version.incMajor(); - } - - auto future = launchAsync([&] { - auto client = serviceContext()->makeClient("Test"); - auto opCtx = client->makeOperationContext(); - return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, nullptr); - }); - - expectFindOnConfigSendBSONObjVector({collectionBSON}); - expectFindOnConfigSendBSONObjVector(initialChunks); - expectFindOnConfigSendBSONObjVector(shards); - - return future.timed_get(kFutureTimeout); -} - -} // namespace mongo diff --git a/src/mongo/s/chunk_manager_test_fixture.h b/src/mongo/s/chunk_manager_test_fixture.h deleted file mode 100644 index aaa059dd49d..00000000000 --- a/src/mongo/s/chunk_manager_test_fixture.h +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/db/namespace_string.h" -#include "mongo/s/catalog/sharding_catalog_test_fixture.h" -#include "mongo/stdx/memory.h" - -namespace mongo { - -class BSONObj; -class ChunkManager; -class CollatorInterface; -class ShardKeyPattern; - -class ChunkManagerTestFixture : public ShardingCatalogTestFixture { -protected: - void setUp() override; - - /** - * Returns a chunk manager with chunks at the specified split points. Each individual chunk is - * placed on a separate shard with shard id being a single number ranging from "0" to the number - * of chunks. 
- */ - std::shared_ptr<ChunkManager> makeChunkManager( - const ShardKeyPattern& shardKeyPattern, - std::unique_ptr<CollatorInterface> defaultCollator, - bool unique, - const std::vector<BSONObj>& splitPoints); - - static const NamespaceString kNss; -}; - -} // namespace mongo diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index 9e2aaaaf4e5..89d2836dc92 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -39,9 +39,11 @@ #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/query/query_request.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" @@ -319,20 +321,42 @@ void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) { finishInit(opCtx); } -void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS, - const StaleConfigException& e) { - if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) { - _staleNSMap[staleNS.ns()] = 1; +void ParallelSortClusteredCursor::_markStaleNS(OperationContext* opCtx, + const NamespaceString& staleNS, + const StaleConfigException& e, + bool& forceReload) { + if (e.requiresFullReload()) { + Grid::get(opCtx)->catalogCache()->invalidate(staleNS.db()); } - const int tries = ++_staleNSMap[staleNS.ns()]; + if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) + _staleNSMap[staleNS.ns()] = 1; + + int tries = ++_staleNSMap[staleNS.ns()]; if (tries >= 5) { throw SendStaleConfigException(staleNS.ns(), - "too many retries of stale version info", + str::stream() << "too many retries of stale version info", e.getVersionReceived(), e.getVersionWanted()); } + + forceReload = tries > 2; +} + +void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* opCtx, + const NamespaceString& staleNS, + bool forceReload) { + auto scopedCMStatus = ScopedChunkManager::get(opCtx, staleNS); + if (!scopedCMStatus.isOK()) { + log() << "cannot reload database info for stale namespace " << staleNS.ns(); + return; + } + + const auto& scopedCM = scopedCMStatus.getValue(); + + // Reload chunk manager, potentially forcing the namespace + scopedCM.db()->getChunkManagerIfExists(opCtx, staleNS.ns(), true, forceReload); } void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( @@ -435,12 +459,12 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { shared_ptr<Shard> primary; { - auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); - if (routingInfoStatus != ErrorCodes::NamespaceNotFound) { - auto routingInfo = uassertStatusOK(std::move(routingInfoStatus)); - manager = routingInfo.cm(); - primary = routingInfo.primary(); + auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); + if (scopedCMStatus != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(scopedCMStatus.getStatus()); + const auto& scopedCM = scopedCMStatus.getValue(); + manager = scopedCM.cm(); + primary = scopedCM.primary(); } } @@ -618,17 +642,20 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { if (staleNS.size() == 0) staleNS = nss; // ns is the *versioned* namespace, be careful of this - _markStaleNS(staleNS, e); - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); + // Probably need to retry fully + bool forceReload; + 
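// Illustrative sketch, not part of this commit: the retry bookkeeping that
// _markStaleNS() performs at this point, modeled as a standalone function with
// invented names. Each stale namespace keeps a per-cursor counter (note that
// the first marking leaves it at 2); more than two marks force a reload, and
// five or more give up with an exception, mirroring the
// SendStaleConfigException path above.
#include <map>
#include <stdexcept>
#include <string>

bool markStaleAndShouldForceReload(std::map<std::string, int>& staleNSMap,
                                   const std::string& staleNS) {
    if (staleNSMap.find(staleNS) == staleNSMap.end()) {
        staleNSMap[staleNS] = 1;
    }

    const int tries = ++staleNSMap[staleNS];
    if (tries >= 5) {
        throw std::runtime_error("too many retries of stale version info for " + staleNS);
    }

    return tries > 2;  // mirrors 'forceReload = tries > 2'
}

int main() {
    std::map<std::string, int> staleNSMap;
    bool force = markStaleAndShouldForceReload(staleNSMap, "test.coll");  // tries == 2 -> false
    force = markStaleAndShouldForceReload(staleNSMap, "test.coll");       // tries == 3 -> true
    return force ? 0 : 1;
}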
_markStaleNS(opCtx, staleNS, e, forceReload); - LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry" + LOG(1) << "stale config of ns " << staleNS + << " during initialization, will retry with forced : " << forceReload << causedBy(redact(e)); // This is somewhat strange - if (staleNS != nss) { + if (staleNS != nss) warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace " << staleNS; - } + + _handleStaleNS(opCtx, staleNS, forceReload); // Restart with new chunk manager startInit(opCtx); @@ -833,21 +860,26 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) { if (retry) { // Refresh stale namespaces if (staleNSExceptions.size()) { - for (const auto& exEntry : staleNSExceptions) { - const NamespaceString staleNS(exEntry.first); - const StaleConfigException& ex = exEntry.second; + for (map<string, StaleConfigException>::iterator i = staleNSExceptions.begin(), + end = staleNSExceptions.end(); + i != end; + ++i) { + NamespaceString staleNS(i->first); + const StaleConfigException& exception = i->second; - _markStaleNS(staleNS, ex); - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); + bool forceReload; + _markStaleNS(opCtx, staleNS, exception, forceReload); - LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry" - << causedBy(redact(ex)); + LOG(1) << "stale config of ns " << staleNS + << " on finishing query, will retry with forced : " << forceReload + << causedBy(redact(exception)); // This is somewhat strange - if (staleNS != ns) { + if (staleNS != ns) warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS; - } + + _handleStaleNS(opCtx, staleNS, forceReload); } } diff --git a/src/mongo/s/client/parallel.h b/src/mongo/s/client/parallel.h index d1680f1e74f..d375858bae0 100644 --- a/src/mongo/s/client/parallel.h +++ b/src/mongo/s/client/parallel.h @@ -117,7 +117,11 @@ private: void _finishCons(); - void _markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e); + void _markStaleNS(OperationContext* opCtx, + const NamespaceString& staleNS, + const StaleConfigException& e, + bool& forceReload); + void _handleStaleNS(OperationContext* opCtx, const NamespaceString& staleNS, bool forceReload); bool _didInit; bool _done; diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp index 7f97716f620..299a89f5941 100644 --- a/src/mongo/s/client/version_manager.cpp +++ b/src/mongo/s/client/version_manager.cpp @@ -36,13 +36,13 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" #include "mongo/s/set_shard_version_request.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -257,21 +257,21 @@ bool checkShardVersion(OperationContext* opCtx, const NamespaceString nss(ns); - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - if (authoritative) { - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); + ScopedChunkManager::refreshAndGet(opCtx, nss); } - auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss); - if (!routingInfoStatus.isOK()) { + auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); + + if 
(!scopedCMStatus.isOK()) { return false; } - auto& routingInfo = routingInfoStatus.getValue(); + const auto& scopedCM = scopedCMStatus.getValue(); - const auto manager = routingInfo.cm(); - const auto primary = routingInfo.primary(); + auto conf = scopedCM.db(); + const auto manager = scopedCM.cm(); + const auto primary = scopedCM.primary(); unsigned long long officialSequenceNumber = 0; @@ -379,7 +379,16 @@ bool checkShardVersion(OperationContext* opCtx, return true; } - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); + if (result["reloadConfig"].trueValue()) { + if (result["version"].timestampTime() == Date_t()) { + warning() << "reloading full configuration for " << conf->name() + << ", connection state indicates significant version changes"; + + Grid::get(opCtx)->catalogCache()->invalidate(nss.db()); + } + + conf->getChunkManager(opCtx, nss.ns(), true); + } const int maxNumTries = 7; if (tryNumber < maxNumTries) { diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/commands/chunk_manager_targeter.cpp index 42de7ba2903..609dee87d9e 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.cpp +++ b/src/mongo/s/commands/chunk_manager_targeter.cpp @@ -32,18 +32,31 @@ #include "mongo/s/commands/chunk_manager_targeter.h" +#include <boost/thread/tss.hpp> + #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/collation/collation_index_key.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/sharding_raii.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { + +using std::shared_ptr; +using str::stream; +using std::map; +using std::set; +using std::string; +using std::vector; + namespace { enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown }; @@ -52,6 +65,11 @@ enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); +// To match legacy reload behavior, we have to backoff on config reload per-thread +// TODO: Centralize this behavior better by refactoring config reload in mongos +boost::thread_specific_ptr<Backoff> perThreadBackoff; +const int maxWaitMillis = 500; + /** * There are two styles of update expressions: * @@ -120,6 +138,15 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, ChunkM return true; } +void refreshBackoff() { + if (!perThreadBackoff.get()) { + perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2)); + } + + perThreadBackoff.get()->nextSleepMillis(); +} + + // // Utilities to compare shard versions // @@ -146,19 +173,25 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA, return CompareResult_Unknown; } - if (shardVersionA < shardVersionB) + if (shardVersionA < shardVersionB) { return CompareResult_LT; + } + else return CompareResult_GTE; } -ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, - const ShardId& shardId) { - if (routingInfo.cm()) { - return routingInfo.cm()->getVersion(shardId); +ChunkVersion getShardVersion(StringData shardName, + const ChunkManager* manager, + const Shard* primary) { + dassert(!(manager && primary)); + dassert(manager || primary); 
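// Illustrative sketch, not part of this commit: the contract dasserted above,
// restated with stub types (VersionStub/ChunkManagerStub/ShardStub are invented
// here; the real types are ChunkVersion, ChunkManager and Shard). Exactly one
// of manager/primary is set, and an unsharded collection, represented only by
// its primary shard, always reports the sentinel UNSHARDED version.
#include <cassert>

struct VersionStub { int major = 0; };                // stands in for ChunkVersion
struct ChunkManagerStub { VersionStub version{7}; };  // sharded-collection metadata
struct ShardStub {};                                  // primary shard, unsharded case

VersionStub getShardVersionSketch(const ChunkManagerStub* manager, const ShardStub* primary) {
    assert(!(manager && primary));  // never both...
    assert(manager || primary);     // ...and never neither
    if (primary) {
        return VersionStub{0};      // 0 plays the role of ChunkVersion::UNSHARDED()
    }
    return manager->version;
}

int main() {
    const ChunkManagerStub cm{};
    return getShardVersionSketch(&cm, nullptr).major;  // 7
}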
+ + if (primary) { + return ChunkVersion::UNSHARDED(); } - return ChunkVersion::UNSHARDED(); + return manager->getVersion(shardName.toString()); } /** @@ -172,21 +205,26 @@ ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, * Note that the signature here is weird since our cached map of chunk versions is stored in a * ChunkManager or is implicit in the primary shard of the collection. */ -CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, - const ShardVersionMap& remoteShardVersions) { +CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager, + const Shard* cachedPrimary, + const map<ShardId, ChunkVersion>& remoteShardVersions) { CompareResult finalResult = CompareResult_GTE; - for (const auto& shardVersionEntry : remoteShardVersions) { - const ShardId& shardId = shardVersionEntry.first; - const ChunkVersion& remoteShardVersion = shardVersionEntry.second; + for (map<ShardId, ChunkVersion>::const_iterator it = remoteShardVersions.begin(); + it != remoteShardVersions.end(); + ++it) { + // Get the remote and cached version for the next shard + const ShardId& shardName = it->first; + const ChunkVersion& remoteShardVersion = it->second; ChunkVersion cachedShardVersion; try { // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion(routingInfo, shardId); + cachedShardVersion = + getShardVersion(shardName.toString(), cachedChunkManager, cachedPrimary); } catch (const DBException& ex) { - warning() << "could not lookup shard " << shardId + warning() << "could not lookup shard " << shardName << " in local cache, shard metadata may have changed" << " or be unavailable" << causedBy(ex); @@ -198,7 +236,6 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing if (result == CompareResult_Unknown) return result; - if (result == CompareResult_LT) finalResult = CompareResult_LT; @@ -211,10 +248,10 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing /** * Whether or not the manager/primary pair is different from the other manager/primary pair. */ -bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA, - const std::shared_ptr<Shard>& primaryA, - const std::shared_ptr<ChunkManager>& managerB, - const std::shared_ptr<Shard>& primaryB) { +bool isMetadataDifferent(const shared_ptr<ChunkManager>& managerA, + const shared_ptr<Shard>& primaryA, + const shared_ptr<ChunkManager>& managerB, + const shared_ptr<Shard>& primaryB) { if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || (!primaryA && primaryB)) return true; @@ -231,10 +268,10 @@ bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA, * Whether or not the manager/primary pair was changed or refreshed from a previous version * of the metadata. 
*/ -bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA, - const std::shared_ptr<Shard>& primaryA, - const std::shared_ptr<ChunkManager>& managerB, - const std::shared_ptr<Shard>& primaryB) { +bool wasMetadataRefreshed(const shared_ptr<ChunkManager>& managerA, + const shared_ptr<Shard>& primaryA, + const shared_ptr<ChunkManager>& managerB, + const shared_ptr<Shard>& primaryB) { if (isMetadataDifferent(managerA, primaryA, managerB, primaryB)) return true; @@ -253,18 +290,14 @@ ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterS Status ChunkManagerTargeter::init(OperationContext* opCtx) { - auto shardDbStatus = createShardDatabase(opCtx, _nss.db()); - if (!shardDbStatus.isOK()) { - return shardDbStatus.getStatus(); + auto scopedCMStatus = ScopedChunkManager::getOrCreate(opCtx, _nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); } - const auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss); - if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); - } - - _routingInfo = std::move(routingInfoStatus.getValue()); + const auto& scopedCM = scopedCMStatus.getValue(); + _manager = scopedCM.cm(); + _primary = scopedCM.primary(); return Status::OK(); } @@ -278,21 +311,21 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, ShardEndpoint** endpoint) const { BSONObj shardKey; - if (_routingInfo->cm()) { + if (_manager) { // // Sharded collections have the following requirements for targeting: // // Inserts must contain the exact shard key. // - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); + shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc); // Check shard key exists if (shardKey.isEmpty()) { - return {ErrorCodes::ShardKeyNotFound, - str::stream() << "document " << doc - << " does not contain shard key for pattern " - << _routingInfo->cm()->getShardKeyPattern().toString()}; + return Status(ErrorCodes::ShardKeyNotFound, + stream() << "document " << doc + << " does not contain shard key for pattern " + << _manager->getShardKeyPattern().toString()); } // Check shard key size on insert @@ -305,13 +338,13 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, if (!shardKey.isEmpty()) { *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release(); } else { - if (!_routingInfo->primary()) { + if (!_primary) { return Status(ErrorCodes::NamespaceNotFound, str::stream() << "could not target insert in collection " << getNS().ns() << "; no metadata found"); } - *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED()); + *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED()); } return Status::OK(); @@ -343,14 +376,14 @@ Status ChunkManagerTargeter::targetUpdate( UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr()); if (updateType == UpdateType_Unknown) { - return {ErrorCodes::UnsupportedFormat, - str::stream() << "update document " << updateExpr - << " has mixed $operator and non-$operator style fields"}; + return Status(ErrorCodes::UnsupportedFormat, + stream() << "update document " << updateExpr + << " has mixed $operator and non-$operator style fields"); } BSONObj shardKey; - if (_routingInfo->cm()) { + if (_manager) { // // Sharded collections have the following further requirements for targeting: // @@ -362,7 +395,7 @@ if (updateType ==
UpdateType_OpStyle) { // Target using the query StatusWith<BSONObj> status = - _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); + _manager->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); // Bad query if (!status.isOK()) @@ -371,7 +404,7 @@ Status ChunkManagerTargeter::targetUpdate( shardKey = status.getValue(); } else { // Target using the replacement document - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); + shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); } // Check shard key size on upsert. @@ -398,13 +431,13 @@ Status ChunkManagerTargeter::targetUpdate( // We failed to target a single shard. // Upserts are required to target a single shard. - if (_routingInfo->cm() && updateDoc.getUpsert()) { + if (_manager && updateDoc.getUpsert()) { return Status(ErrorCodes::ShardKeyNotFound, str::stream() << "An upsert on a sharded collection must contain the shard " "key and have the simple collation. Update request: " << updateDoc.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); + << _manager->getShardKeyPattern().toString()); } // Parse update query. @@ -421,8 +454,8 @@ Status ChunkManagerTargeter::targetUpdate( } // Single (non-multi) updates must target a single shard or be exact-ID. - if (_routingInfo->cm() && !updateDoc.getMulti() && - !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { + if (_manager && !updateDoc.getMulti() && + !isExactIdQuery(opCtx, *cq.getValue(), _manager.get())) { return Status(ErrorCodes::ShardKeyNotFound, str::stream() << "A single update on a sharded collection must contain an exact " @@ -431,7 +464,7 @@ Status ChunkManagerTargeter::targetUpdate( "request: " << updateDoc.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); + << _manager->getShardKeyPattern().toString()); } if (updateType == UpdateType_OpStyle) { @@ -447,7 +480,7 @@ Status ChunkManagerTargeter::targetDelete( std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { BSONObj shardKey; - if (_routingInfo->cm()) { + if (_manager) { // // Sharded collections have the following further requirements for targeting: // @@ -456,8 +489,7 @@ Status ChunkManagerTargeter::targetDelete( // Get the shard key StatusWith<BSONObj> status = - _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, - deleteDoc.getQuery()); + _manager->getShardKeyPattern().extractShardKeyFromQuery(opCtx, deleteDoc.getQuery()); // Bad query if (!status.isOK()) @@ -495,8 +527,8 @@ Status ChunkManagerTargeter::targetDelete( } // Single deletes must target a single shard or be exact-ID. 
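// Illustrative sketch, not part of this commit: the gate enforced just below,
// reduced to a standalone check with invented names. A single (limit == 1)
// delete against a sharded collection must target exactly by shard key or by
// exact _id; anything broader is refused rather than broadcast.
#include <stdexcept>

void checkSingleDeleteSketch(bool sharded, long long limit, bool exactIdQuery) {
    if (sharded && limit == 1 && !exactIdQuery) {
        // Corresponds to the ShardKeyNotFound status constructed below.
        throw std::invalid_argument(
            "a single delete on a sharded collection must contain an exact match "
            "on _id or the shard key");
    }
}

int main() {
    checkSingleDeleteSketch(/*sharded=*/true, /*limit=*/1, /*exactIdQuery=*/true);  // passes
    return 0;
}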
- if (_routingInfo->cm() && deleteDoc.getLimit() == 1 && - !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { + if (_manager && deleteDoc.getLimit() == 1 && + !isExactIdQuery(opCtx, *cq.getValue(), _manager.get())) { return Status(ErrorCodes::ShardKeyNotFound, str::stream() << "A single delete on a sharded collection must contain an exact " @@ -505,7 +537,7 @@ Status ChunkManagerTargeter::targetDelete( "request: " << deleteDoc.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); + << _manager->getShardKeyPattern().toString()); } return targetQuery(opCtx, deleteDoc.getQuery(), collation, endpoints); @@ -526,28 +558,26 @@ Status ChunkManagerTargeter::targetQuery( const BSONObj& query, const BSONObj& collation, std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_routingInfo->primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target query in " << getNS().ns() - << "; no metadata found"}; + if (!_primary && !_manager) { + return Status(ErrorCodes::NamespaceNotFound, + stream() << "could not target query in " << getNS().ns() + << "; no metadata found"); } - std::set<ShardId> shardIds; - if (_routingInfo->cm()) { + set<ShardId> shardIds; + if (_manager) { try { - _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); + _manager->getShardIdsForQuery(opCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } } else { - shardIds.insert(_routingInfo->primary()->getId()); + shardIds.insert(_primary->getId()); } for (const ShardId& shardId : shardIds) { endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); + shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -556,7 +586,7 @@ Status ChunkManagerTargeter::targetQuery( std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, const BSONObj& collation, long long estDataSize) const { - auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); + auto chunk = _manager->findIntersectingChunk(shardKey, collation); // Track autosplit stats for sharded collections // Note: this is only best effort accounting and is not accurate. 
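// Illustrative sketch, not part of this commit: the endpoint-building pattern
// this file repeats (toy types; EndpointSketch stands in for ShardEndpoint and
// int for ChunkVersion). A targeted shard is paired with its version from the
// chunk manager when the collection is sharded, or with the sentinel UNSHARDED
// version (0 here) when only a primary shard exists.
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

struct EndpointSketch {
    std::string shardId;
    int shardVersion;
};

int main() {
    const std::map<std::string, int> shardVersions{{"shard0", 3}, {"shard1", 5}};
    const bool sharded = true;  // 'false' models an unsharded collection

    const std::set<std::string> shardIds{"shard0", "shard1"};
    std::vector<EndpointSketch> endpoints;
    for (const auto& id : shardIds) {
        endpoints.push_back({id, sharded ? shardVersions.at(id) : 0});
    }

    for (const auto& e : endpoints) {
        std::cout << e.shardId << " v" << e.shardVersion << '\n';
    }
    return 0;
}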
@@ -565,29 +595,27 @@ std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONOb } return stdx::make_unique<ShardEndpoint>(chunk->getShardId(), - _routingInfo->cm()->getVersion(chunk->getShardId())); + _manager->getVersion(chunk->getShardId())); } Status ChunkManagerTargeter::targetCollection( std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_routingInfo->primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target full range of " << getNS().ns() - << "; metadata not found"}; + if (!_primary && !_manager) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target full range of " << getNS().ns() + << "; metadata not found"); } - std::set<ShardId> shardIds; - if (_routingInfo->cm()) { - _routingInfo->cm()->getAllShardIds(&shardIds); + set<ShardId> shardIds; + if (_manager) { + _manager->getAllShardIds(&shardIds); } else { - shardIds.insert(_routingInfo->primary()->getId()); + shardIds.insert(_primary->getId()); } for (const ShardId& shardId : shardIds) { endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); + shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -595,20 +623,19 @@ Status ChunkManagerTargeter::targetCollection( Status ChunkManagerTargeter::targetAllShards( std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_routingInfo->primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target every shard with versions for " << getNS().ns() - << "; metadata not found"}; + if (!_primary && !_manager) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target every shard with versions for " + << getNS().ns() + << "; metadata not found"); } - std::vector<ShardId> shardIds; + vector<ShardId> shardIds; grid.shardRegistry()->getAllShardIds(&shardIds); for (const ShardId& shardId : shardIds) { endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); + shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -622,7 +649,8 @@ void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint, if (staleInfo["vWanted"].eoo()) { // If we don't have a vWanted sent, assume the version is higher than our current // version. 
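// Illustrative sketch, not part of this commit: the fallback taken just below
// when a stale-config response carries no 'vWanted' hint (VersionSketch and the
// literals are invented for the example). The cached version is bumped by one
// major version, so the next comparison is guaranteed to look stale and drive
// a refresh.
#include <iostream>

struct VersionSketch {
    int major = 0;
    int minor = 0;
    void incMajor() {
        ++major;
        minor = 0;
    }
};

int main() {
    const bool hasVWanted = false;  // models staleInfo["vWanted"].eoo()
    const VersionSketch cached{3, 2};

    VersionSketch remote;
    if (!hasVWanted) {
        remote = cached;    // start from the cached view...
        remote.incMajor();  // ...and assume the remote is at least one major ahead
    } else {
        remote = VersionSketch{4, 0};  // otherwise it would be parsed from 'vWanted'
    }

    std::cout << remote.major << '.' << remote.minor << '\n';  // prints 4.0
    return 0;
}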
- remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); + remoteShardVersion = + getShardVersion(endpoint.shardName.toString(), _manager.get(), _primary.get()); remoteShardVersion.incMajor(); } else { remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted"); @@ -671,14 +699,18 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // Get the latest metadata information from the cache if there were issues // - auto lastManager = _routingInfo->cm(); - auto lastPrimary = _routingInfo->primary(); + shared_ptr<ChunkManager> lastManager = _manager; + shared_ptr<Shard> lastPrimary = _primary; - auto initStatus = init(opCtx); - if (!initStatus.isOK()) { - return initStatus; + auto scopedCMStatus = ScopedChunkManager::getOrCreate(opCtx, _nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); } + const auto& scopedCM = scopedCMStatus.getValue(); + _manager = scopedCM.cm(); + _primary = scopedCM.primary(); + // We now have the latest metadata from the cache. // @@ -686,6 +718,8 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // Either we couldn't target at all, or we have stale versions, but not both. // + dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty())); + if (_needsTargetingRefresh) { // Reset the field _needsTargetingRefresh = false; @@ -694,44 +728,63 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // the // metadata since we last got it from the cache. - bool alreadyRefreshed = wasMetadataRefreshed( - lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); + bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary); // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { // To match previous behavior, we just need an incremental refresh here - return refreshNow(opCtx); + return refreshNow(opCtx, RefreshType_RefreshChunkManager); } - *wasChanged = isMetadataDifferent( - lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); + *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); return Status::OK(); } else if (!_remoteShardVersions.empty()) { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); - + CompareResult result = + compareAllShardVersions(_manager.get(), _primary.get(), _remoteShardVersions); // Reset the versions _remoteShardVersions.clear(); - if (result == CompareResult_Unknown || result == CompareResult_LT) { + if (result == CompareResult_Unknown) { // Our current shard versions aren't all comparable to the old versions, maybe drop - return refreshNow(opCtx); + return refreshNow(opCtx, RefreshType_ReloadDatabase); + } else if (result == CompareResult_LT) { + // Our current shard versions are less than the remote versions, but no drop + return refreshNow(opCtx, RefreshType_RefreshChunkManager); } - *wasChanged = isMetadataDifferent( - lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); + *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); return Status::OK(); } - MONGO_UNREACHABLE; + // unreachable + dassert(false); + return Status::OK(); } -Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) { - 
Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo)); +Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx, RefreshType refreshType) { + if (refreshType == RefreshType_ReloadDatabase) { + Grid::get(opCtx)->catalogCache()->invalidate(_nss.db().toString()); + } + + // Try not to spam the configs + refreshBackoff(); - return init(opCtx); + ScopedChunkManager::refreshAndGet(opCtx, _nss); + + auto scopedCMStatus = ScopedChunkManager::get(opCtx, _nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + const auto& scopedCM = scopedCMStatus.getValue(); + + _manager = scopedCM.cm(); + _primary = scopedCM.primary(); + + return Status::OK(); } } // namespace mongo diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f b/src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f deleted file mode 100644 index 42de7ba2903..00000000000 --- a/src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f +++ /dev/null @@ -1,737 +0,0 @@ -/** - * Copyright (C) 2013 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/commands/chunk_manager_targeter.h" - -#include "mongo/db/matcher/extensions_callback_noop.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/query/canonical_query.h" -#include "mongo/db/query/collation/collation_index_key.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_common.h" -#include "mongo/s/grid.h" -#include "mongo/s/shard_key_pattern.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { -namespace { - -enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown }; - -enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT }; - -const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); - -/** - * There are two styles of update expressions: - * - * Replacement style: coll.update({ x : 1 }, { y : 2 }) - * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } }) - */ -UpdateType getUpdateExprType(const BSONObj& updateExpr) { - // Empty update is replacement-style, by default - if (updateExpr.isEmpty()) { - return UpdateType_Replacement; - } - - UpdateType updateType = UpdateType_Unknown; - - BSONObjIterator it(updateExpr); - while (it.more()) { - BSONElement next = it.next(); - - if (next.fieldName()[0] == '$') { - if (updateType == UpdateType_Unknown) { - updateType = UpdateType_OpStyle; - } else if (updateType == UpdateType_Replacement) { - return UpdateType_Unknown; - } - } else { - if (updateType == UpdateType_Unknown) { - updateType = UpdateType_Replacement; - } else if (updateType == UpdateType_OpStyle) { - return UpdateType_Unknown; - } - } - } - - return updateType; -} - -/** - * This returns "does the query have an _id field" and "is the _id field querying for a direct - * value like _id : 3 and not _id : { $gt : 3 }" - * - * If the query does not use the collection default collation, the _id field cannot contain strings, - * objects, or arrays. - * - * Ex: { _id : 1 } => true - * { foo : <anything>, _id : 1 } => true - * { _id : { $lt : 30 } } => false - * { foo : <anything> } => false - */ -bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, ChunkManager* manager) { - auto shardKey = virtualIdShardKey.extractShardKeyFromQuery(query); - BSONElement idElt = shardKey["_id"]; - - if (!idElt) { - return false; - } - - if (CollationIndexKey::isCollatableType(idElt.type()) && manager && - !query.getQueryRequest().getCollation().isEmpty() && - !CollatorInterface::collatorsMatch(query.getCollator(), manager->getDefaultCollator())) { - - // The collation applies to the _id field, but the user specified a collation which doesn't - // match the collection default. - return false; - } - - return true; -} - -// -// Utilities to compare shard versions -// - -/** - * Returns the relationship of two shard versions. Shard versions of a collection that has not - * been dropped and recreated and where there is at least one chunk on a shard are comparable, - * otherwise the result is ambiguous. - */ -CompareResult compareShardVersions(const ChunkVersion& shardVersionA, - const ChunkVersion& shardVersionB) { - // Collection may have been dropped - if (!shardVersionA.hasEqualEpoch(shardVersionB)) { - return CompareResult_Unknown; - } - - // Zero shard versions are only comparable to themselves - if (!shardVersionA.isSet() || !shardVersionB.isSet()) { - // If both are zero... 
- if (!shardVersionA.isSet() && !shardVersionB.isSet()) { - return CompareResult_GTE; - } - - return CompareResult_Unknown; - } - - if (shardVersionA < shardVersionB) - return CompareResult_LT; - else - return CompareResult_GTE; -} - -ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, - const ShardId& shardId) { - if (routingInfo.cm()) { - return routingInfo.cm()->getVersion(shardId); - } - - return ChunkVersion::UNSHARDED(); -} - -/** - * Returns the relationship between two maps of shard versions. As above, these maps are often - * comparable when the collection has not been dropped and there is at least one chunk on the - * shards. If any versions in the maps are not comparable, the result is _Unknown. - * - * If any versions in the first map (cached) are _LT the versions in the second map (remote), - * the first (cached) versions are _LT the second (remote) versions. - * - * Note that the signature here is weird since our cached map of chunk versions is stored in a - * ChunkManager or is implicit in the primary shard of the collection. - */ -CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, - const ShardVersionMap& remoteShardVersions) { - CompareResult finalResult = CompareResult_GTE; - - for (const auto& shardVersionEntry : remoteShardVersions) { - const ShardId& shardId = shardVersionEntry.first; - const ChunkVersion& remoteShardVersion = shardVersionEntry.second; - - ChunkVersion cachedShardVersion; - - try { - // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion(routingInfo, shardId); - } catch (const DBException& ex) { - warning() << "could not lookup shard " << shardId - << " in local cache, shard metadata may have changed" - << " or be unavailable" << causedBy(ex); - - return CompareResult_Unknown; - } - - // Compare the remote and cached versions - CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion); - - if (result == CompareResult_Unknown) - return result; - - if (result == CompareResult_LT) - finalResult = CompareResult_LT; - - // Note that we keep going after _LT b/c there could be more _Unknowns. - } - - return finalResult; -} - -/** - * Whether or not the manager/primary pair is different from the other manager/primary pair. - */ -bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA, - const std::shared_ptr<Shard>& primaryA, - const std::shared_ptr<ChunkManager>& managerB, - const std::shared_ptr<Shard>& primaryB) { - if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || - (!primaryA && primaryB)) - return true; - - if (managerA) { - return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion()); - } - - dassert(NULL != primaryA.get()); - return primaryA->getId() != primaryB->getId(); -} - -/** -* Whether or not the manager/primary pair was changed or refreshed from a previous version -* of the metadata. 
-*/ -bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA, - const std::shared_ptr<Shard>& primaryA, - const std::shared_ptr<ChunkManager>& managerB, - const std::shared_ptr<Shard>& primaryB) { - if (isMetadataDifferent(managerA, primaryA, managerB, primaryB)) - return true; - - if (managerA) { - dassert(managerB.get()); // otherwise metadata would be different - return managerA->getSequenceNumber() != managerB->getSequenceNumber(); - } - - return false; -} - -} // namespace - -ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterStats* stats) - : _nss(nss), _needsTargetingRefresh(false), _stats(stats) {} - - -Status ChunkManagerTargeter::init(OperationContext* opCtx) { - auto shardDbStatus = createShardDatabase(opCtx, _nss.db()); - if (!shardDbStatus.isOK()) { - return shardDbStatus.getStatus(); - } - - const auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss); - if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); - } - - _routingInfo = std::move(routingInfoStatus.getValue()); - - return Status::OK(); -} - -const NamespaceString& ChunkManagerTargeter::getNS() const { - return _nss; -} - -Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, - const BSONObj& doc, - ShardEndpoint** endpoint) const { - BSONObj shardKey; - - if (_routingInfo->cm()) { - // - // Sharded collections have the following requirements for targeting: - // - // Inserts must contain the exact shard key. - // - - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); - - // Check shard key exists - if (shardKey.isEmpty()) { - return {ErrorCodes::ShardKeyNotFound, - str::stream() << "document " << doc - << " does not contain shard key for pattern " - << _routingInfo->cm()->getShardKeyPattern().toString()}; - } - - // Check shard key size on insert - Status status = ShardKeyPattern::checkShardKeySize(shardKey); - if (!status.isOK()) - return status; - } - - // Target the shard key or database primary - if (!shardKey.isEmpty()) { - *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release(); - } else { - if (!_routingInfo->primary()) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() << "could not target insert in collection " << getNS().ns() - << "; no metadata found"); - } - - *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED()); - } - - return Status::OK(); -} - -Status ChunkManagerTargeter::targetUpdate( - OperationContext* opCtx, - const BatchedUpdateDocument& updateDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - // - // Update targeting may use either the query or the update. This is to support save-style - // updates, of the form: - // - // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true }) - // - // Because drivers do not know the shard key, they can't pull the shard key automatically - // into the query doc, and to correctly support upsert we must target a single shard. - // - // The rule is simple - If the update is replacement style (no '$set'), we target using the - // update. If the update is replacement style, we target using the query. - // - // If we have the exact shard key in either the query or replacement doc, we target using - // that extracted key. 
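Per the surrounding targeting code, op-style ($-operator) updates take their shard key from the query while replacement-style updates take it from the replacement document, so getUpdateExprType must classify the update first: all $-prefixed top-level fields mean op-style, none means replacement-style, and a mix is rejected. A self-contained restatement of that classifier over plain field-name strings instead of BSON:

#include <cassert>
#include <string>
#include <vector>

enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };

UpdateType getUpdateExprType(const std::vector<std::string>& fieldNames) {
    if (fieldNames.empty())
        return UpdateType_Replacement;  // empty update is replacement-style by default

    UpdateType updateType = UpdateType_Unknown;
    for (const auto& name : fieldNames) {
        const UpdateType fieldType =
            (!name.empty() && name[0] == '$') ? UpdateType_OpStyle : UpdateType_Replacement;
        if (updateType == UpdateType_Unknown) {
            updateType = fieldType;     // first field decides the style
        } else if (updateType != fieldType) {
            return UpdateType_Unknown;  // mixed styles are an error
        }
    }
    return updateType;
}

int main() {
    assert(getUpdateExprType({"$set", "$inc"}) == UpdateType_OpStyle);
    assert(getUpdateExprType({"x", "y"}) == UpdateType_Replacement);
    assert(getUpdateExprType({"$set", "y"}) == UpdateType_Unknown);
}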
- // - - BSONObj query = updateDoc.getQuery(); - BSONObj updateExpr = updateDoc.getUpdateExpr(); - - UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr()); - - if (updateType == UpdateType_Unknown) { - return {ErrorCodes::UnsupportedFormat, - str::stream() << "update document " << updateExpr - << " has mixed $operator and non-$operator style fields"}; - } - - BSONObj shardKey; - - if (_routingInfo->cm()) { - // - // Sharded collections have the following futher requirements for targeting: - // - // Upserts must be targeted exactly by shard key. - // Non-multi updates must be targeted exactly by shard key *or* exact _id. - // - - // Get the shard key - if (updateType == UpdateType_OpStyle) { - // Target using the query - StatusWith<BSONObj> status = - _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); - - // Bad query - if (!status.isOK()) - return status.getStatus(); - - shardKey = status.getValue(); - } else { - // Target using the replacement document - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); - } - - // Check shard key size on upsert. - if (updateDoc.getUpsert()) { - Status status = ShardKeyPattern::checkShardKeySize(shardKey); - if (!status.isOK()) - return status; - } - } - - const BSONObj collation = updateDoc.isCollationSet() ? updateDoc.getCollation() : BSONObj(); - - // Target the shard key, query, or replacement doc - if (!shardKey.isEmpty()) { - try { - endpoints->push_back( - targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize()))); - return Status::OK(); - } catch (const DBException&) { - // This update is potentially not constrained to a single shard - } - } - - // We failed to target a single shard. - - // Upserts are required to target a single shard. - if (_routingInfo->cm() && updateDoc.getUpsert()) { - return Status(ErrorCodes::ShardKeyNotFound, - str::stream() << "An upsert on a sharded collection must contain the shard " - "key and have the simple collation. Update request: " - << updateDoc.toBSON() - << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); - } - - // Parse update query. - auto qr = stdx::make_unique<QueryRequest>(getNS()); - qr->setFilter(updateDoc.getQuery()); - if (!collation.isEmpty()) { - qr->setCollation(collation); - } - auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), ExtensionsCallbackNoop()); - if (!cq.isOK()) { - return Status(cq.getStatus().code(), - str::stream() << "Could not parse update query " << updateDoc.getQuery() - << causedBy(cq.getStatus())); - } - - // Single (non-multi) updates must target a single shard or be exact-ID. - if (_routingInfo->cm() && !updateDoc.getMulti() && - !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { - return Status(ErrorCodes::ShardKeyNotFound, - str::stream() - << "A single update on a sharded collection must contain an exact " - "match on _id (and have the collection default collation) or " - "contain the shard key (and have the simple collation). 
Update " - "request: " - << updateDoc.toBSON() - << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); - } - - if (updateType == UpdateType_OpStyle) { - return targetQuery(opCtx, query, collation, endpoints); - } else { - return targetDoc(opCtx, updateExpr, collation, endpoints); - } -} - -Status ChunkManagerTargeter::targetDelete( - OperationContext* opCtx, - const BatchedDeleteDocument& deleteDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - BSONObj shardKey; - - if (_routingInfo->cm()) { - // - // Sharded collections have the following further requirements for targeting: - // - // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - // - - // Get the shard key - StatusWith<BSONObj> status = - _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, - deleteDoc.getQuery()); - - // Bad query - if (!status.isOK()) - return status.getStatus(); - - shardKey = status.getValue(); - } - - const BSONObj collation = deleteDoc.isCollationSet() ? deleteDoc.getCollation() : BSONObj(); - - - // Target the shard key or delete query - if (!shardKey.isEmpty()) { - try { - endpoints->push_back(targetShardKey(shardKey, collation, 0)); - return Status::OK(); - } catch (const DBException&) { - // This delete is potentially not constrained to a single shard - } - } - - // We failed to target a single shard. - - // Parse delete query. - auto qr = stdx::make_unique<QueryRequest>(getNS()); - qr->setFilter(deleteDoc.getQuery()); - if (!collation.isEmpty()) { - qr->setCollation(collation); - } - auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), ExtensionsCallbackNoop()); - if (!cq.isOK()) { - return Status(cq.getStatus().code(), - str::stream() << "Could not parse delete query " << deleteDoc.getQuery() - << causedBy(cq.getStatus())); - } - - // Single deletes must target a single shard or be exact-ID. - if (_routingInfo->cm() && deleteDoc.getLimit() == 1 && - !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { - return Status(ErrorCodes::ShardKeyNotFound, - str::stream() - << "A single delete on a sharded collection must contain an exact " - "match on _id (and have the collection default collation) or " - "contain the shard key (and have the simple collation). Delete " - "request: " - << deleteDoc.toBSON() - << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); - } - - return targetQuery(opCtx, deleteDoc.getQuery(), collation, endpoints); -} - -Status ChunkManagerTargeter::targetDoc( - OperationContext* opCtx, - const BSONObj& doc, - const BSONObj& collation, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - // NOTE: This is weird and fragile, but it's the way our language works right now - - // documents are either A) invalid or B) valid equality queries over themselves. 
- return targetQuery(opCtx, doc, collation, endpoints); -} - -Status ChunkManagerTargeter::targetQuery( - OperationContext* opCtx, - const BSONObj& query, - const BSONObj& collation, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_routingInfo->primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target query in " << getNS().ns() - << "; no metadata found"}; - } - - std::set<ShardId> shardIds; - if (_routingInfo->cm()) { - try { - _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); - } catch (const DBException& ex) { - return ex.toStatus(); - } - } else { - shardIds.insert(_routingInfo->primary()->getId()); - } - - for (const ShardId& shardId : shardIds) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); - } - - return Status::OK(); -} - -std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, - const BSONObj& collation, - long long estDataSize) const { - auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); - - // Track autosplit stats for sharded collections - // Note: this is only best effort accounting and is not accurate. - if (estDataSize > 0) { - _stats->chunkSizeDelta[chunk->getMin()] += estDataSize; - } - - return stdx::make_unique<ShardEndpoint>(chunk->getShardId(), - _routingInfo->cm()->getVersion(chunk->getShardId())); -} - -Status ChunkManagerTargeter::targetCollection( - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_routingInfo->primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target full range of " << getNS().ns() - << "; metadata not found"}; - } - - std::set<ShardId> shardIds; - if (_routingInfo->cm()) { - _routingInfo->cm()->getAllShardIds(&shardIds); - } else { - shardIds.insert(_routingInfo->primary()->getId()); - } - - for (const ShardId& shardId : shardIds) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); - } - - return Status::OK(); -} - -Status ChunkManagerTargeter::targetAllShards( - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - if (!_routingInfo->primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target every shard with versions for " << getNS().ns() - << "; metadata not found"}; - } - - std::vector<ShardId> shardIds; - grid.shardRegistry()->getAllShardIds(&shardIds); - - for (const ShardId& shardId : shardIds) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); - } - - return Status::OK(); -} - -void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint, - const BSONObj& staleInfo) { - dassert(!_needsTargetingRefresh); - - ChunkVersion remoteShardVersion; - if (staleInfo["vWanted"].eoo()) { - // If we don't have a vWanted sent, assume the version is higher than our current - // version. 
- remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); - remoteShardVersion.incMajor(); - } else { - remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted"); - } - - ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName); - if (it == _remoteShardVersions.end()) { - _remoteShardVersions.insert(std::make_pair(endpoint.shardName, remoteShardVersion)); - } else { - ChunkVersion& previouslyNotedVersion = it->second; - if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) { - if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) { - previouslyNotedVersion = remoteShardVersion; - } - } else { - // Epoch changed midway while applying the batch so set the version to something - // unique - // and non-existent to force a reload when refreshIsNeeded is called. - previouslyNotedVersion = ChunkVersion::IGNORED(); - } - } -} - -void ChunkManagerTargeter::noteCouldNotTarget() { - dassert(_remoteShardVersions.empty()); - _needsTargetingRefresh = true; -} - -Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) { - bool dummy; - if (!wasChanged) { - wasChanged = &dummy; - } - - *wasChanged = false; - - // - // Did we have any stale config or targeting errors at all? - // - - if (!_needsTargetingRefresh && _remoteShardVersions.empty()) { - return Status::OK(); - } - - // - // Get the latest metadata information from the cache if there were issues - // - - auto lastManager = _routingInfo->cm(); - auto lastPrimary = _routingInfo->primary(); - - auto initStatus = init(opCtx); - if (!initStatus.isOK()) { - return initStatus; - } - - // We now have the latest metadata from the cache. - - // - // See if and how we need to do a remote refresh. - // Either we couldn't target at all, or we have stale versions, but not both. - // - - if (_needsTargetingRefresh) { - // Reset the field - _needsTargetingRefresh = false; - - // If we couldn't target, we might need to refresh if we haven't remotely refreshed - // the - // metadata since we last got it from the cache. 
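The stale-version bookkeeping in noteStaleResponse above keeps, per shard, the highest version reported within one epoch (and, when no vWanted was sent, assumes one major version past the cache), while an epoch change mid-batch poisons the entry with an IGNORED sentinel so the next refreshIfNeeded forces a reload. A sketch of that merge with a collapsed stand-in for ChunkVersion:

#include <iostream>
#include <map>
#include <string>

struct ChunkVer {
    long long combined = 0;  // major/minor collapsed into one comparable value
    int epoch = 0;
    bool ignored = false;    // models ChunkVersion::IGNORED()
};

void noteStaleResponse(std::map<std::string, ChunkVer>& remote,
                       const std::string& shard,
                       const ChunkVer& reported) {
    auto it = remote.find(shard);
    if (it == remote.end()) {
        remote.emplace(shard, reported);
        return;
    }
    ChunkVer& noted = it->second;
    if (noted.epoch == reported.epoch) {
        if (noted.combined < reported.combined)
            noted = reported;                     // keep the newest version seen
    } else {
        noted = ChunkVer{0, 0, /*ignored=*/true};  // epoch flip mid-batch: force a reload
    }
}

int main() {
    std::map<std::string, ChunkVer> remote;
    noteStaleResponse(remote, "shard0000", {3, 7, false});
    noteStaleResponse(remote, "shard0000", {5, 8, false});  // different epoch
    std::cout << std::boolalpha << remote["shard0000"].ignored << "\n";  // true
}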
- - bool alreadyRefreshed = wasMetadataRefreshed( - lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); - - // If didn't already refresh the targeting information, refresh it - if (!alreadyRefreshed) { - // To match previous behavior, we just need an incremental refresh here - return refreshNow(opCtx); - } - - *wasChanged = isMetadataDifferent( - lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); - return Status::OK(); - } else if (!_remoteShardVersions.empty()) { - // If we got stale shard versions from remote shards, we may need to refresh - // NOTE: Not sure yet if this can happen simultaneously with targeting issues - - CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); - - // Reset the versions - _remoteShardVersions.clear(); - - if (result == CompareResult_Unknown || result == CompareResult_LT) { - // Our current shard versions aren't all comparable to the old versions, maybe drop - return refreshNow(opCtx); - } - - *wasChanged = isMetadataDifferent( - lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary()); - return Status::OK(); - } - - MONGO_UNREACHABLE; -} - -Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) { - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo)); - - return init(opCtx); -} - -} // namespace mongo diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/commands/chunk_manager_targeter.h index 97c0f4c1455..36fe46a3fe5 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.h +++ b/src/mongo/s/commands/chunk_manager_targeter.h @@ -35,12 +35,12 @@ #include "mongo/bson/bsonobj_comparator_interface.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/ns_targeter.h" namespace mongo { class ChunkManager; +class CollatorInterface; class OperationContext; class Shard; struct ChunkVersion; @@ -109,12 +109,21 @@ public: Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged); private: - using ShardVersionMap = std::map<ShardId, ChunkVersion>; + // Different ways we can refresh metadata + enum RefreshType { + // The version has gone up, but the collection hasn't been dropped + RefreshType_RefreshChunkManager, + // The collection may have been dropped, so we need to reload the db + RefreshType_ReloadDatabase + }; + + typedef std::map<ShardId, ChunkVersion> ShardVersionMap; + /** * Performs an actual refresh from the config server. */ - Status refreshNow(OperationContext* opCtx); + Status refreshNow(OperationContext* opCtx, RefreshType refreshType); /** * Returns a vector of ShardEndpoints where a document might need to be placed. @@ -161,8 +170,10 @@ private: // Represents only the view and not really part of the targeter state. This is not owned here. 
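The RefreshType enum reintroduced in the header above encodes the refresh decision made in refreshIfNeeded: incomparable versions (epochs differ, so the collection may have been dropped) require a database reload, remote versions that are simply newer need only an incremental chunk-manager refresh, and an up-to-date cache needs nothing. A minimal dispatch sketch mirroring those enum names; the switch itself is illustrative:

#include <iostream>

enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };
enum RefreshType { RefreshType_None, RefreshType_RefreshChunkManager, RefreshType_ReloadDatabase };

RefreshType refreshFor(CompareResult result) {
    switch (result) {
        case CompareResult_Unknown:
            return RefreshType_ReloadDatabase;      // collection may have been dropped
        case CompareResult_LT:
            return RefreshType_RefreshChunkManager; // cached versions are simply behind
        case CompareResult_GTE:
        default:
            return RefreshType_None;                // cache already at least as new
    }
}

int main() {
    std::cout << refreshFor(CompareResult_Unknown) << " "
              << refreshFor(CompareResult_LT) << " "
              << refreshFor(CompareResult_GTE) << "\n";  // prints: 2 1 0
}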
TargeterStats* _stats; - // The latest loaded routing cache entry - boost::optional<CachedCollectionRoutingInfo> _routingInfo; + // Zero or one of these are filled at all times + // If sharded, _manager, if unsharded, _primary, on error, neither + std::shared_ptr<ChunkManager> _manager; + std::shared_ptr<Shard> _primary; // Map of shard->remote shard version reported from stale errors ShardVersionMap _remoteShardVersions; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index d9a182b2938..a6887ea0498 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -47,7 +47,7 @@ #include "mongo/db/views/view.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" @@ -55,6 +55,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/store_possible_cursor.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -65,22 +66,20 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, BSONObj cmdObj, int options, BSONObjBuilder* result) { + auto scopedShardDbStatus = + ScopedShardDatabase::getExisting(opCtx, namespaces.executionNss.db()); + if (!scopedShardDbStatus.isOK()) { + appendEmptyResultSet( + *result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns()); + return Status::OK(); + } + auto request = AggregationRequest::parseFromBSON(namespaces.executionNss, cmdObj); if (!request.isOK()) { return request.getStatus(); } - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - - auto executionNsRoutingInfoStatus = - catalogCache->getCollectionRoutingInfo(opCtx, namespaces.executionNss); - if (!executionNsRoutingInfoStatus.isOK()) { - appendEmptyResultSet( - *result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns()); - return Status::OK(); - } - - const auto& executionNsRoutingInfo = executionNsRoutingInfoStatus.getValue(); + const auto conf = scopedShardDbStatus.getValue().db(); // Determine the appropriate collation and 'resolve' involved namespaces to make the // ExpressionContext. @@ -92,20 +91,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // command on an unsharded collection. 
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; LiteParsedPipeline liteParsedPipeline(request.getValue()); - for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { - const auto resolvedNsRoutingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - uassert( - 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm()); - resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); + for (auto&& ns : liteParsedPipeline.getInvolvedNamespaces()) { + uassert(28769, str::stream() << ns.ns() << " cannot be sharded", !conf->isSharded(ns.ns())); + resolvedNamespaces[ns.coll()] = {ns, std::vector<BSONObj>{}}; } - if (!executionNsRoutingInfo.cm()) { - return aggPassthrough( - opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), cmdObj, result, options); + if (!conf->isSharded(namespaces.executionNss.ns())) { + return aggPassthrough(opCtx, namespaces, conf, cmdObj, result, options); } - const auto chunkMgr = executionNsRoutingInfo.cm(); + auto chunkMgr = conf->getChunkManager(opCtx, namespaces.executionNss.ns()); std::unique_ptr<CollatorInterface> collation; if (!request.getValue().getCollation().isEmpty()) { @@ -265,10 +260,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Run merging command on random shard, unless a stage needs the primary shard. Need to use // ShardConnection so that the merging mongod is sent the config servers on connection init. auto& prng = opCtx->getClient()->getPrng(); - const auto mergingShardId = + const auto& mergingShardId = (needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard.load()) - ? uassertStatusOK(catalogCache->getDatabase(opCtx, namespaces.executionNss.db())) - .primaryId() + ? conf->getPrimaryId() : shardResults[prng.nextInt32(shardResults.size())].shardTargetId; const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); @@ -432,12 +426,12 @@ BSONObj ClusterAggregate::aggRunCommand(OperationContext* opCtx, Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, - const ShardId& shardId, + DBConfig* conf, BSONObj cmdObj, BSONObjBuilder* out, int queryOptions) { // Temporary hack. See comment on declaration for details. - auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); + auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h index d8e29744766..b0fdd5d7375 100644 --- a/src/mongo/s/commands/cluster_aggregate.h +++ b/src/mongo/s/commands/cluster_aggregate.h @@ -37,11 +37,11 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/s/commands/strategy.h" +#include "mongo/s/config.h" namespace mongo { class OperationContext; -class ShardId; /** * Methods for running aggregation across a sharded cluster. 
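The merging-shard choice in the aggregation hunk above: the merge runs on the primary shard only when a stage requires it or the always-merge-on-primary knob is set; otherwise a random shard that produced results is picked to spread merge load. A sketch of that selection, assuming a non-empty result set:

#include <iostream>
#include <random>
#include <string>
#include <vector>

std::string pickMergingShard(bool needPrimaryShardMerger,
                             bool alwaysMergeOnPrimary,
                             const std::string& primaryShard,
                             const std::vector<std::string>& shardsWithResults,
                             std::mt19937& prng) {
    if (needPrimaryShardMerger || alwaysMergeOnPrimary)
        return primaryShard;
    // Load-spreading random choice; assumes shardsWithResults is non-empty.
    std::uniform_int_distribution<size_t> pick(0, shardsWithResults.size() - 1);
    return shardsWithResults[pick(prng)];
}

int main() {
    std::mt19937 prng{42};
    std::vector<std::string> results{"shard0000", "shard0001"};
    std::cout << pickMergingShard(false, false, "shard0000", results, prng) << "\n";
}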
@@ -90,7 +90,7 @@ private: static Status aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, - const ShardId& shardId, + DBConfig* conf, BSONObj cmd, BSONObjBuilder* result, int queryOptions); diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp index 225cef2b6ef..3ad8c373b9a 100644 --- a/src/mongo/s/commands/cluster_commands_common.cpp +++ b/src/mongo/s/commands/cluster_commands_common.cpp @@ -40,6 +40,7 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/version_manager.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -53,21 +54,17 @@ namespace { bool forceRemoteCheckShardVersionCB(OperationContext* opCtx, const string& ns) { const NamespaceString nss(ns); - if (!nss.isValid()) { - return false; - } - // This will force the database catalog entry to be reloaded - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); + Grid::get(opCtx)->catalogCache()->invalidate(nss.db()); - auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); - if (!routingInfoStatus.isOK()) { + auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); + if (!scopedCMStatus.isOK()) { return false; } - auto& routingInfo = routingInfoStatus.getValue(); + const auto& scopedCM = scopedCMStatus.getValue(); - return routingInfo.cm() != nullptr; + return scopedCM.cm() != nullptr; } } // namespace @@ -264,36 +261,4 @@ std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opC return collectionsToReturn; } -CachedCollectionRoutingInfo getShardedCollection(OperationContext* opCtx, - const NamespaceString& nss) { - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::NamespaceNotSharded, - str::stream() << "Collection " << nss.ns() << " is not sharded.", - routingInfo.cm()); - - return routingInfo; -} - -StatusWith<CachedDatabaseInfo> createShardDatabase(OperationContext* opCtx, StringData dbName) { - auto dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - if (dbStatus == ErrorCodes::NamespaceNotFound) { - auto createDbStatus = - Grid::get(opCtx)->catalogClient(opCtx)->createDatabase(opCtx, dbName.toString()); - if (createDbStatus.isOK() || createDbStatus == ErrorCodes::NamespaceExists) { - dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - } else { - dbStatus = createDbStatus; - } - } - - if (dbStatus.isOK()) { - return dbStatus; - } - - return {dbStatus.getStatus().code(), - str::stream() << "Database " << dbName << " not found due to " - << dbStatus.getStatus().reason()}; -} - } // namespace mongo diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h index 960eb03e73b..7d6465bc400 100644 --- a/src/mongo/s/commands/cluster_commands_common.h +++ b/src/mongo/s/commands/cluster_commands_common.h @@ -39,8 +39,6 @@ namespace mongo { class AScopedConnection; -class CachedCollectionRoutingInfo; -class CachedDatabaseInfo; class DBClientBase; class DBClientCursor; class OperationContext; @@ -142,17 +140,4 @@ bool appendEmptyResultSet(BSONObjBuilder& result, Status status, const std::stri std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opCtx, StringData dbName); -/** - * Abstracts the common pattern of refreshing a collection and checking if it is sharded used across - * 
multiple commands. - */ -CachedCollectionRoutingInfo getShardedCollection(OperationContext* opCtx, - const NamespaceString& nss); - -/** - * If the specified database exists already, loads it in the cache (if not already there) and - * returns it. Otherwise, if it does not exist, this call will implicitly create it as non-sharded. - */ -StatusWith<CachedDatabaseInfo> createShardDatabase(OperationContext* opCtx, StringData dbName); - } // namespace mongo diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index 45c46c26185..2fcf11086b9 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -42,42 +42,73 @@ #include "mongo/util/timer.h" namespace mongo { + +using std::string; +using std::vector; + namespace { +long long applySkipLimit(long long num, const BSONObj& cmd) { + BSONElement s = cmd["skip"]; + BSONElement l = cmd["limit"]; + + if (s.isNumber()) { + num = num - s.numberLong(); + if (num < 0) { + num = 0; + } + } + + if (l.isNumber()) { + long long limit = l.numberLong(); + if (limit < 0) { + limit = -limit; + } + + // 0 limit means no limit + if (limit < num && limit != 0) { + num = limit; + } + } + + return num; +} + + class ClusterCountCmd : public Command { public: ClusterCountCmd() : Command("count", false) {} - bool slaveOk() const override { + virtual bool slaveOk() const { return true; } - bool adminOnly() const override { + virtual bool adminOnly() const { return false; } - bool supportsWriteConcern(const BSONObj& cmd) const override { + + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) override { + virtual bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) { const NamespaceString nss(parseNs(dbname, cmdObj)); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid namespace specified '" << nss.ns() << "'", - nss.isValid()); + uassert( + ErrorCodes::InvalidNamespace, "count command requires valid namespace", nss.isValid()); long long skip = 0; @@ -136,7 +167,7 @@ public: } } - std::vector<Strategy::CommandResult> countResult; + vector<Strategy::CommandResult> countResult; Strategy::commandOp(opCtx, dbname, countCmdBuilder.done(), @@ -183,19 +214,20 @@ public: long long total = 0; BSONObjBuilder shardSubTotal(result.subobjStart("shards")); - for (const auto& resultEntry : countResult) { - const ShardId& shardName = resultEntry.shardTargetId; - const auto resultBSON = resultEntry.result; + for (vector<Strategy::CommandResult>::const_iterator iter = countResult.begin(); + iter != countResult.end(); + ++iter) { + const ShardId& shardName = iter->shardTargetId; - if (resultBSON["ok"].trueValue()) { - long long shardCount = resultBSON["n"].numberLong(); + if (iter->result["ok"].trueValue()) { + long long shardCount = iter->result["n"].numberLong(); shardSubTotal.appendNumber(shardName.toString(), shardCount); total += 
shardCount; } else { shardSubTotal.doneFast(); errmsg = "failed on : " + shardName.toString(); - result.append("cause", resultBSON); + result.append("cause", iter->result); // Add "code" to the top-level response, if the failure of the sharded command // can be accounted to a single error @@ -215,16 +247,17 @@ public: return true; } - Status explain(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - const rpc::ServerSelectionMetadata& serverSelectionMetadata, - BSONObjBuilder* out) const override { + virtual Status explain(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + const rpc::ServerSelectionMetadata& serverSelectionMetadata, + BSONObjBuilder* out) const { const NamespaceString nss(parseNs(dbname, cmdObj)); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid namespace specified '" << nss.ns() << "'", - nss.isValid()); + if (!nss.isValid()) { + return Status{ErrorCodes::InvalidNamespace, + str::stream() << "Invalid collection name: " << nss.ns()}; + } // Extract the targeting query. BSONObj targetingQuery; @@ -251,7 +284,7 @@ public: // We will time how long it takes to run the commands on the shards Timer timer; - std::vector<Strategy::CommandResult> shardResults; + vector<Strategy::CommandResult> shardResults; Strategy::commandOp(opCtx, dbname, explainCmdBob.obj(), @@ -296,33 +329,6 @@ public: opCtx, shardResults, mongosStageName, millisElapsed, out); } -private: - static long long applySkipLimit(long long num, const BSONObj& cmd) { - BSONElement s = cmd["skip"]; - BSONElement l = cmd["limit"]; - - if (s.isNumber()) { - num = num - s.numberLong(); - if (num < 0) { - num = 0; - } - } - - if (l.isNumber()) { - long long limit = l.numberLong(); - if (limit < 0) { - limit = -limit; - } - - // 0 limit means no limit - if (limit < num && limit != 0) { - num = limit; - } - } - - return num; - } - } clusterCountCmd; } // namespace diff --git a/src/mongo/s/commands/cluster_drop_cmd.cpp b/src/mongo/s/commands/cluster_drop_cmd.cpp index 583c8902cbf..2c44a5a1dbc 100644 --- a/src/mongo/s/commands/cluster_drop_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_cmd.cpp @@ -41,6 +41,7 @@ #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -79,20 +80,20 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - - auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss); - if (routingInfoStatus == ErrorCodes::NamespaceNotFound) { + auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); + if (scopedDbStatus == ErrorCodes::NamespaceNotFound) { return true; } - auto routingInfo = uassertStatusOK(std::move(routingInfoStatus)); + uassertStatusOK(scopedDbStatus.getStatus()); + + auto const db = scopedDbStatus.getValue().db(); - if (!routingInfo.cm()) { - _dropUnshardedCollectionFromShard(opCtx, routingInfo.primaryId(), nss, &result); + if (!db->isSharded(nss.ns())) { + _dropUnshardedCollectionFromShard(opCtx, db->getPrimaryId(), nss, &result); } else { uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->dropCollection(opCtx, nss)); - catalogCache->invalidateShardedCollection(nss); + db->markNSNotSharded(nss.ns()); } 
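The cluster count hunks above combine two steps: each shard's "n" is summed per shard (the first failed shard response aborts with that shard's error as the cause), and the applySkipLimit helper then adjusts the cluster-wide total once on mongos, subtracting skip (clamped at zero) and capping by a non-zero |limit|, where limit 0 means unlimited. A self-contained restatement:

#include <cassert>
#include <cstdlib>
#include <string>
#include <vector>

long long applySkipLimit(long long num, long long skip, long long limit) {
    num -= skip;
    if (num < 0)
        num = 0;                // cannot count fewer than zero documents
    limit = std::llabs(limit);  // a negative limit means its absolute value
    if (limit != 0 && limit < num)
        num = limit;            // limit 0 means "no limit"
    return num;
}

struct ShardCount {
    std::string shard;
    bool ok;
    long long n;  // only meaningful when ok
};

bool clusterCount(const std::vector<ShardCount>& results,
                  long long skip,
                  long long limit,
                  long long* out,
                  std::string* errmsg) {
    long long total = 0;
    for (const auto& r : results) {
        if (!r.ok) {
            *errmsg = "failed on : " + r.shard;  // mirrors the error string in the hunk
            return false;
        }
        total += r.n;  // per-shard subtotal feeds the cluster-wide sum
    }
    *out = applySkipLimit(total, skip, limit);
    return true;
}

int main() {
    long long n = 0;
    std::string err;
    assert(clusterCount({{"shard0000", true, 40}, {"shard0001", true, 12}}, 2, 0, &n, &err));
    assert(n == 50);
}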
return true; diff --git a/src/mongo/s/commands/cluster_drop_database_cmd.cpp b/src/mongo/s/commands/cluster_drop_database_cmd.cpp index 178fc5f36bc..f86cf073273 100644 --- a/src/mongo/s/commands/cluster_drop_database_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_database_cmd.cpp @@ -40,7 +40,9 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -91,19 +93,17 @@ public: auto scopedDistLock = uassertStatusOK(catalogClient->getDistLockManager()->lock( opCtx, dbname, "dropDatabase", DistLockManager::kDefaultLockTimeout)); - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - // Refresh the database metadata so it kicks off a full reload - catalogCache->purgeDatabase(dbname); + Grid::get(opCtx)->catalogCache()->invalidate(dbname); - auto dbInfoStatus = catalogCache->getDatabase(opCtx, dbname); + auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname); - if (dbInfoStatus == ErrorCodes::NamespaceNotFound) { + if (scopedDbStatus == ErrorCodes::NamespaceNotFound) { result.append("info", "database does not exist"); return true; } - uassertStatusOK(dbInfoStatus.getStatus()); + uassertStatusOK(scopedDbStatus.getStatus()); catalogClient->logChange(opCtx, "dropDatabase.start", @@ -111,15 +111,16 @@ public: BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); - auto& dbInfo = dbInfoStatus.getValue(); + auto const db = scopedDbStatus.getValue().db(); // Drop the database's collections from metadata for (const auto& nss : getAllShardedCollectionsForDb(opCtx, dbname)) { uassertStatusOK(catalogClient->dropCollection(opCtx, nss)); + db->markNSNotSharded(nss.ns()); } // Drop the database from the primary shard first - _dropDatabaseFromShard(opCtx, dbInfo.primaryId(), dbname); + _dropDatabaseFromShard(opCtx, db->getPrimaryId(), dbname); // Drop the database from each of the remaining shards { @@ -145,7 +146,7 @@ public: } // Invalidate the database so the next access will do a full reload - catalogCache->purgeDatabase(dbname); + Grid::get(opCtx)->catalogCache()->invalidate(dbname); catalogClient->logChange( opCtx, "dropDatabase", dbname, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp index 64537920cb7..1db7ea7ef03 100644 --- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp +++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp @@ -41,6 +41,7 @@ #include "mongo/db/commands.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -108,7 +109,7 @@ public: audit::logEnableSharding(Client::getCurrent(), dbname); // Make sure to force update of any stale metadata - Grid::get(opCtx)->catalogCache()->purgeDatabase(dbname); + Grid::get(opCtx)->catalogCache()->invalidate(dbname); return true; } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index feae1fab5e2..578968205af 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -40,6 +40,7 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/s/balancer_configuration.h" 
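The dropDatabase flow in the hunk above is mostly about sequencing: invalidate the cached entry so metadata is re-read, drop each sharded collection's routing metadata, drop the database on the primary shard before the remaining shards, then invalidate again so the next access does a full reload. A sketch of that ordering with stand-in functions for each step; the real command also takes a dist lock and writes change log entries, which are omitted here:

#include <iostream>
#include <string>
#include <vector>

void invalidateCachedDb(const std::string& db) { std::cout << "invalidate " << db << "\n"; }
void dropCollectionMetadata(const std::string& ns) { std::cout << "drop meta " << ns << "\n"; }
void dropDbOnShard(const std::string& db, const std::string& shard) {
    std::cout << "drop " << db << " on " << shard << "\n";
}

void dropDatabase(const std::string& db,
                  const std::vector<std::string>& shardedColls,
                  const std::string& primaryShard,
                  const std::vector<std::string>& otherShards) {
    invalidateCachedDb(db);          // force a metadata reload up front
    for (const auto& ns : shardedColls)
        dropCollectionMetadata(ns);  // remove routing info before dropping data
    dropDbOnShard(db, primaryShard); // the primary shard goes first
    for (const auto& shard : otherShards)
        dropDbOnShard(db, shard);    // then every remaining shard
    invalidateCachedDb(db);          // next access does a full reload
}

int main() {
    dropDatabase("test", {"test.foo"}, "shard0000", {"shard0001"});
}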
#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_explain.h" @@ -47,6 +48,7 @@ #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/util/timer.h" @@ -61,42 +63,48 @@ class FindAndModifyCmd : public Command { public: FindAndModifyCmd() : Command("findAndModify", false, "findandmodify") {} - bool slaveOk() const override { + virtual bool slaveOk() const { return true; } - bool adminOnly() const override { + virtual bool adminOnly() const { return false; } - bool supportsWriteConcern(const BSONObj& cmd) const override { + + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { find_and_modify::addPrivilegesRequiredForFindAndModify(this, dbname, cmdObj, out); } - Status explain(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - const rpc::ServerSelectionMetadata& serverSelectionMetadata, - BSONObjBuilder* out) const override { - const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); + virtual Status explain(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + const rpc::ServerSelectionMetadata& serverSelectionMetadata, + BSONObjBuilder* out) const { + const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + auto scopedDB = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, dbName)); + const auto conf = scopedDB.db(); shared_ptr<ChunkManager> chunkMgr; shared_ptr<Shard> shard; - if (!routingInfo.cm()) { - shard = routingInfo.primary(); + if (!conf->isSharded(nss.ns())) { + auto shardStatus = + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + shard = shardStatus.getValue(); } else { - chunkMgr = routingInfo.cm(); + chunkMgr = _getChunkManager(opCtx, conf, nss); const BSONObj query = cmdObj.getObjectField("query"); @@ -110,7 +118,7 @@ public: return collationElementStatus; } - StatusWith<BSONObj> status = _getShardKey(opCtx, *chunkMgr, query); + StatusWith<BSONObj> status = _getShardKey(opCtx, chunkMgr, query); if (!status.isOK()) { return status.getStatus(); } @@ -123,7 +131,6 @@ public: if (!shardStatus.isOK()) { return shardStatus.getStatus(); } - shard = shardStatus.getValue(); } @@ -136,7 +143,7 @@ public: Timer timer; BSONObjBuilder result; - bool ok = _runCommand(opCtx, chunkMgr, shard->getId(), nss, explainCmd.obj(), result); + bool ok = _runCommand(opCtx, conf, chunkMgr, shard->getId(), nss, explainCmd.obj(), result); long long millisElapsed = timer.millis(); if (!ok) { @@ -157,23 +164,24 @@ public: opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, out); } - bool run(OperationContext* opCtx, - const std::string& dbName, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) override { + virtual bool 
run(OperationContext* opCtx, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) { const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj); // findAndModify should only be creating database if upsert is true, but this would require // that the parsing be pulled into this function. - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { - return _runCommand(opCtx, nullptr, routingInfo.primaryId(), nss, cmdObj, result); + auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, dbName)); + const auto conf = scopedDb.db(); + + if (!conf->isSharded(nss.ns())) { + return _runCommand(opCtx, conf, nullptr, conf->getPrimaryId(), nss, cmdObj, result); } - const auto chunkMgr = routingInfo.cm(); + shared_ptr<ChunkManager> chunkMgr = _getChunkManager(opCtx, conf, nss); const BSONObj query = cmdObj.getObjectField("query"); @@ -187,11 +195,17 @@ public: return appendCommandStatus(result, collationElementStatus); } - BSONObj shardKey = uassertStatusOK(_getShardKey(opCtx, *chunkMgr, query)); + StatusWith<BSONObj> status = _getShardKey(opCtx, chunkMgr, query); + if (!status.isOK()) { + // Bad query + return appendCommandStatus(result, status.getStatus()); + } + BSONObj shardKey = status.getValue(); auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); - const bool ok = _runCommand(opCtx, chunkMgr, chunk->getShardId(), nss, cmdObj, result); + const bool ok = + _runCommand(opCtx, conf, chunkMgr, chunk->getShardId(), nss, cmdObj, result); if (ok) { updateChunkWriteStatsAndSplitIfNeeded( opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize()); @@ -201,12 +215,21 @@ public: } private: - static StatusWith<BSONObj> _getShardKey(OperationContext* opCtx, - const ChunkManager& chunkMgr, - const BSONObj& query) { + shared_ptr<ChunkManager> _getChunkManager(OperationContext* opCtx, + DBConfig* conf, + const NamespaceString& nss) const { + shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(opCtx, nss.ns()); + massert(13002, "shard internal error chunk manager should never be null", chunkMgr); + + return chunkMgr; + } + + StatusWith<BSONObj> _getShardKey(OperationContext* opCtx, + shared_ptr<ChunkManager> chunkMgr, + const BSONObj& query) const { // Verify that the query has an equality predicate using the shard key StatusWith<BSONObj> status = - chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); + chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); if (!status.isOK()) { return status; @@ -222,19 +245,20 @@ private: return shardKey; } - static bool _runCommand(OperationContext* opCtx, - shared_ptr<ChunkManager> chunkManager, - const ShardId& shardId, - const NamespaceString& nss, - const BSONObj& cmdObj, - BSONObjBuilder& result) { + bool _runCommand(OperationContext* opCtx, + DBConfig* conf, + shared_ptr<ChunkManager> chunkManager, + const ShardId& shardId, + const NamespaceString& nss, + const BSONObj& cmdObj, + BSONObjBuilder& result) const { BSONObj res; const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); ShardConnection conn(shard->getConnString(), nss.ns(), chunkManager); - bool ok = conn->runCommand(nss.db().toString(), cmdObj, res); + bool ok = conn->runCommand(conf->name(), cmdObj, res); conn.done(); // ErrorCodes::RecvStaleConfig is the code for RecvStaleConfigException. 
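The _getShardKey check above enforces that findAndModify on a sharded collection can be routed to a single chunk: the query must carry an equality predicate on every field of the shard key pattern, or extraction fails with a "bad query" status. A stand-in extraction over flat maps instead of BSON:

#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

using Doc = std::map<std::string, int>;

// Returns the extracted key doc, or nullopt when any pattern field lacks an
// equality match in the query (the error status path in the diff).
std::optional<Doc> extractShardKeyFromQuery(const Doc& query,
                                            const std::vector<std::string>& keyPattern) {
    Doc key;
    for (const auto& field : keyPattern) {
        auto it = query.find(field);
        if (it == query.end())
            return std::nullopt;
        key[field] = it->second;
    }
    return key;
}

int main() {
    Doc query{{"x", 7}, {"state", 1}};
    auto key = extractShardKeyFromQuery(query, {"x"});
    std::cout << (key ? "routable to one chunk" : "bad query") << "\n";
}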
diff --git a/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp b/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp index 8931166863c..35150ac3aca 100644 --- a/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp +++ b/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp @@ -70,7 +70,7 @@ public: int options, std::string& errmsg, BSONObjBuilder& result) { - Grid::get(opCtx)->catalogCache()->purgeAllDatabases(); + Grid::get(opCtx)->catalogCache()->invalidateAll(); result.appendBool("flushed", true); return true; diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp index 2bb23453bba..00e104c1e45 100644 --- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp @@ -35,9 +35,8 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -87,10 +86,13 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = getShardedCollection(opCtx, nss); - const auto cm = routingInfo.cm(); + auto scopedDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db())); + auto config = scopedDb.db(); - for (const auto& cmEntry : cm->chunkMap()) { + auto cm = config->getChunkManagerIfExists(opCtx, nss.ns()); + uassert(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded.", cm); + + for (const auto& cmEntry : cm->getChunkMap()) { log() << redact(cmEntry.second->toString()); } diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index b155d322b3e..088b8d6d4d1 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -45,17 +45,27 @@ #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/commands/strategy.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/stdx/chrono.h" #include "mongo/util/log.h" namespace mongo { + +using std::shared_ptr; +using std::map; +using std::set; +using std::string; +using std::vector; + namespace { AtomicUInt32 JOB_NUMBER; @@ -65,7 +75,7 @@ const Milliseconds kNoDistLockTimeout(-1); /** * Generates a unique name for the temporary M/R output collection. */ -std::string getTmpName(StringData coll) { +string getTmpName(StringData coll) { return str::stream() << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER.fetchAndAdd(1); } @@ -75,14 +85,14 @@ std::string getTmpName(StringData coll) { * be sent to the shards as part of the first phase of map/reduce. 
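The getTmpName helper in the map/reduce hunk above derives uniqueness for the temporary output collection from a coarse timestamp plus a process-wide atomic job counter, so concurrent jobs in one mongos never collide. A standalone sketch of that scheme:

#include <atomic>
#include <ctime>
#include <iostream>
#include <sstream>
#include <string>

std::atomic<unsigned> jobNumber{0};  // stands in for the JOB_NUMBER AtomicUInt32

std::string getTmpName(const std::string& coll) {
    std::ostringstream name;
    name << "tmp.mrs." << coll << "_" << std::time(nullptr) << "_" << jobNumber.fetch_add(1);
    return name.str();
}

int main() {
    std::cout << getTmpName("orders") << "\n";  // e.g. tmp.mrs.orders_1489355254_0
}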
*/ BSONObj fixForShards(const BSONObj& orig, - const std::string& output, - std::string& badShardedField, + const string& output, + string& badShardedField, int maxChunkSizeBytes) { BSONObjBuilder b; BSONObjIterator i(orig); while (i.more()) { BSONElement e = i.next(); - const std::string fn = e.fieldName(); + const string fn = e.fieldName(); if (fn == bypassDocumentValidationCommandOption() || fn == "map" || fn == "mapreduce" || fn == "mapReduce" || fn == "mapparams" || fn == "reduce" || fn == "query" || @@ -150,49 +160,47 @@ class MRCmd : public Command { public: MRCmd() : Command("mapReduce", false, "mapreduce") {} - bool slaveOk() const override { + virtual bool slaveOk() const { return true; } - bool adminOnly() const override { + virtual bool adminOnly() const { return false; } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { - return parseNsCollectionRequired(dbname, cmdObj).ns(); - } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return mr::mrSupportsWriteConcern(cmd); } - void help(std::stringstream& help) const override { + virtual void help(std::stringstream& help) const { help << "Runs the sharded map/reduce command"; } - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } - bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) override { + virtual bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) { Timer t; const NamespaceString nss(parseNs(dbname, cmdObj)); - const std::string shardResultCollection = getTmpName(nss.coll()); + uassert(ErrorCodes::InvalidNamespace, "Invalid namespace", nss.isValid()); + + const string shardResultCollection = getTmpName(nss.coll()); bool shardedOutput = false; - bool customOutDB = false; NamespaceString outputCollNss; + bool customOutDB = false; bool inlineOutput = false; - std::string outDB = dbname; + string outDB = dbname; BSONElement outElmt = cmdObj.getField("out"); if (outElmt.type() == Object) { @@ -210,8 +218,7 @@ public: !customOut.hasField("db")); } else { // Mode must be 1st element - const std::string finalColShort = customOut.firstElement().str(); - + const string finalColShort = customOut.firstElement().str(); if (customOut.hasField("db")) { customOutDB = true; outDB = customOut.getField("db").str(); @@ -224,27 +231,44 @@ public: } } - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - - // Ensure the input database exists and set up the input collection - auto inputRoutingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + // Ensure the input database exists + auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbname); + if (!status.isOK()) { + return appendCommandStatus(result, status.getStatus()); + } - const bool shardedInput = inputRoutingInfo.cm() != nullptr; + shared_ptr<DBConfig> confIn = status.getValue(); - // Create the output database implicitly if we have a custom output requested + shared_ptr<DBConfig> confOut; if (customOutDB) { - uassertStatusOK(createShardDatabase(opCtx, outDB)); + // Create 
the output database implicitly, since we have a custom output requested + auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, outDB)); + confOut = scopedDb.getSharedDbReference(); + } else { + confOut = confIn; } - // Ensure that the output database doesn't reside on the config server - auto outputDbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, outDB)); - uassert(ErrorCodes::CommandNotSupported, - str::stream() << "Can not execute mapReduce with output database " << outDB - << " which lives on config servers", - inlineOutput || outputDbInfo.primaryId() != "config"); + if (confOut->getPrimaryId() == "config" && !inlineOutput) { + return appendCommandStatus( + result, + Status(ErrorCodes::CommandNotSupported, + str::stream() << "Can not execute mapReduce with output database " << outDB + << " which lives on config servers")); + } - int64_t maxChunkSizeBytes = 0; + const bool shardedInput = confIn && confIn->isSharded(nss.ns()); + + if (!shardedOutput) { + uassert(15920, + "Cannot output to a non-sharded collection because " + "sharded collection exists already", + !confOut->isSharded(outputCollNss.ns())); + + // TODO: Should we also prevent going from non-sharded to sharded? During the + // transition client may see partial data. + } + int64_t maxChunkSizeBytes = 0; if (shardedOutput) { // Will need to figure out chunks, ask shards for points maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong(); @@ -255,40 +279,29 @@ public: // maxChunkSizeBytes is sent as int BSON field invariant(maxChunkSizeBytes < std::numeric_limits<int>::max()); - } else if (outputCollNss.isValid()) { - auto outputRoutingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss)); - - uassert(15920, - "Cannot output to a non-sharded collection because " - "sharded collection exists already", - !outputRoutingInfo.cm()); - - // TODO: Should we also prevent going from non-sharded to sharded? During the - // transition client may see partial data. 
} const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); // modify command to run on shards with output to tmp collection - std::string badShardedField; + string badShardedField; BSONObj shardedCommand = fixForShards(cmdObj, shardResultCollection, badShardedField, maxChunkSizeBytes); if (!shardedInput && !shardedOutput && !customOutDB) { LOG(1) << "simple MR, just passthrough"; - invariant(inputRoutingInfo.primary()); + const auto shard = + uassertStatusOK(shardRegistry->getShard(opCtx, confIn->getPrimaryId())); - ShardConnection conn(inputRoutingInfo.primary()->getConnString(), ""); + ShardConnection conn(shard->getConnString(), ""); BSONObj res; bool ok = conn->runCommand(dbname, cmdObj, res); conn.done(); if (auto wcErrorElem = res["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse( - inputRoutingInfo.primary()->getId(), wcErrorElem, result); + appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result); } result.appendElementsUnique(res); @@ -310,13 +323,12 @@ public: collation = cmdObj["collation"].embeddedObjectUserCheck(); } - std::set<std::string> servers; - std::vector<Strategy::CommandResult> mrCommandResults; + set<string> servers; + vector<Strategy::CommandResult> mrCommandResults; BSONObjBuilder shardResultsB; BSONObjBuilder shardCountsB; - std::map<std::string, int64_t> countsMap; - + map<string, int64_t> countsMap; auto splitPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); { @@ -337,12 +349,12 @@ public: for (const auto& mrResult : mrCommandResults) { // Need to gather list of all servers even if an error happened - const auto server = [&]() { + string server; + { const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, mrResult.shardTargetId)); - return shard->getConnString().toString(); - }(); - + server = shard->getConnString().toString(); + } servers.insert(server); if (!ok) { @@ -374,14 +386,15 @@ public: if (singleResult.hasField("splitKeys")) { BSONElement splitKeys = singleResult.getField("splitKeys"); - for (const auto& splitPt : splitKeys.Array()) { - splitPts.insert(splitPt.Obj().getOwned()); + vector<BSONElement> pts = splitKeys.Array(); + for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) { + splitPts.insert(it->Obj().getOwned()); } } } if (!ok) { - cleanUp(servers, dbname, shardResultCollection); + _cleanUp(servers, dbname, shardResultCollection); // Add "code" to the top-level response, if the failure of the sharded command // can be accounted to a single error. 
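The first map/reduce phase above fans the rewritten command out to the shards and then folds the per-shard replies together. A condensed sketch of that result-gathering loop, using the same names as the hunks above (mrCommandResults, servers, splitPts) and omitting the ok/error bookkeeping:

    // For every shard that ran the first phase, remember its connection string
    // (needed later to drop the tmp result collections) and collect any split
    // points it proposed for the sharded output collection.
    for (const auto& mrResult : mrCommandResults) {
        const auto shard =
            uassertStatusOK(shardRegistry->getShard(opCtx, mrResult.shardTargetId));
        servers.insert(shard->getConnString().toString());

        const BSONObj singleResult = mrResult.result;
        if (singleResult.hasField("splitKeys")) {
            for (const BSONElement& pt : singleResult.getField("splitKeys").Array()) {
                // Take an owned copy; the reply object is transient.
                splitPts.insert(pt.Obj().getOwned());
            }
        }
    }

If any shard fails, the _cleanUp call above drops the temporary collections on every server gathered this way before the error is surfaced to the client.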
@@ -429,15 +442,16 @@ public: bool ok = true; BSONObj singleResult; + bool hasWCError = false; if (!shardedOutput) { - LOG(1) << "MR with single shard output, NS=" << outputCollNss - << " primary=" << outputDbInfo.primaryId(); + const auto shard = + uassertStatusOK(shardRegistry->getShard(opCtx, confOut->getPrimaryId())); - const auto outputShard = - uassertStatusOK(shardRegistry->getShard(opCtx, outputDbInfo.primaryId())); + LOG(1) << "MR with single shard output, NS=" << outputCollNss.ns() + << " primary=" << shard->toString(); - ShardConnection conn(outputShard->getConnString(), outputCollNss.ns()); + ShardConnection conn(shard->getConnString(), outputCollNss.ns()); ok = conn->runCommand(outDB, finalCmd.obj(), singleResult); BSONObj counts = singleResult.getObjectField("counts"); @@ -446,19 +460,79 @@ public: outputCount = counts.getIntField("output"); conn.done(); - - if (auto wcErrorElem = singleResult["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(outputShard->getId(), wcErrorElem, result); + if (!hasWCError) { + if (auto wcErrorElem = singleResult["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result); + hasWCError = true; + } } } else { LOG(1) << "MR with sharded output, NS=" << outputCollNss.ns(); - auto outputRoutingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss)); - // Create the sharded collection if needed - if (!outputRoutingInfo.cm()) { - outputRoutingInfo = createShardedOutputCollection(opCtx, outputCollNss, splitPts); + if (!confOut->isSharded(outputCollNss.ns())) { + // Enable sharding on the output db + Status status = Grid::get(opCtx)->catalogClient(opCtx)->enableSharding( + opCtx, outputCollNss.db().toString()); + + // If the database has sharding already enabled, we can ignore the error + if (status.isOK()) { + // Invalidate the output database so it gets reloaded on the next fetch attempt + Grid::get(opCtx)->catalogCache()->invalidate(outputCollNss.db()); + } else if (status != ErrorCodes::AlreadyInitialized) { + uassertStatusOK(status); + } + + confOut.reset(); + confOut = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase( + opCtx, outputCollNss.db().toString())); + + // Shard collection according to split points + vector<BSONObj> sortedSplitPts; + + // Points will be properly sorted using the set + for (const auto& splitPt : splitPts) { + sortedSplitPts.push_back(splitPt); + } + + // Pre-split the collection onto all the shards for this database. Note that + // it's not completely safe to pre-split onto non-primary shards using the + // shardcollection method (a conflict may result if multiple map-reduces are + // writing to the same output collection, for instance). + // + // TODO: pre-split mapReduce output in a safer way. + + const std::set<ShardId> outShardIds = [&]() { + std::vector<ShardId> shardIds; + shardRegistry->getAllShardIds(&shardIds); + uassert(ErrorCodes::ShardNotFound, + str::stream() + << "Unable to find shards on which to place output collection " + << outputCollNss.ns(), + !shardIds.empty()); + + return std::set<ShardId>(shardIds.begin(), shardIds.end()); + }(); + + + BSONObj sortKey = BSON("_id" << 1); + ShardKeyPattern sortKeyPattern(sortKey); + + // The collection default collation for the output collection. This is empty, + // representing the simple binary comparison collation. 
+ BSONObj defaultCollation; + + uassertStatusOK( + Grid::get(opCtx)->catalogClient(opCtx)->shardCollection(opCtx, + outputCollNss.ns(), + sortKeyPattern, + defaultCollation, + true, + sortedSplitPts, + outShardIds)); + + // Make sure the cached metadata for the collection knows that we are now sharded + confOut->getChunkManager(opCtx, outputCollNss.ns(), true /* reload */); } auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>(); @@ -493,16 +567,14 @@ public: throw; } - bool hasWCError = false; - for (const auto& mrResult : mrCommandResults) { - const auto server = [&]() { + string server; + { const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard( opCtx, mrResult.shardTargetId)); - return shard->getConnString().toString(); - }(); - + server = shard->getConnString().toString(); + } singleResult = mrResult.result; if (!hasWCError) { if (auto wcErrorElem = singleResult["writeConcernError"]) { @@ -525,8 +597,7 @@ public: // get the size inserted for each chunk // split cannot be called here since we already have the distributed lock if (singleResult.hasField("chunkSizes")) { - std::vector<BSONElement> sizes = - singleResult.getField("chunkSizes").Array(); + vector<BSONElement> sizes = singleResult.getField("chunkSizes").Array(); for (unsigned int i = 0; i < sizes.size(); i += 2) { BSONObj key = sizes[i].Obj().getOwned(); const long long size = sizes[i + 1].numberLong(); @@ -539,37 +610,34 @@ public: } // Do the splitting round - catalogCache->onStaleConfigError(std::move(outputRoutingInfo)); - outputRoutingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss)); + shared_ptr<ChunkManager> cm = + confOut->getChunkManagerIfExists(opCtx, outputCollNss.ns()); uassert(34359, str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns() << "; expected that collection to be sharded, but it was not", - outputRoutingInfo.cm()); - - const auto outputCM = outputRoutingInfo.cm(); + cm); for (const auto& chunkSize : chunkSizes) { BSONObj key = chunkSize.first; const int size = chunkSize.second; invariant(size < std::numeric_limits<int>::max()); - // Key reported should be the chunk's minimum - auto c = outputCM->findIntersectingChunkWithSimpleCollation(key); + // key reported should be the chunk's minimum + shared_ptr<Chunk> c = cm->findIntersectingChunkWithSimpleCollation(key); if (!c) { warning() << "Mongod reported " << size << " bytes inserted for key " << key << " but can't find chunk"; } else { - updateChunkWriteStatsAndSplitIfNeeded(opCtx, outputCM.get(), c.get(), size); + updateChunkWriteStatsAndSplitIfNeeded(opCtx, cm.get(), c.get(), size); } } } - cleanUp(servers, dbname, shardResultCollection); + _cleanUp(servers, dbname, shardResultCollection); if (!ok) { errmsg = str::stream() << "MR post processing failed: " << singleResult.toString(); - return false; + return 0; } // copy some elements from a single result @@ -604,69 +672,9 @@ public: private: /** - * Creates and shards the collection for the output results. 
- */ - static CachedCollectionRoutingInfo createShardedOutputCollection(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObjSet& splitPts) { - auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx); - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); - - // Enable sharding on the output db - Status status = catalogClient->enableSharding(opCtx, nss.db().toString()); - - // If the database has sharding already enabled, we can ignore the error - if (status.isOK()) { - // Invalidate the output database so it gets reloaded on the next fetch attempt - catalogCache->purgeDatabase(nss.db()); - } else if (status != ErrorCodes::AlreadyInitialized) { - uassertStatusOK(status); - } - - // Points will be properly sorted using the set - const std::vector<BSONObj> sortedSplitPts(splitPts.begin(), splitPts.end()); - - // Pre-split the collection onto all the shards for this database. Note that - // it's not completely safe to pre-split onto non-primary shards using the - // shardcollection method (a conflict may result if multiple map-reduces are - // writing to the same output collection, for instance). - // - // TODO: pre-split mapReduce output in a safer way. - - const std::set<ShardId> outShardIds = [&]() { - std::vector<ShardId> shardIds; - shardRegistry->getAllShardIds(&shardIds); - uassert(ErrorCodes::ShardNotFound, - str::stream() << "Unable to find shards on which to place output collection " - << nss.ns(), - !shardIds.empty()); - - return std::set<ShardId>(shardIds.begin(), shardIds.end()); - }(); - - - BSONObj sortKey = BSON("_id" << 1); - ShardKeyPattern sortKeyPattern(sortKey); - - // The collection default collation for the output collection. This is empty, - // representing the simple binary comparison collation. - BSONObj defaultCollation; - - uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->shardCollection( - opCtx, nss.ns(), sortKeyPattern, defaultCollation, true, sortedSplitPts, outShardIds)); - - // Make sure the cached metadata for the collection knows that we are now sharded - catalogCache->invalidateShardedCollection(nss); - return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - } - - /** * Drops the temporary results collections from each shard. 
*/ - static void cleanUp(const std::set<std::string>& servers, - const std::string& dbName, - const std::string& shardResultCollection) { + void _cleanUp(const set<string>& servers, string dbName, string shardResultCollection) { try { // drop collections with tmp results on each shard for (const auto& server : servers) { diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index 2a85b645df0..6b247823381 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -37,10 +37,12 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_common.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" namespace mongo { @@ -58,14 +60,14 @@ class ClusterMergeChunksCommand : public Command { public: ClusterMergeChunksCommand() : Command("mergeChunks") {} - void help(stringstream& h) const override { + virtual void help(stringstream& h) const { h << "Merge Chunks command\n" << "usage: { mergeChunks : <ns>, bounds : [ <min key>, <max key> ] }"; } - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) override { + virtual Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::splitChunk)) { @@ -74,19 +76,17 @@ public: return Status::OK(); } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { return parseNsFullyQualified(dbname, cmdObj); } - bool adminOnly() const override { + virtual bool adminOnly() const { return true; } - - bool slaveOk() const override { + virtual bool slaveOk() const { return false; } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -104,13 +104,10 @@ public: BSONObj& cmdObj, int, string& errmsg, - BSONObjBuilder& result) override { + BSONObjBuilder& result) { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss)); - const auto cm = routingInfo.cm(); + auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); vector<BSONObj> bounds; if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) { @@ -140,6 +137,8 @@ public: return false; } + auto const cm = scopedCM.cm(); + if (!cm->getShardKeyPattern().isShardKey(minKey) || !cm->getShardKeyPattern().isShardKey(maxKey)) { errmsg = stream() << "shard key bounds " @@ -180,8 +179,6 @@ public: bool ok = conn->runCommand("admin", remoteCmdObjB.obj(), remoteResult); conn.done(); - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); - result.appendElements(remoteResult); return ok; } diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index e597e80eeaa..c3cb18ceb15 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ 
b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -40,11 +40,12 @@ #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/migration_secondary_throttle_options.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -59,19 +60,19 @@ class MoveChunkCmd : public Command { public: MoveChunkCmd() : Command("moveChunk", false, "movechunk") {} - bool slaveOk() const override { + virtual bool slaveOk() const { return true; } - bool adminOnly() const override { + virtual bool adminOnly() const { return true; } - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - void help(std::stringstream& help) const override { + virtual void help(std::stringstream& help) const { help << "Example: move chunk that contains the doc {num : 7} to shard001\n" << " { movechunk : 'test.foo' , find : { num : 7 } , to : 'shard0001' }\n" << "Example: move chunk with lower bound 0 and upper bound 10 to shard001\n" @@ -79,9 +80,9 @@ public: << " , to : 'shard001' }\n"; } - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) override { + virtual Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::moveChunk)) { @@ -91,24 +92,21 @@ public: return Status::OK(); } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) override { + virtual bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) { Timer t; const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss)); - const auto cm = routingInfo.cm(); + auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); const auto toElt = cmdObj["to"]; uassert(ErrorCodes::TypeMismatch, @@ -147,6 +145,8 @@ public: return false; } + auto const cm = scopedCM.cm(); + shared_ptr<Chunk> chunk; if (!find.isEmpty()) { @@ -199,7 +199,9 @@ public: secondaryThrottle, cmdObj["_waitForDelete"].trueValue())); - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); + // Proactively refresh the chunk manager. Not strictly necessary, but this way it's + // immediately up-to-date the next time it's used. 
+ scopedCM.db()->getChunkManagerIfExists(opCtx, nss.ns(), true); result.append("millis", t.millis()); return true; diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index e1746bebd72..dc192f1a6a6 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -49,6 +49,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -124,9 +125,9 @@ public: auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); // Flush all cached information. This can't be perfect, but it's better than nothing. - catalogCache->purgeDatabase(dbname); + catalogCache->invalidate(dbname); - auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname)); + auto config = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname)); const auto toElt = cmdObj["to"]; uassert(ErrorCodes::TypeMismatch, @@ -138,7 +139,8 @@ public: return false; } - const auto fromShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbInfo.primaryId())); + const auto fromShard = + uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId())); const auto toShard = [&]() { auto toShardStatus = shardRegistry->getShard(opCtx, to); @@ -221,7 +223,7 @@ public: // Ensure the next attempt to retrieve the database or any of its collections will do a full // reload - catalogCache->purgeDatabase(dbname); + catalogCache->invalidate(dbname); const string oldPrimary = fromShard->getConnString().toString(); diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp index 65f149927d0..b29eff8b0b2 100644 --- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp +++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp @@ -33,11 +33,11 @@ #include "mongo/db/commands.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/s/commands/strategy.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" namespace mongo { -namespace { using std::string; using std::stringstream; @@ -153,6 +153,8 @@ bool ClusterPlanCacheCmd::run(OperationContext* opCtx, // Register plan cache commands at startup // +namespace { + MONGO_INITIALIZER(RegisterPlanCacheCommands)(InitializerContext* context) { // Leaked intentionally: a Command registers itself when constructed. 
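The movePrimary hunks above revert to DBConfig-based cache handling: the router wipes its cached entry for the database, reloads it to find the current primary, and wipes it again once the move commits so that the next access performs a full reload. In outline, with the names from the hunks above and the actual clone and commit steps elided:

    // Flush all cached information for the database. This can't be perfect,
    // but it's better than nothing.
    catalogCache->invalidate(dbname);
    auto config = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname));

    const auto fromShard =
        uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId()));
    // ... clone the database to the new primary shard and commit the change ...

    // Ensure the next attempt to retrieve the database or any of its
    // collections performs a full reload.
    catalogCache->invalidate(dbname);

The same invalidate-then-reload idiom recurs throughout this revert (mergeChunks, moveChunk, split, shardCollection), replacing the snapshot-invalidation calls of the refactored catalog cache.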
@@ -172,4 +174,5 @@ MONGO_INITIALIZER(RegisterPlanCacheCommands)(InitializerContext* context) { } } // namespace + } // namespace mongo diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index a1d99a608e6..7692e764e02 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -53,12 +53,15 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_write.h" +#include "mongo/s/config.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/migration_secondary_throttle_options.h" #include "mongo/s/shard_util.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -184,21 +187,19 @@ public: auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx); auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, nss.db())); + auto scopedShardedDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db())); + const auto config = scopedShardedDb.db(); // Ensure sharding is allowed on the database uassert(ErrorCodes::IllegalOperation, str::stream() << "sharding not enabled for db " << nss.db(), - dbInfo.shardingEnabled()); - - auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + config->isShardingEnabled()); // Ensure that the collection is not sharded already uassert(ErrorCodes::IllegalOperation, str::stream() << "sharding already enabled for collection " << nss.ns(), - !routingInfo.cm()); + !config->isSharded(nss.ns())); // NOTE: We *must* take ownership of the key here - otherwise the shared BSONObj becomes // corrupt as soon as the command ends. @@ -278,7 +279,13 @@ public: } // The rest of the checks require a connection to the primary db - ScopedDbConnection conn(routingInfo.primary()->getConnString()); + const ConnectionString shardConnString = [&]() { + const auto shard = + uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId())); + return shard->getConnString(); + }(); + + ScopedDbConnection conn(shardConnString); // Retrieve the collection metadata in order to verify that it is legal to shard this // collection. @@ -583,21 +590,17 @@ public: initSplits, std::set<ShardId>{})); - result << "collectionsharded" << nss.ns(); - // Make sure the cached metadata for the collection knows that we are now sharded - catalogCache->invalidateShardedCollection(nss); + config->getChunkManager(opCtx, nss.ns(), true /* reload */); + + result << "collectionsharded" << nss.ns(); // Only initially move chunks when using a hashed shard key if (isHashedShardKey && isEmpty) { - routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::ConflictingOperationInProgress, - "Collection was successfully written as sharded but got dropped before it " - "could be evenly distributed", - routingInfo.cm()); - auto chunkManager = routingInfo.cm(); - - const auto chunkMap = chunkManager->chunkMap(); + // Reload the new config info. If we created more than one initial chunk, then + // we need to move them around to balance. 
+ auto chunkManager = config->getChunkManager(opCtx, nss.ns(), true); + ChunkMap chunkMap = chunkManager->getChunkMap(); // 2. Move and commit each "big chunk" to a different shard. int i = 0; @@ -643,13 +646,7 @@ public: } // Reload the config info, after all the migrations - catalogCache->invalidateShardedCollection(nss); - routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::ConflictingOperationInProgress, - "Collection was successfully written as sharded but got dropped before it " - "could be evenly distributed", - routingInfo.cm()); - chunkManager = routingInfo.cm(); + chunkManager = config->getChunkManager(opCtx, nss.ns(), true); // 3. Subdivide the big chunks by splitting at each of the points in "allSplits" // that we haven't already split by. @@ -692,6 +689,10 @@ public: subSplits.push_back(splitPoint); } } + + // Proactively refresh the chunk manager. Not really necessary, but this way it's + // immediately up-to-date the next time it's used. + config->getChunkManager(opCtx, nss.ns(), true); } return true; diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp index 80ed1526663..b63da3b2ee7 100644 --- a/src/mongo/s/commands/cluster_split_cmd.cpp +++ b/src/mongo/s/commands/cluster_split_cmd.cpp @@ -37,13 +37,15 @@ #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -88,19 +90,20 @@ class SplitCollectionCmd : public Command { public: SplitCollectionCmd() : Command("split", false, "split") {} - bool slaveOk() const override { + virtual bool slaveOk() const { return true; } - bool adminOnly() const override { + virtual bool adminOnly() const { return true; } - bool supportsWriteConcern(const BSONObj& cmd) const override { + + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - void help(std::stringstream& help) const override { + virtual void help(std::stringstream& help) const { help << " example: - split the shard that contains give key\n" << " { split : 'alleyinsider.blog.posts' , find : { ts : 1 } }\n" << " example: - split the shard that contains the key with this as the middle\n" @@ -108,9 +111,9 @@ public: << " NOTE: this does not move the chunks, it just creates a logical separation."; } - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) override { + virtual Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), ActionType::splitChunk)) { @@ -119,22 +122,19 @@ public: return Status::OK(); } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* opCtx, - const std::string& dbname, - BSONObj& cmdObj, - int options, - 
std::string& errmsg, - BSONObjBuilder& result) override { + virtual bool run(OperationContext* opCtx, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss)); - const auto cm = routingInfo.cm(); + auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); const BSONField<BSONObj> findField("find", BSONObj()); const BSONField<BSONArray> boundsField("bounds", BSONArray()); @@ -190,6 +190,8 @@ public: return false; } + auto const cm = scopedCM.cm(); + std::shared_ptr<Chunk> chunk; if (!find.isEmpty()) { @@ -273,7 +275,9 @@ public: ChunkRange(chunk->getMin(), chunk->getMax()), {splitPoint})); - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); + // Proactively refresh the chunk manager. Not strictly necessary, but this way it's + // immediately up-to-date the next time it's used. + scopedCM.db()->getChunkManagerIfExists(opCtx, nss.ns(), true); return true; } diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 8b8a3f2e644..730d5e8a178 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -39,13 +39,14 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/chunk_manager_targeter.h" #include "mongo/s/commands/dbclient_multi_command.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -65,6 +66,11 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { dassert(response->isValid(NULL)); } +void reloadChunkManager(OperationContext* opCtx, const NamespaceString& nss) { + auto config = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db())); + config.db()->getChunkManagerIfExists(opCtx, nss.ns(), true); +} + /** * Given a maxChunkSize configuration and the number of chunks in a particular sharded collection, * returns an optimal chunk size to use in order to achieve a good ratio between number of chunks @@ -170,31 +176,30 @@ BSONObj findExtremeKeyForShard(OperationContext* opCtx, void splitIfNeeded(OperationContext* opCtx, const NamespaceString& nss, const TargeterStats& stats) { - auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); - if (!routingInfoStatus.isOK()) { - log() << "failed to get collection information for " << nss - << " while checking for auto-split" << causedBy(routingInfoStatus.getStatus()); + auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); + if (!scopedCMStatus.isOK()) { + warning() << "failed to get collection information for " << nss + << " while checking for auto-split" << causedBy(scopedCMStatus.getStatus()); return; } - auto& routingInfo = routingInfoStatus.getValue(); + const auto& scopedCM = scopedCMStatus.getValue(); - if (!routingInfo.cm()) { + if (!scopedCM.cm()) { return; } for (auto it = stats.chunkSizeDelta.cbegin(); it != stats.chunkSizeDelta.cend(); ++it) { std::shared_ptr<Chunk> chunk; try { - chunk = 
routingInfo.cm()->findIntersectingChunkWithSimpleCollation(it->first); + chunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(it->first); } catch (const AssertionException& ex) { warning() << "could not find chunk while checking for auto-split: " << causedBy(redact(ex)); return; } - updateChunkWriteStatsAndSplitIfNeeded( - opCtx, routingInfo.cm().get(), chunk.get(), it->second); + updateChunkWriteStatsAndSplitIfNeeded(opCtx, scopedCM.cm().get(), chunk.get(), it->second); } } @@ -467,22 +472,21 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, << (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" + (shouldBalance ? ")" : ", but no migrations allowed)")); - // Reload the chunk manager after the split - auto routingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss)); - if (!shouldBalance || !suggestedMigrateChunk) { + reloadChunkManager(opCtx, nss); return; } // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot - // spot from staying on a single shard. This is based on the assumption that succeeding - // inserts will fall on the top chunk. + // spot + // from staying on a single shard. This is based on the assumption that succeeding inserts + // will + // fall on the top chunk. // We need to use the latest chunk manager (after the split) in order to have the most // up-to-date view of the chunk we are about to move - auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation( + auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); + auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation( suggestedMigrateChunk->getMin()); ChunkType chunkToMove; @@ -494,8 +498,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove)); - // Ensure the collection gets reloaded because of the move - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); + reloadChunkManager(opCtx, nss); } catch (const DBException& ex) { chunk->randomizeBytesWritten(); diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index e219d9396f9..7d14499e9d8 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -54,14 +54,17 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/run_on_all_shards_cmd.h" #include "mongo/s/commands/sharded_command_processing.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/query/store_possible_cursor.h" +#include "mongo/s/sharding_raii.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" #include "mongo/util/log.h" @@ -83,19 +86,20 @@ using std::vector; namespace { bool cursorCommandPassthrough(OperationContext* opCtx, - StringData dbName, - const ShardId& shardId, + shared_ptr<DBConfig> conf, const BSONObj& cmdObj, const NamespaceString& nss, int options, BSONObjBuilder* out) { - const auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); + const auto shardStatus = + 
Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); if (!shardStatus.isOK()) { + invariant(shardStatus.getStatus() == ErrorCodes::ShardNotFound); return Command::appendCommandStatus(*out, shardStatus.getStatus()); } const auto shard = shardStatus.getValue(); ScopedDbConnection conn(shard->getConnString()); - auto cursor = conn->query(str::stream() << dbName << ".$cmd", + auto cursor = conn->query(str::stream() << conf->name() << ".$cmd", cmdObj, -1, // nToReturn 0, // nToSkip @@ -151,13 +155,11 @@ StatusWith<BSONObj> getCollation(const BSONObj& cmdObj) { } class PublicGridCommand : public Command { -protected: +public: PublicGridCommand(const char* n, const char* oldname = NULL) : Command(n, false, oldname) {} - virtual bool slaveOk() const { return true; } - virtual bool adminOnly() const { return false; } @@ -168,29 +170,41 @@ protected: return false; } - bool adminPassthrough(OperationContext* opCtx, - const ShardId& shardId, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - return passthrough(opCtx, "admin", shardId, cmdObj, result); - } + // all grid commands are designed not to lock +protected: bool passthrough(OperationContext* opCtx, - const std::string& db, - const ShardId& shardId, + DBConfig* conf, const BSONObj& cmdObj, BSONObjBuilder& result) { - return passthrough(opCtx, db, shardId, cmdObj, 0, result); + return _passthrough(opCtx, conf->name(), conf, cmdObj, 0, result); + } + + bool adminPassthrough(OperationContext* opCtx, + DBConfig* conf, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + return _passthrough(opCtx, "admin", conf, cmdObj, 0, result); } bool passthrough(OperationContext* opCtx, - const std::string& db, - const ShardId& shardId, + DBConfig* conf, const BSONObj& cmdObj, int options, BSONObjBuilder& result) { - const auto shard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); + return _passthrough(opCtx, conf->name(), conf, cmdObj, options, result); + } + +private: + bool _passthrough(OperationContext* opCtx, + const string& db, + DBConfig* conf, + const BSONObj& cmdObj, + int options, + BSONObjBuilder& result) { + const auto shardStatus = + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId()); + const auto shard = uassertStatusOK(shardStatus); ShardConnection conn(shard->getConnString(), ""); @@ -209,50 +223,53 @@ protected: }; class AllShardsCollectionCommand : public RunOnAllShardsCommand { -protected: +public: AllShardsCollectionCommand(const char* n, const char* oldname = NULL, bool useShardConn = false, bool implicitCreateDb = false) : RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {} - void getShardIds(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - vector<ShardId>& shardIds) override { + virtual void getShardIds(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + vector<ShardId>& shardIds) { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (routingInfo.cm()) { - // If it's a sharded collection, send it to all shards - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + uassertStatusOK(status.getStatus()); + + shared_ptr<DBConfig> conf = status.getValue(); + + if (!conf->isSharded(nss.ns())) { + shardIds.push_back(conf->getPrimaryId()); } else { - // Otherwise just send it to the 
primary shard for the database - shardIds.push_back(routingInfo.primaryId()); + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); } } }; class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { -protected: +public: NotAllowedOnShardedCollectionCmd(const char* n) : PublicGridCommand(n) {} - bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) override { + virtual bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) { const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::IllegalOperation, - str::stream() << "can't do command: " << getName() << " on sharded collection", - !routingInfo.cm()); + auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + if (!conf->isSharded(nss.ns())) { + return passthrough(opCtx, conf.get(), cmdObj, options, result); + } - return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, options, result); + return appendCommandStatus( + result, + Status(ErrorCodes::IllegalOperation, + str::stream() << "can't do command: " << getName() << " on sharded collection")); } }; @@ -390,7 +407,6 @@ public: class ReIndexCmd : public AllShardsCollectionCommand { public: ReIndexCmd() : AllShardsCollectionCommand("reIndex") {} - virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { @@ -402,7 +418,6 @@ public: virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - } reIndexCmd; class CollectionModCmd : public AllShardsCollectionCommand { @@ -419,13 +434,12 @@ public: virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - } collectionModCmd; + class ValidateCmd : public PublicGridCommand { public: ValidateCmd() : PublicGridCommand("validate") {} - virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { @@ -446,13 +460,13 @@ public: BSONObjBuilder& output) { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { - return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, output); + auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + if (!conf->isSharded(nss.ns())) { + return passthrough(opCtx, conf.get(), cmdObj, output); } - const auto cm = routingInfo.cm(); + shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); + massert(40051, "chunk manager should not be null", cm); vector<Strategy::CommandResult> results; const BSONObj query; @@ -498,35 +512,33 @@ public: } return true; } - } validateCmd; class CreateCmd : public PublicGridCommand { public: CreateCmd() : PublicGridCommand("create") {} - - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) override { + virtual Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) { const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj); } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + 
virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, int, string& errmsg, - BSONObjBuilder& result) override { - uassertStatusOK(createShardDatabase(opCtx, dbName)); + BSONObjBuilder& result) { + auto dbStatus = ScopedShardDatabase::getOrCreate(opCtx, dbName); + if (!dbStatus.isOK()) { + return appendCommandStatus(result, dbStatus.getStatus()); + } - const auto dbInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - return passthrough(opCtx, dbName, dbInfo.primaryId(), cmdObj, result); + auto scopedDb = std::move(dbStatus.getValue()); + return passthrough(opCtx, scopedDb.db(), cmdObj, result); } } createCmd; @@ -534,27 +546,23 @@ public: class RenameCollectionCmd : public PublicGridCommand { public: RenameCollectionCmd() : PublicGridCommand("renameCollection") {} - virtual Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) { return rename_collection::checkAuthForRenameCollectionCommand(client, dbname, cmdObj); } - virtual bool adminOnly() const { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int options, + int, string& errmsg, - BSONObjBuilder& result) override { + BSONObjBuilder& result) { const auto fullNsFromElt = cmdObj.firstElement(); uassert(ErrorCodes::InvalidNamespace, "'renameCollection' must be of type String", @@ -563,6 +571,10 @@ public: uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid source namespace: " << fullnsFrom.ns(), fullnsFrom.isValid()); + const string dbNameFrom = fullnsFrom.db().toString(); + + auto confFrom = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbNameFrom)); const auto fullnsToElt = cmdObj["to"]; uassert(ErrorCodes::InvalidNamespace, @@ -572,22 +584,24 @@ public: uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid target namespace: " << fullnsTo.ns(), fullnsTo.isValid()); + const string dbNameTo = fullnsTo.db().toString(); + auto confTo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbNameTo)); - const auto fromRoutingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fullnsFrom)); - uassert(13138, "You can't rename a sharded collection", !fromRoutingInfo.cm()); + uassert( + 13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom.ns())); + uassert( + 13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo.ns())); - const auto toRoutingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fullnsTo)); - uassert(13139, "You can't rename to a sharded collection", !toRoutingInfo.cm()); + auto shardTo = confTo->getPrimaryId(); + auto shardFrom = confFrom->getPrimaryId(); uassert(13137, "Source and destination collections must be on same shard", - fromRoutingInfo.primaryId() == toRoutingInfo.primaryId()); + shardFrom == shardTo); - return adminPassthrough(opCtx, fromRoutingInfo.primaryId(), cmdObj, result); + return adminPassthrough(opCtx, confFrom.get(), cmdObj, result); } - } renameCollectionCmd; class CopyDBCmd : public PublicGridCommand { @@ -623,14 +637,14 @@ public: "Invalid todb argument", NamespaceString::validDBName(todb, NamespaceString::DollarInDbNameBehavior::Allow)); - auto toDbInfo = uassertStatusOK(createShardDatabase(opCtx, todb)); 
+ auto scopedToDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, todb)); uassert(ErrorCodes::IllegalOperation, "Cannot copy to a sharded database", - !toDbInfo.shardingEnabled()); + !scopedToDb.db()->isShardingEnabled()); - const std::string fromhost = cmdObj.getStringField("fromhost"); + const string fromhost = cmdObj.getStringField("fromhost"); if (!fromhost.empty()) { - return adminPassthrough(opCtx, toDbInfo.primaryId(), cmdObj, result); + return adminPassthrough(opCtx, scopedToDb.db(), cmdObj, result); } const auto fromDbElt = cmdObj["fromdb"]; @@ -643,10 +657,10 @@ public: "invalid fromdb argument", NamespaceString::validDBName(fromdb, NamespaceString::DollarInDbNameBehavior::Allow)); - auto fromDbInfo = uassertStatusOK(createShardDatabase(opCtx, fromdb)); + auto scopedFromDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, fromdb)); uassert(ErrorCodes::IllegalOperation, "Cannot copy from a sharded database", - !fromDbInfo.shardingEnabled()); + !scopedFromDb.db()->isShardingEnabled()); BSONObjBuilder b; BSONForEach(e, cmdObj) { @@ -656,12 +670,12 @@ public: } { - const auto shard = uassertStatusOK( - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromDbInfo.primaryId())); + const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard( + opCtx, scopedFromDb.db()->getPrimaryId())); b.append("fromhost", shard->getConnString().toString()); } - return adminPassthrough(opCtx, toDbInfo.primaryId(), b.obj(), result); + return adminPassthrough(opCtx, scopedToDb.db(), b.obj(), result); } } clusterCopyDBCmd; @@ -669,39 +683,39 @@ public: class CollectionStats : public PublicGridCommand { public: CollectionStats() : PublicGridCommand("collStats", "collstats") {} - - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; actions.addAction(ActionType::collStats); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int options, + int, string& errmsg, - BSONObjBuilder& result) override { + BSONObjBuilder& result) { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { + auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + if (!conf->isSharded(nss.ns())) { result.appendBool("sharded", false); - result.append("primary", routingInfo.primaryId().toString()); - return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); - } + result.append("primary", conf->getPrimaryId().toString()); - const auto cm = routingInfo.cm(); + return passthrough(opCtx, conf.get(), cmdObj, result); + } result.appendBool("sharded", true); + shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); + massert(12594, "how could chunk manager be null!", cm); + BSONObjBuilder shardStats; map<string, long long> counts; map<string, long long> indexSizes; @@ -846,38 +860,35 @@ public: class DataSizeCmd : public PublicGridCommand { public: DataSizeCmd() : PublicGridCommand("dataSize", "datasize") {} - - std::string 
parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + virtual string parseNs(const string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } - - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - bool run(OperationContext* opCtx, const string& dbName, BSONObj& cmdObj, - int options, + int, string& errmsg, - BSONObjBuilder& result) override { - const NamespaceString nss(parseNs(dbName, cmdObj)); + BSONObjBuilder& result) { + const string fullns = parseNs(dbName, cmdObj); + const string nsDBName = nsToDatabase(fullns); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { - return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); + auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nsDBName)); + if (!conf->isSharded(fullns)) { + return passthrough(opCtx, conf.get(), cmdObj, result); } - const auto cm = routingInfo.cm(); + shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, fullns); + massert(13407, "how could chunk manager be null!", cm); BSONObj min = cmdObj.getObjectField("min"); BSONObj max = cmdObj.getObjectField("max"); @@ -908,12 +919,13 @@ public: for (const ShardId& shardId : shardIds) { const auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); if (!shardStatus.isOK()) { + invariant(shardStatus.getStatus() == ErrorCodes::ShardNotFound); continue; } ScopedDbConnection conn(shardStatus.getValue()->getConnString()); BSONObj res; - bool ok = conn->runCommand(dbName, cmdObj, res); + bool ok = conn->runCommand(conf->name(), cmdObj, res); conn.done(); if (!ok) { @@ -937,20 +949,19 @@ public: class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd { public: ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped") {} - - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; actions.addAction(ActionType::convertToCapped); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { return parseNsCollectionRequired(dbname, cmdObj).ns(); } @@ -959,24 +970,23 @@ public: class GroupCmd : public NotAllowedOnShardedCollectionCmd { public: GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {} - - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + 
+                                       std::vector<Privilege>* out) {
         ActionSet actions;
         actions.addAction(ActionType::find);
         out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
     }

-    bool supportsWriteConcern(const BSONObj& cmd) const override {
+    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
         return false;
     }

-    bool passOptions() const override {
+    virtual bool passOptions() const {
         return true;
     }

-    std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+    virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
         const auto nsElt = cmdObj.firstElement().embeddedObjectUserCheck()["ns"];
         uassert(ErrorCodes::InvalidNamespace,
                 "'ns' must be of type String",
@@ -993,7 +1003,7 @@ public:
                    const BSONObj& cmdObj,
                    ExplainCommon::Verbosity verbosity,
                    const rpc::ServerSelectionMetadata& serverSelectionMetadata,
-                   BSONObjBuilder* out) const override {
+                   BSONObjBuilder* out) const {
         // We will time how long it takes to run the commands on the shards.
         Timer timer;
@@ -1009,17 +1019,36 @@ public:
         const NamespaceString nss(parseNs(dbname, cmdObj));

-        auto routingInfo =
-            uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
-        uassert(ErrorCodes::IllegalOperation,
-                str::stream() << "Passthrough command failed: " << command.toString() << " on ns "
-                              << nss.ns()
-                              << ". Cannot run on sharded namespace.",
-                !routingInfo.cm());
+        // Note that this implementation will not handle targeting retries and fails when the
+        // sharding metadata is too stale
+        auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db());
+        if (!status.isOK()) {
+            return Status(status.getStatus().code(),
+                          str::stream() << "Passthrough command failed: " << command.toString()
+                                        << " on ns "
+                                        << nss.ns()
+                                        << ". Caused by "
+                                        << causedBy(status.getStatus()));
+        }
+
+        shared_ptr<DBConfig> conf = status.getValue();
+        if (conf->isSharded(nss.ns())) {
+            return Status(ErrorCodes::IllegalOperation,
+                          str::stream() << "Passthrough command failed: " << command.toString()
                                        << " on ns "
+                                        << nss.ns()
+                                        << ". Cannot run on sharded namespace.");
+        }
+
+        const auto primaryShardStatus =
+            Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
+        if (!primaryShardStatus.isOK()) {
+            return primaryShardStatus.getStatus();
+        }

         BSONObj shardResult;
         try {
-            ShardConnection conn(routingInfo.primary()->getConnString(), "");
+            ShardConnection conn(primaryShardStatus.getValue()->getConnString(), "");

             // TODO: this can throw a stale config when mongos is not up-to-date -- fix.
             if (!conn->runCommand(nss.db().toString(), command, shardResult, options)) {
@@ -1031,7 +1060,6 @@ public:
                                             << "; result: " << shardResult);
             }
-
             conn.done();
         } catch (const DBException& ex) {
             return ex.toStatus();
@@ -1039,9 +1067,9 @@ public:
         // Fill out the command result.
         Strategy::CommandResult cmdResult;
-        cmdResult.shardTargetId = routingInfo.primaryId();
+        cmdResult.shardTargetId = conf->getPrimaryId();
         cmdResult.result = shardResult;
-        cmdResult.target = routingInfo.primary()->getConnString();
+        cmdResult.target = primaryShardStatus.getValue()->getConnString();

         return ClusterExplain::buildExplainResult(
             opCtx, {cmdResult}, ClusterExplain::kSingleShard, timer.millis(), out);
     }
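The explain path above is the single-shard variant: mongos refuses if the namespace is sharded, otherwise it times the command against the primary shard and hands the raw reply to ClusterExplain::buildExplainResult together with the elapsed milliseconds. A self-contained sketch of the timing wrapper, using std::chrono in place of the Timer class referenced above:

#include <chrono>
#include <functional>
#include <iostream>

// Runs 'work' (standing in for the command sent over ShardConnection above)
// and reports how long it took, mirroring the Timer usage in GroupCmd's explain.
long long timedRunMillis(const std::function<void()>& work) {
    auto start = std::chrono::steady_clock::now();
    work();
    auto end = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
}

int main() {
    long long millis = timedRunMillis([] {
        // ... send the command to the primary shard here ...
    });
    std::cout << "executionTimeMillis: " << millis << "\n";
    return 0;
}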
@@ -1052,18 +1080,15 @@ public:
 class SplitVectorCmd : public NotAllowedOnShardedCollectionCmd {
 public:
     SplitVectorCmd() : NotAllowedOnShardedCollectionCmd("splitVector") {}
-
-    bool passOptions() const override {
+    virtual bool passOptions() const {
         return true;
     }
-
-    bool supportsWriteConcern(const BSONObj& cmd) const override {
+    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
         return false;
     }
-
-    Status checkAuthForCommand(Client* client,
-                               const std::string& dbname,
-                               const BSONObj& cmdObj) override {
+    virtual Status checkAuthForCommand(Client* client,
+                                       const std::string& dbname,
+                                       const BSONObj& cmdObj) {
         if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
                 ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
                 ActionType::splitVector)) {
@@ -1071,49 +1096,43 @@ public:
         }
         return Status::OK();
     }
-
-    std::string parseNs(const string& dbname, const BSONObj& cmdObj) const override {
-        return parseNsFullyQualified(dbname, cmdObj);
-    }
-
-    bool run(OperationContext* opCtx,
-             const string& dbName,
-             BSONObj& cmdObj,
-             int options,
-             string& errmsg,
-             BSONObjBuilder& result) override {
-        const std::string ns = parseNs(dbName, cmdObj);
-        uassert(ErrorCodes::IllegalOperation,
-                "Performing splitVector across dbs isn't supported via mongos",
-                str::startsWith(ns, dbName));
-
+    virtual bool run(OperationContext* opCtx,
+                     const string& dbName,
+                     BSONObj& cmdObj,
+                     int options,
+                     string& errmsg,
+                     BSONObjBuilder& result) {
+        string x = parseNs(dbName, cmdObj);
+        if (!str::startsWith(x, dbName)) {
+            errmsg = str::stream() << "doing a splitVector across dbs isn't supported via mongos";
+            return false;
+        }
         return NotAllowedOnShardedCollectionCmd::run(
             opCtx, dbName, cmdObj, options, errmsg, result);
     }

+    virtual std::string parseNs(const string& dbname, const BSONObj& cmdObj) const {
+        return parseNsFullyQualified(dbname, cmdObj);
+    }
 } splitVectorCmd;

 class DistinctCmd : public PublicGridCommand {
 public:
     DistinctCmd() : PublicGridCommand("distinct") {}
-
-    void help(stringstream& help) const override {
+    virtual void help(stringstream& help) const {
         help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }";
     }
-
-    bool passOptions() const override {
+    virtual bool passOptions() const {
         return true;
     }
-
-    void addRequiredPrivileges(const std::string& dbname,
-                               const BSONObj& cmdObj,
-                               std::vector<Privilege>* out) override {
+    virtual void addRequiredPrivileges(const std::string& dbname,
+                                       const BSONObj& cmdObj,
+                                       std::vector<Privilege>* out) {
         ActionSet actions;
         actions.addAction(ActionType::find);
         out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
     }
-
-    bool supportsWriteConcern(const BSONObj& cmd) const override {
+    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
         return false;
     }

@@ -1122,13 +1141,18 @@ public:
              BSONObj& cmdObj,
              int options,
              string& errmsg,
-             BSONObjBuilder& result) override {
+             BSONObjBuilder& result) {
         const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));

-        auto routingInfo =
-            uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
-        if (!routingInfo.cm()) {
-            if (passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, options, result)) {
+        auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
+        if (!status.isOK()) {
+            return appendEmptyResultSet(result, status.getStatus(), nss.ns());
+        }
+
+        shared_ptr<DBConfig> conf = status.getValue();
+        if (!conf->isSharded(nss.ns())) {
+
+            if (passthrough(opCtx, conf.get(), cmdObj, options, result)) {
                 return true;
             }

@@ -1168,9 +1192,10 @@ public:
             return false;
         }

-        const auto cm = routingInfo.cm();
+        shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns());
+        massert(10420, "how could chunk manager be null!", cm);

-        auto query = getQuery(cmdObj);
+        BSONObj query = getQuery(cmdObj);
         auto queryCollation = getCollation(cmdObj);
         if (!queryCollation.isOK()) {
             return appendEmptyResultSet(result, queryCollation.getStatus(), nss.ns());
@@ -1205,7 +1230,7 @@ public:
             ShardConnection conn(shardStatus.getValue()->getConnString(), nss.ns());
             BSONObj res;
-            bool ok = conn->runCommand(nss.db().toString(), cmdObj, res, options);
+            bool ok = conn->runCommand(conf->name(), cmdObj, res, options);
             conn.done();

             if (!ok) {
@@ -1315,18 +1340,16 @@ public:
         return ClusterExplain::buildExplainResult(
             opCtx, shardResults, mongosStageName, millisElapsed, out);
     }
-
 } disinctCmd;
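DistinctCmd's sharded branch sends the command to every shard the query can touch and unions the returned value arrays; when the command carries a collation, that union must dedupe with a collation-aware comparator so values equal under the collation collapse to one. A self-contained sketch of the union step, with a case-insensitive comparator standing in for the collator (the real code compares BSONElements under the query's collation):

#include <algorithm>
#include <cctype>
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Case-insensitive "less than", standing in for a CollatorInterface.
struct CollationLess {
    bool operator()(const std::string& a, const std::string& b) const {
        return std::lexicographical_compare(
            a.begin(), a.end(), b.begin(), b.end(), [](char x, char y) {
                return std::tolower(static_cast<unsigned char>(x)) <
                    std::tolower(static_cast<unsigned char>(y));
            });
    }
};

int main() {
    // Per-shard "values" arrays from the distinct replies.
    std::vector<std::vector<std::string>> shardValues = {{"Apple", "banana"},
                                                         {"apple", "Cherry"}};
    std::set<std::string, CollationLess> merged;
    for (const auto& values : shardValues) {
        merged.insert(values.begin(), values.end());  // "apple" dedupes with "Apple"
    }
    for (const auto& v : merged) {
        std::cout << v << "\n";
    }
    return 0;
}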
string("sharded filemd5 failed because: ") + res["errmsg"].valuestrsafe(); - return false; } - uassert( - 16246, - str::stream() << "Shard for database " << nss.db() - << " is too old to support GridFS sharded by {files_id:1, n:1}", - res.hasField("md5state")); + uassert(16246, + "Shard " + conf->name() + + " is too old to support GridFS sharded by {files_id:1, n:1}", + res.hasField("md5state")); lastResult = res; int nNext = res["numChunks"].numberInt(); @@ -1477,24 +1497,20 @@ public: class Geo2dFindNearCmd : public PublicGridCommand { public: Geo2dFindNearCmd() : PublicGridCommand("geoNear") {} - - void help(stringstream& h) const override { + void help(stringstream& h) const { h << "http://dochub.mongodb.org/core/geo#GeospatialIndexing-geoNearCommand"; } - - bool passOptions() const override { + virtual bool passOptions() const { return true; } - - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; actions.addAction(ActionType::find); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1503,16 +1519,16 @@ public: BSONObj& cmdObj, int options, string& errmsg, - BSONObjBuilder& result) override { + BSONObjBuilder& result) { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { - return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result); + auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + if (!conf->isSharded(nss.ns())) { + return passthrough(opCtx, conf.get(), cmdObj, options, result); } - const auto cm = routingInfo.cm(); + shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns()); + massert(13500, "how could chunk manager be null!", cm); BSONObj query = getQuery(cmdObj); auto collation = getCollation(cmdObj); @@ -1612,76 +1628,64 @@ public: return true; } - } geo2dFindNearCmd; -class CompactCmd : public Command { +class CompactCmd : public PublicGridCommand { public: - CompactCmd() : Command("compact") {} - - bool slaveOk() const override { - return true; - } - - bool adminOnly() const override { - return false; - } - - void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) override { + CompactCmd() : PublicGridCommand("compact") {} + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { ActionSet actions; actions.addAction(ActionType::compact); out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - - bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) override { - uasserted(ErrorCodes::CommandNotSupported, "compact not allowed through mongos"); + virtual bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + errmsg = 
"compact not allowed through mongos"; + return false; } - } compactCmd; class EvalCmd : public PublicGridCommand { public: EvalCmd() : PublicGridCommand("eval", "$eval") {} - virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { // $eval can do pretty much anything, so require all privileges. RoleGraph::generateUniversalPrivileges(out); } - - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - - bool run(OperationContext* opCtx, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) override { + virtual bool run(OperationContext* opCtx, + const string& dbName, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { RARELY { warning() << "the eval command is deprecated" << startupWarningsLog; } - // $eval isn't allowed to access sharded collections, but we need to leave the shard to - // detect that - const auto dbInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); - return passthrough(opCtx, dbName, dbInfo.primaryId(), cmdObj, result); - } + // $eval isn't allowed to access sharded collections, but we need to leave the + // shard to detect that. + auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + if (!status.isOK()) { + return appendCommandStatus(result, status.getStatus()); + } + shared_ptr<DBConfig> conf = status.getValue(); + return passthrough(opCtx, conf.get(), cmdObj, result); + } } evalCmd; class CmdListCollections final : public PublicGridCommand { @@ -1707,7 +1711,7 @@ public: str::stream() << "Not authorized to create users on db: " << dbname); } - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1719,23 +1723,18 @@ public: BSONObjBuilder& result) final { auto nss = NamespaceString::makeListCollectionsNSS(dbName); - auto dbInfoStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); - if (!dbInfoStatus.isOK()) { - return appendEmptyResultSet(result, dbInfoStatus.getStatus(), nss.ns()); + auto conf = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + if (!conf.isOK()) { + return appendEmptyResultSet(result, conf.getStatus(), dbName + ".$cmd.listCollections"); } - const auto& dbInfo = dbInfoStatus.getValue(); - - return cursorCommandPassthrough( - opCtx, dbName, dbInfo.primaryId(), cmdObj, nss, options, &result); + return cursorCommandPassthrough(opCtx, conf.getValue(), cmdObj, nss, options, &result); } - } cmdListCollections; class CmdListIndexes final : public PublicGridCommand { public: CmdListIndexes() : PublicGridCommand("listIndexes") {} - virtual Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) { @@ -1758,7 +1757,7 @@ public: << ns.coll()); } - bool supportsWriteConcern(const BSONObj& cmd) const override { + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -1768,15 +1767,18 @@ public: int options, string& errmsg, BSONObjBuilder& result) final { - const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + auto conf = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName); + if (!conf.isOK()) { + return appendCommandStatus(result, 
conf.getStatus()); + } - const auto commandNss = NamespaceString::makeListIndexesNSS(nss.db(), nss.coll()); + const NamespaceString targetNss(parseNsCollectionRequired(dbName, cmdObj)); + const NamespaceString commandNss = + NamespaceString::makeListIndexesNSS(targetNss.db(), targetNss.coll()); + dassert(targetNss == commandNss.getTargetNSForListIndexes()); return cursorCommandPassthrough( - opCtx, nss.db(), routingInfo.primaryId(), cmdObj, commandNss, options, &result); + opCtx, conf.getValue(), cmdObj, commandNss, options, &result); } } cmdListIndexes; diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp index 881b7d654ab..b534bf0628a 100644 --- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp +++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp @@ -36,12 +36,12 @@ #include <set> #include "mongo/db/jsobj.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { @@ -80,7 +80,7 @@ bool RunOnAllShardsCommand::run(OperationContext* opCtx, LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << redact(cmdObj); if (_implicitCreateDb) { - uassertStatusOK(createShardDatabase(opCtx, dbName)); + uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, dbName)); } std::vector<ShardId> shardIds; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 647e6601dfa..0182a091ab7 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -54,6 +54,8 @@ #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/tracking_metadata.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" +#include "mongo/s/chunk_version.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" @@ -363,10 +365,11 @@ void Strategy::clientCommandOp(OperationContext* opCtx, ShardConnection::checkMyConnectionVersions(opCtx, staleNS); if (loops < 4) { - const NamespaceString nss(staleNS); - if (nss.isValid()) { - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); - } + // This throws out the entire database cache entry in response to + // StaleConfigException instead of just the collection which encountered it. There + // is no good reason for it other than the lack of lower-granularity cache + // invalidation. + Grid::get(opCtx)->catalogCache()->invalidate(NamespaceString(staleNS).db()); } } catch (const DBException& e) { OpQueryReplyBuilder reply; diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp new file mode 100644 index 00000000000..f5aec193923 --- /dev/null +++ b/src/mongo/s/config.cpp @@ -0,0 +1,366 @@ +/** + * Copyright (C) 2008-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
new file mode 100644
index 00000000000..f5aec193923
--- /dev/null
+++ b/src/mongo/s/config.cpp
@@ -0,0 +1,366 @@
+/**
+ * Copyright (C) 2008-2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/config.h"
+
+#include <vector>
+
+#include "mongo/db/lasterror.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/grid.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+struct CollectionInfo {
+    // The config server opTime at which the chunk manager below was loaded
+    const repl::OpTime configOpTime;
+
+    // The chunk manager
+    const std::shared_ptr<ChunkManager> cm;
+};
+
+DBConfig::DBConfig(const DatabaseType& dbt, repl::OpTime configOpTime)
+    : _name(dbt.getName()),
+      _shardingEnabled(dbt.getSharded()),
+      _primaryId(dbt.getPrimary()),
+      _configOpTime(std::move(configOpTime)) {}
+
+DBConfig::~DBConfig() = default;
+
+bool DBConfig::isSharded(const std::string& ns) {
+    stdx::lock_guard<stdx::mutex> lk(_lock);
+
+    return _collections.count(ns) > 0;
+}
+
+void DBConfig::markNSNotSharded(const std::string& ns) {
+    stdx::lock_guard<stdx::mutex> lk(_lock);
+
+    CollectionInfoMap::iterator it = _collections.find(ns);
+    if (it != _collections.end()) {
+        _collections.erase(it);
+    }
+}
+
+std::shared_ptr<ChunkManager> DBConfig::getChunkManagerIfExists(OperationContext* opCtx,
+                                                                const std::string& ns,
+                                                                bool shouldReload,
+                                                                bool forceReload) {
+    // Don't report exceptions here as errors in GetLastError
+    LastError::Disabled ignoreForGLE(&LastError::get(cc()));
+
+    try {
+        return getChunkManager(opCtx, ns, shouldReload, forceReload);
+    } catch (const DBException&) {
+        return nullptr;
+    }
+}
+
+std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* opCtx,
+                                                        const std::string& ns,
+                                                        bool shouldReload,
+                                                        bool forceReload) {
+    ChunkVersion oldVersion;
+    std::shared_ptr<ChunkManager> oldManager;
+
+    {
+        stdx::lock_guard<stdx::mutex> lk(_lock);
+
+        auto it = _collections.find(ns);
+
+        const bool earlyReload = (it == _collections.end()) && (shouldReload || forceReload);
+        if (earlyReload) {
+            // This is to catch cases where this is a new sharded collection.
+            // Note: read the _reloadCount inside the _lock mutex, so _loadIfNeeded will always
+            // be forced to perform a reload.
+            const auto currentReloadIteration = _reloadCount.load();
+            _loadIfNeeded(opCtx, currentReloadIteration);
+
+            it = _collections.find(ns);
+        }
+
+        uassert(ErrorCodes::NamespaceNotSharded,
+                str::stream() << "Collection is not sharded: " << ns,
+                it != _collections.end());
+
+        const auto& ci = it->second;
+
+        if (!(shouldReload || forceReload) || earlyReload) {
+            return ci.cm;
+        }
+
+        if (ci.cm) {
+            oldManager = ci.cm;
+            oldVersion = ci.cm->getVersion();
+        }
+    }
+
+    // TODO: We need to keep this first one-chunk check in until we have a more efficient way of
+    // creating/reusing a chunk manager, as doing so requires copying the full set of chunks
+    // currently
+    std::vector<ChunkType> newestChunk;
+    if (oldVersion.isSet() && !forceReload) {
+        uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
+            opCtx,
+            BSON(ChunkType::ns(ns)),
+            BSON(ChunkType::DEPRECATED_lastmod() << -1),
+            1,
+            &newestChunk,
+            nullptr,
+            repl::ReadConcernLevel::kMajorityReadConcern));
+
+        if (!newestChunk.empty()) {
+            invariant(newestChunk.size() == 1);
+            ChunkVersion v = newestChunk[0].getVersion();
+            if (v.equals(oldVersion)) {
+                stdx::lock_guard<stdx::mutex> lk(_lock);
+
+                auto it = _collections.find(ns);
+                uassert(15885,
+                        str::stream() << "not sharded after reloading from chunks : " << ns,
+                        it != _collections.end());
+
+                const auto& ci = it->second;
+                return ci.cm;
+            }
+        }
+    } else if (!oldVersion.isSet()) {
+        warning() << "version 0 found when " << (forceReload ? "reloading" : "checking")
+                  << " chunk manager; collection '" << ns << "' initially detected as sharded";
+    }
+
+    std::unique_ptr<ChunkManager> tempChunkManager;
+
+    {
+        stdx::lock_guard<stdx::mutex> lll(_hitConfigServerLock);
+
+        if (!newestChunk.empty() && !forceReload) {
+            // If we have a target we're going for, see if we've hit already
+            stdx::lock_guard<stdx::mutex> lk(_lock);
+
+            auto it = _collections.find(ns);
+
+            if (it != _collections.end()) {
+                const auto& ci = it->second;
+
+                ChunkVersion currentVersion = newestChunk[0].getVersion();
+
+                // Only reload if the version we found is newer than our own in the same epoch
+                if (currentVersion <= ci.cm->getVersion() &&
+                    ci.cm->getVersion().hasEqualEpoch(currentVersion)) {
+                    return ci.cm;
+                }
+            }
+        }
+
+        // Reload the chunk manager outside of the DBConfig's mutex so as to not block operations
+        // for different collections on the same database
+        tempChunkManager.reset(new ChunkManager(
+            NamespaceString(oldManager->getns()),
+            oldManager->getVersion().epoch(),
+            oldManager->getShardKeyPattern(),
+            oldManager->getDefaultCollator() ? oldManager->getDefaultCollator()->clone() : nullptr,
+            oldManager->isUnique()));
+        tempChunkManager->loadExistingRanges(opCtx, oldManager.get());
+
+        if (!tempChunkManager->numChunks()) {
+            // Maybe we're not sharded any more, so do a full reload
+            const auto currentReloadIteration = _reloadCount.load();
+
+            const bool successful = [&]() {
+                stdx::lock_guard<stdx::mutex> lk(_lock);
+                return _loadIfNeeded(opCtx, currentReloadIteration);
+            }();
+
+            // If we aren't successful loading the database entry, we don't want to keep the stale
+            // object around which has invalid data.
+            if (!successful) {
+                Grid::get(opCtx)->catalogCache()->invalidate(_name);
+            }
+
+            return getChunkManager(opCtx, ns);
+        }
+    }
+
+    stdx::lock_guard<stdx::mutex> lk(_lock);
+
+    auto it = _collections.find(ns);
+    uassert(14822,
+            str::stream() << "Collection " << ns << " became unsharded in the middle.",
+            it != _collections.end());
+
+    const auto& ci = it->second;
+
+    // Reset if our versions aren't the same
+    bool shouldReset = !tempChunkManager->getVersion().equals(ci.cm->getVersion());
+
+    // Also reset if we're forced to do so
+    if (!shouldReset && forceReload) {
+        shouldReset = true;
+        warning() << "chunk manager reload forced for collection '" << ns
+                  << "', config version is " << tempChunkManager->getVersion();
+    }
+
+    //
+    // LEGACY BEHAVIOR
+    //
+    // It's possible to get into a state when dropping collections when our new version is
+    // less than our prev version. Behave identically to legacy mongos, for now, and warn to
+    // draw attention to the problem.
+    //
+    // TODO: Assert in next version, to allow smooth upgrades
+    //
+
+    if (shouldReset && tempChunkManager->getVersion() < ci.cm->getVersion()) {
+        shouldReset = false;
+
+        warning() << "not resetting chunk manager for collection '" << ns
+                  << "', config version is " << tempChunkManager->getVersion() << " and "
+                  << "old version is " << ci.cm->getVersion();
+    }
+
+    // end legacy behavior
+
+    if (shouldReset) {
+        const auto cmOpTime = tempChunkManager->getConfigOpTime();
+
+        // The existing ChunkManager could have been updated since we last checked, so replace the
+        // existing chunk manager only if it is strictly newer.
+        if (cmOpTime > ci.cm->getConfigOpTime()) {
+            _collections.erase(ns);
+            auto emplacedEntryIt =
+                _collections.emplace(ns, CollectionInfo{cmOpTime, std::move(tempChunkManager)})
+                    .first;
+            return emplacedEntryIt->second.cm;
+        }
+    }
+
+    return ci.cm;
+}
+
+bool DBConfig::load(OperationContext* opCtx) {
+    const auto currentReloadIteration = _reloadCount.load();
+    stdx::lock_guard<stdx::mutex> lk(_lock);
+    return _loadIfNeeded(opCtx, currentReloadIteration);
+}
+
+bool DBConfig::_loadIfNeeded(OperationContext* opCtx, Counter reloadIteration) {
+    if (reloadIteration != _reloadCount.load()) {
+        return true;
+    }
+
+    const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+
+    auto status = catalogClient->getDatabase(opCtx, _name);
+    if (status == ErrorCodes::NamespaceNotFound) {
+        return false;
+    }
+
+    // All other errors are connectivity, etc so throw an exception.
+    uassertStatusOK(status.getStatus());
+
+    const auto& dbOpTimePair = status.getValue();
+    const auto& dbt = dbOpTimePair.value;
+    invariant(_name == dbt.getName());
+    _primaryId = dbt.getPrimary();
+
+    invariant(dbOpTimePair.opTime >= _configOpTime);
+    _configOpTime = dbOpTimePair.opTime;
+
+    // Load all collections
+    std::vector<CollectionType> collections;
+    repl::OpTime configOpTimeWhenLoadingColl;
+    uassertStatusOK(
+        catalogClient->getCollections(opCtx, &_name, &collections, &configOpTimeWhenLoadingColl));
+
+    invariant(configOpTimeWhenLoadingColl >= _configOpTime);
+
+    for (const auto& coll : collections) {
+        auto collIter = _collections.find(coll.getNs().ns());
+        if (collIter != _collections.end()) {
+            invariant(configOpTimeWhenLoadingColl >= collIter->second.configOpTime);
+        }
+
+        _collections.erase(coll.getNs().ns());
+
+        if (!coll.getDropped()) {
+            std::unique_ptr<CollatorInterface> defaultCollator;
+            if (!coll.getDefaultCollation().isEmpty()) {
+                auto statusWithCollator = CollatorFactoryInterface::get(opCtx->getServiceContext())
+                                              ->makeFromBSON(coll.getDefaultCollation());
+
+                // The collation was validated upon collection creation.
+                invariantOK(statusWithCollator.getStatus());
+
+                defaultCollator = std::move(statusWithCollator.getValue());
+            }
+
+            std::unique_ptr<ChunkManager> manager(
+                stdx::make_unique<ChunkManager>(coll.getNs(),
+                                                coll.getEpoch(),
+                                                ShardKeyPattern(coll.getKeyPattern()),
+                                                std::move(defaultCollator),
+                                                coll.getUnique()));
+
+            // Do the blocking collection load
+            manager->loadExistingRanges(opCtx, nullptr);
+
+            // Collections with no chunks are unsharded, no matter what the collections entry says
+            if (manager->numChunks()) {
+                _collections.emplace(
+                    coll.getNs().ns(),
+                    CollectionInfo{configOpTimeWhenLoadingColl, std::move(manager)});
+            }
+        }
+    }
+
+    _reloadCount.fetchAndAdd(1);
+
+    return true;
+}
+
+ShardId DBConfig::getPrimaryId() {
+    stdx::lock_guard<stdx::mutex> lk(_lock);
+    return _primaryId;
+}
+
+}  // namespace mongo
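The first check in DBConfig::_loadIfNeeded is the interesting part of this file: callers snapshot _reloadCount before taking _lock, and if another thread finished a full reload in between, the counter has advanced and the caller can reuse the freshly loaded state instead of hitting the config servers again. A self-contained sketch of that pattern in standard C++:

#include <atomic>
#include <cstdint>
#include <iostream>
#include <mutex>

// The reload-deduplication pattern from DBConfig::_loadIfNeeded: snapshot the
// counter before locking; if it moved while we waited, someone else reloaded.
class CachedEntry {
public:
    void refresh() {
        const uint64_t iteration = _reloadCount.load();
        std::lock_guard<std::mutex> lk(_lock);
        loadIfNeeded(iteration);
    }

private:
    void loadIfNeeded(uint64_t iteration) {
        if (iteration != _reloadCount.load()) {
            // Another thread completed a full reload while we waited on the
            // mutex; its result is at least as fresh as what we would load.
            return;
        }
        // ... the expensive fetch from the config servers would go here ...
        std::cout << "performing full reload\n";
        _reloadCount.fetch_add(1);
    }

    std::mutex _lock;
    std::atomic<uint64_t> _reloadCount{0};
};

int main() {
    CachedEntry e;
    e.refresh();  // reloads
    e.refresh();  // reloads again; the counter check only saves work when
                  // several threads race on refresh() concurrently
    return 0;
}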
diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h
new file mode 100644
index 00000000000..bbd63cf3b3b
--- /dev/null
+++ b/src/mongo/s/config.h
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2008-2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/db/repl/optime.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class ChunkManager;
+struct CollectionInfo;
+class DatabaseType;
+class OperationContext;
+
+/**
+ * Represents the cache entry for a database.
+ */
+class DBConfig {
+public:
+    DBConfig(const DatabaseType& dbt, repl::OpTime configOpTime);
+    ~DBConfig();
+
+    /**
+     * The name of the database which this entry caches.
+     */
+    const std::string& name() const {
+        return _name;
+    }
+
+    ShardId getPrimaryId();
+
+    /**
+     * Returns whether 'enableSharding' has been called for this database.
+     */
+    bool isShardingEnabled() const {
+        return _shardingEnabled;
+    }
+
+    /**
+     * Removes the specified namespace from the set of collections under this database entry so
+     * that from then onwards it will be treated as unsharded.
+     *
+     * Note that this method doesn't do any writes to the config metadata, but simply drops the
+     * specified namespace from the cache.
+     */
+    void markNSNotSharded(const std::string& ns);
+
+    /**
+     * @return whether or not the 'ns' collection is partitioned
+     */
+    bool isSharded(const std::string& ns);
+
+    std::shared_ptr<ChunkManager> getChunkManager(OperationContext* opCtx,
+                                                  const std::string& ns,
+                                                  bool reload = false,
+                                                  bool forceReload = false);
+    std::shared_ptr<ChunkManager> getChunkManagerIfExists(OperationContext* opCtx,
+                                                          const std::string& ns,
+                                                          bool reload = false,
+                                                          bool forceReload = false);
+
+    /**
+     * Returns true if it is successful at loading the DBConfig, false if the database is not
+     * found, and throws on all other errors.
+     */
+    bool load(OperationContext* opCtx);
+
+protected:
+    typedef std::map<std::string, CollectionInfo> CollectionInfoMap;
+    typedef AtomicUInt64::WordType Counter;
+
+    /**
+     * Returns true if it is successful at loading the DBConfig, false if the database is not
+     * found, and throws on all other errors.
+     * Also returns true without reloading if reloadIteration is not equal to the _reloadCount.
+     * This prevents multiple threads that attempt the reload from doing duplicate work.
+     */
+    bool _loadIfNeeded(OperationContext* opCtx, Counter reloadIteration);
+
+    // All member variables are labeled with one of the following codes indicating the
+    // synchronization rules for accessing them.
+    //
+    // (L) Must hold _lock for access.
+    // (S) Self synchronizing, no explicit locking needed.
+    //
+    // Mutex lock order:
+    // _hitConfigServerLock -> _lock
+    //
+
+    // Name of the database which this entry caches
+    const std::string _name;
+
+    // Whether sharding is enabled for this database
+    const bool _shardingEnabled;
+
+    // Primary shard id
+    ShardId _primaryId;  // (L)
+
+    // Set of collections and lock to protect access
+    stdx::mutex _lock;
+    CollectionInfoMap _collections;  // (L)
+
+    // OpTime of config server when the database definition was loaded.
+    repl::OpTime _configOpTime;  // (L)
+
+    // Ensures that only one thread at a time loads collection configuration data from
+    // the config server
+    stdx::mutex _hitConfigServerLock;
+
+    // Increments every time this performs a full reload. Since a full reload can take a very
+    // long time for very large clusters, this can be used to minimize duplicate work when
+    // multiple threads try to perform a full reload at roughly the same time.
+    AtomicUInt64 _reloadCount;  // (S)
+};
+
+}  // namespace mongo
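Taken together, this header gives the cache-entry workflow that the cluster commands earlier in the patch follow: ask whether the namespace is sharded, then either fetch its ChunkManager or target the primary shard. A sketch of a typical consumer, using only the methods declared above (error handling elided; 'conf' would come from the catalog cache's getDatabase call):

// Typical DBConfig consumer, using only the interface declared above.
void targetNamespace(OperationContext* opCtx,
                     std::shared_ptr<DBConfig> conf,
                     const std::string& ns) {
    if (!conf->isSharded(ns)) {
        ShardId primary = conf->getPrimaryId();
        // ... send the command to 'primary' ...
        return;
    }

    // Sharded: fetch (and possibly lazily load) the routing table.
    std::shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, ns);
    // ... use cm->getShardKeyPattern(), chunk ranges, etc. ...
}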
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 3d4c384c506..d944954635a 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -52,6 +52,7 @@
 #include "mongo/s/query/cluster_client_cursor_impl.h"
 #include "mongo/s/query/cluster_cursor_manager.h"
 #include "mongo/s/query/store_possible_cursor.h"
+#include "mongo/s/sharding_raii.h"
 #include "mongo/s/stale_exception.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/fail_point_service.h"
@@ -318,34 +319,31 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx,
                               << query.getQueryRequest().getProj()};
     }

-    auto const catalogCache = Grid::get(opCtx)->catalogCache();
-
     // Re-target and re-send the initial find command to the shards until we have established the
     // shard version.
     for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) {
-        auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, query.nss());
-        if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
+        auto scopedCMStatus = ScopedChunkManager::get(opCtx, query.nss());
+        if (scopedCMStatus == ErrorCodes::NamespaceNotFound) {
             // If the database doesn't exist, we successfully return an empty result set without
             // creating a cursor.
             return CursorId(0);
-        } else if (!routingInfoStatus.isOK()) {
-            return routingInfoStatus.getStatus();
+        } else if (!scopedCMStatus.isOK()) {
+            return scopedCMStatus.getStatus();
         }

-        auto& routingInfo = routingInfoStatus.getValue();
+        const auto& scopedCM = scopedCMStatus.getValue();

         auto cursorId = runQueryWithoutRetrying(opCtx,
                                                 query,
                                                 readPref,
-                                                routingInfo.cm().get(),
-                                                routingInfo.primary(),
+                                                scopedCM.cm().get(),
+                                                scopedCM.primary(),
                                                 results,
                                                 viewDefinition);
         if (cursorId.isOK()) {
             return cursorId;
         }
-
-        const auto& status = cursorId.getStatus();
+        auto status = std::move(cursorId.getStatus());

         if (!ErrorCodes::isStaleShardingError(status.code()) &&
             status != ErrorCodes::ShardNotFound) {
@@ -359,7 +357,11 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx,
               << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": "
               << redact(status);

-        catalogCache->onStaleConfigError(std::move(routingInfo));
+        if (status == ErrorCodes::StaleEpoch) {
+            Grid::get(opCtx)->catalogCache()->invalidate(query.nss().db().toString());
+        } else {
+            scopedCM.db()->getChunkManagerIfExists(opCtx, query.nss().ns(), true);
+        }
     }

     return {ErrorCodes::StaleShardVersion,
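ClusterFind::runQuery above bounds its staleness handling: up to kMaxStaleConfigRetries attempts, refreshing either the whole database entry (StaleEpoch) or just the one chunk manager before going around again, and surfacing StaleShardVersion if the metadata never converges. The shape of that loop as a self-contained sketch:

#include <cstddef>
#include <iostream>
#include <stdexcept>

const size_t kMaxStaleConfigRetries = 10;

// Stand-in for runQueryWithoutRetrying(): fails with a "stale" signal the
// first two times, mimicking shards rejecting an outdated shard version.
bool attemptQuery(size_t attempt) {
    return attempt > 2;
}

bool runQueryWithRetries() {
    for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) {
        if (attemptQuery(retries)) {
            return true;
        }
        // On a stale-metadata error, refresh the routing info and retry; the
        // real code distinguishes StaleEpoch (drop the whole database entry)
        // from an ordinary stale version (reload one collection's chunks).
        std::cout << "stale routing info on attempt " << retries << ", refreshing\n";
    }
    // Exhausted retries: report the staleness to the caller.
    throw std::runtime_error("exhausted retries resolving shard version");
}

int main() {
    std::cout << (runQueryWithRetries() ? "query succeeded" : "") << "\n";
    return 0;
}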
diff --git a/src/mongo/s/sharding_raii.cpp b/src/mongo/s/sharding_raii.cpp
new file mode 100644
index 00000000000..b90f975ed35
--- /dev/null
+++ b/src/mongo/s/sharding_raii.cpp
@@ -0,0 +1,159 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/sharding_raii.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+using std::shared_ptr;
+
+ScopedShardDatabase::ScopedShardDatabase(std::shared_ptr<DBConfig> db) : _db(db) {
+    invariant(_db);
+}
+
+ScopedShardDatabase::~ScopedShardDatabase() = default;
+
+StatusWith<ScopedShardDatabase> ScopedShardDatabase::getExisting(OperationContext* opCtx,
+                                                                 StringData dbName) {
+    auto dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName.toString());
+    if (!dbStatus.isOK()) {
+        return {dbStatus.getStatus().code(),
+                str::stream() << "Database " << dbName << " was not found due to "
+                              << dbStatus.getStatus().toString()};
+    }
+
+    return {ScopedShardDatabase(std::move(dbStatus.getValue()))};
+}
+
+StatusWith<ScopedShardDatabase> ScopedShardDatabase::getOrCreate(OperationContext* opCtx,
+                                                                 StringData dbName) {
+    auto dbStatus = getExisting(opCtx, dbName);
+    if (dbStatus.isOK()) {
+        return dbStatus;
+    }
+
+    if (dbStatus == ErrorCodes::NamespaceNotFound) {
+        auto statusCreateDb =
+            Grid::get(opCtx)->catalogClient(opCtx)->createDatabase(opCtx, dbName.toString());
+        if (statusCreateDb.isOK() || statusCreateDb == ErrorCodes::NamespaceExists) {
+            return getExisting(opCtx, dbName);
+        }
+
+        return statusCreateDb;
+    }
+
+    return dbStatus.getStatus();
+}
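getOrCreate() above is deliberately tolerant of races: if the database is missing it asks the catalog to create it, a concurrent creator surfacing NamespaceExists is treated the same as success, and the entry is then fetched again. A usage sketch for a caller, using only what this file defines ('opCtx' is assumed to be a valid OperationContext on a mongos):

// Usage sketch for the RAII type implemented above; mirrors how the cluster
// commands and RunOnAllShardsCommand obtain a database entry in this patch.
Status useDatabase(OperationContext* opCtx) {
    auto scopedDbStatus = ScopedShardDatabase::getOrCreate(opCtx, "test");
    if (!scopedDbStatus.isOK()) {
        return scopedDbStatus.getStatus();  // config server unreachable, etc.
    }

    auto scopedDb = std::move(scopedDbStatus.getValue());
    // db() is never null for a successfully constructed ScopedShardDatabase.
    ShardId primary = scopedDb.db()->getPrimaryId();
    // ... target 'primary' with the unsharded operation ...
    return Status::OK();
}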
+
+ScopedChunkManager::ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<ChunkManager> cm)
+    : _db(std::move(db)), _cm(std::move(cm)) {}
+
+ScopedChunkManager::ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<Shard> primary)
+    : _db(std::move(db)), _primary(std::move(primary)) {}
+
+ScopedChunkManager::~ScopedChunkManager() = default;
+
+StatusWith<ScopedChunkManager> ScopedChunkManager::get(OperationContext* opCtx,
+                                                       const NamespaceString& nss) {
+    auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, nss.db());
+    if (!scopedDbStatus.isOK()) {
+        return scopedDbStatus.getStatus();
+    }
+
+    auto scopedDb = std::move(scopedDbStatus.getValue());
+
+    auto cm = scopedDb.db()->getChunkManagerIfExists(opCtx, nss.ns());
+    if (cm) {
+        return {ScopedChunkManager(std::move(scopedDb), std::move(cm))};
+    }
+
+    auto shardStatus =
+        Grid::get(opCtx)->shardRegistry()->getShard(opCtx, scopedDb.db()->getPrimaryId());
+    if (!shardStatus.isOK()) {
+        return {ErrorCodes::fromInt(40371),
+                str::stream() << "The primary shard for collection " << nss.ns()
+                              << " could not be loaded due to error "
+                              << shardStatus.getStatus().toString()};
+    }
+
+    return {ScopedChunkManager(std::move(scopedDb), std::move(shardStatus.getValue()))};
+}
+
+StatusWith<ScopedChunkManager> ScopedChunkManager::getOrCreate(OperationContext* opCtx,
+                                                               const NamespaceString& nss) {
+    auto scopedDbStatus = ScopedShardDatabase::getOrCreate(opCtx, nss.db());
+    if (!scopedDbStatus.isOK()) {
+        return scopedDbStatus.getStatus();
+    }
+
+    return ScopedChunkManager::get(opCtx, nss);
+}
+
+StatusWith<ScopedChunkManager> ScopedChunkManager::refreshAndGet(OperationContext* opCtx,
+                                                                 const NamespaceString& nss) {
+    auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, nss.db());
+    if (!scopedDbStatus.isOK()) {
+        return scopedDbStatus.getStatus();
+    }
+
+    auto scopedDb = std::move(scopedDbStatus.getValue());
+
+    try {
+        std::shared_ptr<ChunkManager> cm =
+            scopedDb.db()->getChunkManager(opCtx, nss.ns(), true, false);
+
+        if (!cm) {
+            return {ErrorCodes::NamespaceNotSharded,
+                    str::stream() << "Collection " << nss.ns()
+                                  << " does not exist or is not sharded."};
+        }
+
+        if (cm->getChunkMap().empty()) {
+            return {ErrorCodes::NamespaceNotSharded,
+                    str::stream() << "Collection " << nss.ns()
+                                  << " is marked as sharded, but does not have any chunks. This "
+                                     "most likely indicates corrupted metadata or a partially "
+                                     "completed 'shardCollection' command."};
+        }
+
+        return {ScopedChunkManager(std::move(scopedDb), std::move(cm))};
+    } catch (const AssertionException& e) {
+        return e.toStatus();
+    }
+}
+
+}  // namespace mongo
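refreshAndGet() above is the variant that callers requiring a sharded collection want: it forces an incremental refresh and then insists the collection really is sharded, flagging an empty chunk map as corrupted or half-created metadata rather than silently treating the collection as unsharded. A caller, again using only this file's API ('opCtx' and 'nss' assumed given):

// Usage sketch: force-refresh the routing info for a namespace the caller
// requires to be sharded.
Status useShardedCollection(OperationContext* opCtx, const NamespaceString& nss) {
    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
    if (!scopedCMStatus.isOK()) {
        // NamespaceNotSharded covers both "never sharded" and the
        // empty-chunk-map case called out explicitly above.
        return scopedCMStatus.getStatus();
    }

    auto scopedCM = std::move(scopedCMStatus.getValue());
    std::shared_ptr<ChunkManager> cm = scopedCM.cm();  // non-null on this path
    // ... route by shard key using 'cm' ...
    return Status::OK();
}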
diff --git a/src/mongo/s/sharding_raii.h b/src/mongo/s/sharding_raii.h
new file mode 100644
index 00000000000..0c54f281985
--- /dev/null
+++ b/src/mongo/s/sharding_raii.h
@@ -0,0 +1,152 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/s/config.h"
+
+namespace mongo {
+
+class OperationContext;
+
+class ScopedShardDatabase {
+    MONGO_DISALLOW_COPYING(ScopedShardDatabase);
+
+public:
+    ScopedShardDatabase(ScopedShardDatabase&&) = default;
+    ~ScopedShardDatabase();
+
+    /**
+     * Ensures that the specified database exists in the cache and if it does, returns it.
+     * Otherwise, returns NamespaceNotFound if the database does not exist, or any other
+     * error code indicating why the database could not be loaded.
+     */
+    static StatusWith<ScopedShardDatabase> getExisting(OperationContext* opCtx, StringData dbName);
+
+    /**
+     * If the specified database exists already, loads it in the cache (if not already there) and
+     * returns it. Otherwise, if it does not exist, this call will implicitly create it as
+     * non-sharded.
+     */
+    static StatusWith<ScopedShardDatabase> getOrCreate(OperationContext* opCtx, StringData dbName);
+
+    /**
+     * Returns the underlying database cache entry.
+     */
+    DBConfig* db() const {
+        return _db.get();
+    }
+
+    /**
+     * This method is here only for compatibility with the legacy M/R code, which requires a
+     * shared reference to the underlying database. It should not be used in new code.
+     */
+    std::shared_ptr<DBConfig> getSharedDbReference() const {
+        return _db;
+    }
+
+private:
+    explicit ScopedShardDatabase(std::shared_ptr<DBConfig> db);
+
+    // Reference to the corresponding database. Never null.
+    std::shared_ptr<DBConfig> _db;
+};
+
+class ScopedChunkManager {
+    MONGO_DISALLOW_COPYING(ScopedChunkManager);
+
+public:
+    ScopedChunkManager(ScopedChunkManager&&) = default;
+    ~ScopedChunkManager();
+
+    /**
+     * If the specified namespace is sharded, returns a ScopedChunkManager initialized with that
+     * collection's routing information. If it is not, the object returned is initialized with
+     * the database primary node on which the unsharded collection must reside.
+     *
+     * Returns NamespaceNotFound if the database does not exist, or any other error indicating a
+     * problem communicating with the config server.
+     */
+    static StatusWith<ScopedChunkManager> get(OperationContext* opCtx, const NamespaceString& nss);
+
+    /**
+     * If the database holding the specified namespace does not exist, creates it and then
+     * behaves like the 'get' method above.
+     */
+    static StatusWith<ScopedChunkManager> getOrCreate(OperationContext* opCtx,
+                                                      const NamespaceString& nss);
+
+    /**
+     * If the specified database and collection do not exist in the cache, tries to load them
+     * from the config server and returns a reference. If they are already in the cache, makes a
+     * call to the config server to check if there are any incremental updates to the collection
+     * chunk metadata and if so incorporates those. Otherwise, if it does not exist or any other
+     * error occurs, passes that error back.
+     */
+    static StatusWith<ScopedChunkManager> refreshAndGet(OperationContext* opCtx,
+                                                        const NamespaceString& nss);
+
+    /**
+     * Returns the underlying database for which we hold reference.
+     */
+    DBConfig* db() const {
+        return _db.db();
+    }
+
+    /**
+     * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr.
+     */
+    std::shared_ptr<ChunkManager> cm() const {
+        return _cm;
+    }
+
+    /**
+     * If the collection is not sharded, returns its primary shard. Otherwise, nullptr.
+     */
+    std::shared_ptr<Shard> primary() const {
+        return _primary;
+    }
+
+private:
+    ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<ChunkManager> cm);
+
+    ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<Shard> primary);
+
+    // Scoped reference to the owning database.
+    ScopedShardDatabase _db;
+
+    // Reference to the corresponding chunk manager (if sharded) or null
+    std::shared_ptr<ChunkManager> _cm;
+
+    // Reference to the primary of the database (if not sharded) or null
+    std::shared_ptr<Shard> _primary;
+};
+
+}  // namespace mongo
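The header makes the invariant explicit: exactly one of cm() and primary() is non-null, so every consumer reduces to the same two-way branch that cluster_find.cpp uses. A closing sketch of that branch, using only the interface declared above ('opCtx' and 'nss' assumed given):

// Two-way targeting off a ScopedChunkManager, per the class contract above:
// cm() is set for sharded collections, primary() for unsharded ones.
Status targetNamespace(OperationContext* opCtx, const NamespaceString& nss) {
    auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
    if (!scopedCMStatus.isOK()) {
        return scopedCMStatus.getStatus();  // e.g. NamespaceNotFound
    }

    const auto& scopedCM = scopedCMStatus.getValue();
    if (auto cm = scopedCM.cm()) {
        // Sharded: pick target shards from the chunk distribution,
        // e.g. via cm->getShardKeyPattern() and the chunk ranges.
    } else {
        // Unsharded: everything goes to the database's primary shard.
        auto primary = scopedCM.primary();  // never null on this branch
        // ... primary->getConnString() ...
    }
    return Status::OK();
}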