diff options
17 files changed, 220 insertions, 79 deletions
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js index d5be9f69efd..4e43675e86f 100644 --- a/jstests/sharding/safe_secondary_reads_drop_recreate.js +++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js @@ -562,13 +562,21 @@ // Do any test-specific setup. test.setUp(staleMongos); + // Do dummy read from the stale mongos so it loads the routing table into memory once. + // Additionally, do a secondary read to ensure that the secondary has loaded the initial + // routing table -- the first read to the primary will refresh the mongos' shardVersion, + // which will then be used against the secondary to ensure the secondary is fresh. + assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll})); + assert.commandWorked(freshMongos.getDB(db).runCommand({ + find: coll, + $readPreference: {mode: 'secondary'}, + readConcern: {'level': 'local'} + })); + // Turn on system profiler on both secondaries. assert.commandWorked(st.rs0.getSecondary().getDB(db).setProfilingLevel(2)); assert.commandWorked(st.rs1.getSecondary().getDB(db).setProfilingLevel(2)); - // Do dummy read from the stale mongos so it loads the routing table into memory once. 
- assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll})); - scenarios[scenario](staleMongos, freshMongos, test, commandProfile); // Clean up the database by dropping it; this is the only way to drop the profiler diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index 6d0248e60b6..aa9ca818d8c 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -391,8 +391,13 @@ assert.commandWorked(freshMongos.adminCommand({shardCollection: nss, key: {x: 1}})); assert.commandWorked(freshMongos.adminCommand({split: nss, middle: {x: 0}})); - // Do dummy read from the stale mongos so that it loads the routing table into memory once. + // Do dummy read from the stale mongos so it loads the routing table into memory once. + // Additionally, do a secondary read to ensure that the secondary has loaded the initial + // routing table -- the first read to the primary will refresh the mongos' shardVersion, + // which will then be used against the secondary to ensure the secondary is fresh. assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll})); + assert.commandWorked(freshMongos.getDB(db).runCommand( + {find: coll, $readPreference: {mode: 'secondary'}, readConcern: {'level': 'local'}})); // Do any test-specific setup. 
test.setUp(staleMongos); diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js index 7204a4b83c7..129197b42a8 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js @@ -332,13 +332,6 @@ let freshMongos = st.s0; let staleMongos = st.s1; - assert.commandWorked(staleMongos.adminCommand({enableSharding: db})); - st.ensurePrimaryShard(db, st.shard0.shardName); - - // Turn on system profiler on secondaries to collect data on all database operations. - assert.commandWorked(donorShardSecondary.getDB(db).setProfilingLevel(2)); - assert.commandWorked(recipientShardSecondary.getDB(db).setProfilingLevel(2)); - let res = st.s.adminCommand({listCommands: 1}); assert.commandWorked(res); @@ -356,15 +349,26 @@ jsTest.log("testing command " + tojson(test.command)); + assert.commandWorked(staleMongos.adminCommand({enableSharding: db})); + st.ensurePrimaryShard(db, st.shard0.shardName); assert.commandWorked(staleMongos.adminCommand({shardCollection: nss, key: {x: 1}})); assert.commandWorked(staleMongos.adminCommand({split: nss, middle: {x: 0}})); - // Do dummy read from the stale mongos so that it loads the routing table into memory once. + // Do dummy read from the stale mongos so it loads the routing table into memory once. + // Additionally, do a secondary read to ensure that the secondary has loaded the initial + // routing table -- the first read to the primary will refresh the mongos' shardVersion, + // which will then be used against the secondary to ensure the secondary is fresh. assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll})); + assert.commandWorked(freshMongos.getDB(db).runCommand( + {find: coll, $readPreference: {mode: 'secondary'}, readConcern: {'level': 'local'}})); // Do any test-specific setup. 
test.setUp(staleMongos); + // Turn on system profiler on secondaries to collect data on all database operations. + assert.commandWorked(donorShardSecondary.getDB(db).setProfilingLevel(2)); + assert.commandWorked(recipientShardSecondary.getDB(db).setProfilingLevel(2)); + // Do a moveChunk from the fresh mongos to make the other mongos stale. // Use {w:2} (all) write concern so the metadata change gets persisted to the secondary // before stalely versioned commands are sent against the secondary. @@ -450,8 +454,10 @@ }); } - // Clean up the collection by dropping it. This also drops all associated indexes. - assert.commandWorked(freshMongos.getDB(db).runCommand({drop: coll})); + // Clean up the database by dropping it; this is the only way to drop the profiler + // collection on secondaries. This also drops all associated indexes. + // Do this from staleMongos, so staleMongos purges the database entry from its cache. + assert.commandWorked(staleMongos.getDB(db).runCommand({dropDatabase: 1})); } st.stop(); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 25f2727ff55..29ef4caf2b8 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -91,7 +91,8 @@ private: }; /** - * Used to notify the catalog cache loader of a new collection version. + * Used to notify the catalog cache loader of a new collection version and invalidate the in-memory + * routing table cache once the oplog updates are committed and become visible. 
*/ class CollectionVersionLogOpHandler final : public RecoveryUnit::Change { public: @@ -99,8 +100,13 @@ public: : _opCtx(opCtx), _nss(nss) {} void commit() override { - Grid::get(_opCtx)->catalogCache()->invalidateShardedCollection(_nss); CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); + + invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X + // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until + // SERVER-31595 removes the X lock requirement. + CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown(); } void rollback() override {} @@ -110,6 +116,17 @@ private: const NamespaceString _nss; }; +/** + * Caller must hold the global lock in some mode other than MODE_NONE. + */ +bool isStandaloneOrPrimary(OperationContext* opCtx) { + dassert(opCtx->lockState()->isLocked()); + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + return !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == + repl::MemberState::RS_PRIMARY); +} + } // unnamed namespace CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss) @@ -307,8 +324,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { if (_nss == NamespaceString::kShardConfigCollectionsCollectionName) { - _onConfigRefreshCompleteInvalidateCachedMetadataAndNotify( - opCtx, query, update, updatedDoc); + _onConfigCollectionsUpdateOp(opCtx, query, update, updatedDoc); } if (ShardingState::get(opCtx)->enabled()) { @@ -407,31 +423,44 @@ void CollectionShardingState::onDropCollection(OperationContext* opCtx, } } -void CollectionShardingState::_onConfigRefreshCompleteInvalidateCachedMetadataAndNotify( - 
OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, - const BSONObj& updatedDoc) { +void CollectionShardingState::_onConfigCollectionsUpdateOp(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update, + const BSONObj& updatedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - // Extract which collection entry is being updated - std::string refreshCollection; + // Notification of routing table changes are only needed on secondaries. + if (isStandaloneOrPrimary(opCtx)) { + return; + } + + // Extract which user collection was updated. + std::string updatedCollection; fassertStatusOK( - 40477, bsonExtractStringField(query, ShardCollectionType::ns.name(), &refreshCollection)); - - // Parse the '$set' update, which will contain the 'lastRefreshedCollectionVersion' if it is - // present. - BSONElement updateElement; - fassertStatusOK(40478, - bsonExtractTypedField(update, StringData("$set"), Object, &updateElement)); - BSONObj setField = updateElement.Obj(); - - // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog - // cache must be invalidated and the catalog cache loader notified of the new version. - if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { - opCtx->recoveryUnit()->registerChange( - new CollectionVersionLogOpHandler(opCtx, NamespaceString(refreshCollection))); + 40477, bsonExtractStringField(query, ShardCollectionType::ns.name(), &updatedCollection)); + + // Parse the '$set' update. + BSONElement setElement; + Status setStatus = bsonExtractTypedField(update, StringData("$set"), Object, &setElement); + if (setStatus.isOK()) { + BSONObj setField = setElement.Obj(); + const NamespaceString updatedNss(updatedCollection); + + // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). 
+ AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); + + if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { + opCtx->recoveryUnit()->registerChange( + new CollectionVersionLogOpHandler(opCtx, updatedNss)); + } + + if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) { + // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the + // X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until + // SERVER-31595 removes the X lock requirement. + CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown(); + } } } @@ -440,13 +469,21 @@ void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify( dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + // Notification of routing table changes are only needed on secondaries. + if (isStandaloneOrPrimary(opCtx)) { + return; + } + // Extract which collection entry is being deleted from the _id field. std::string deletedCollection; fassertStatusOK( 40479, bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection)); + const NamespaceString deletedNss(deletedCollection); + + // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). 
+ AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX); - opCtx->recoveryUnit()->registerChange( - new CollectionVersionLogOpHandler(opCtx, NamespaceString(deletedCollection))); + opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss)); } bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index c70a032918b..3ce7e8004bd 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -241,38 +241,42 @@ public: private: /** - * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible - * to reads -- to notify the catalog cache loader of a new collection version. The catalog - * cache's routing table for the collection will also be invalidated at that time so that the - * next caller to the catalog cache for routing information will provoke a routing table - * refresh. + * This runs on updates to the shard's persisted cache of the config server's + * config.collections collection. + * + * If an update occurs to the 'lastRefreshedCollectionVersion' field, registers a task on the + * opCtx -- to run after writes from the oplog are committed and visible to reads -- to notify + * the catalog cache loader of a new collection version and clear the routing table so the next + * caller with routing information will provoke a routing table refresh. When + * 'lastRefreshedCollectionVersion' is in 'update', it means that a chunk metadata refresh + * finished being applied to the collection's locally persisted metadata store. * - * This only runs on secondaries, and only when 'lastRefreshedCollectionVersion' is in 'update', - * meaning a chunk metadata refresh finished being applied to the collection's locally persisted - * metadata store. 
+ * If an update occurs to the 'enterCriticalSectionCounter' field, simply clear the routing table + * immediately. This will provoke the next secondary caller to refresh through the primary, + * blocking behind the critical section. * * query - BSON with an _id that identifies which collections entry is being updated. * update - the update being applied to the collections entry. * updatedDoc - the document identified by 'query' with the 'update' applied. * + * This only runs on secondaries. * The global exclusive lock is expected to be held by the caller. */ - void _onConfigRefreshCompleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, - const BSONObj& updatedDoc); + void _onConfigCollectionsUpdateOp(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update, + const BSONObj& updatedDoc); /** - * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible - * to reads -- to notify the catalog cache loader of a new collection version. The catalog - * cache's routing table for the collection will also be invalidated at that time so that the - * next caller to the catalog cache for routing information will provoke a routing table - * refresh. + * Invalidates the in-memory routing table cache when a collection is dropped, so the next + * caller with routing information will provoke a routing table refresh and see the drop. * - * This only runs on secondaries + * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible + * to reads. * * query - BSON with an _id field that identifies which collections entry is being updated. * + * This only runs on secondaries. * The global exclusive lock is expected to be held by the caller. 
*/ void _onConfigDeleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx, diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index 65fee2ae792..49d09e326e5 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -593,8 +593,7 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, } // Reload the config info, after all the migrations - catalogCache->invalidateShardedCollection(nss); - routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfoWithRefresh(opCtx, nss)); uassert(ErrorCodes::ConflictingOperationInProgress, "Collection was successfully written as sharded but got dropped before it " "could be evenly distributed", diff --git a/src/mongo/db/s/force_routing_table_refresh_command.cpp b/src/mongo/db/s/force_routing_table_refresh_command.cpp index b58cdeb8e0c..16db83b4d49 100644 --- a/src/mongo/db/s/force_routing_table_refresh_command.cpp +++ b/src/mongo/db/s/force_routing_table_refresh_command.cpp @@ -37,8 +37,11 @@ #include "mongo/db/auth/privilege.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/migration_source_manager.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/grid.h" @@ -63,7 +66,7 @@ public: } bool slaveOk() const override { - return true; + return false; } bool supportsWriteConcern(const BSONObj& cmd) const override { @@ -105,6 +108,25 @@ public: const NamespaceString nss(parseNs(dbname, cmdObj)); + { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + + // If the primary is in the critical section, secondaries 
must wait for the commit to + // finish on the primary in case a secondary's caller has an afterClusterTime inclusive + // of the commit (and new writes to the committed chunk) that hasn't yet propagated back + // to this shard. This ensures the read your own writes causal consistency guarantee. + auto css = CollectionShardingState::get(opCtx, nss); + if (css && css->getMigrationSourceManager()) { + auto criticalSectionSignal = + css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(true); + if (criticalSectionSignal) { + auto& oss = OperationShardingState::get(opCtx); + oss.setMigrationCriticalSectionSignal(criticalSectionSignal); + oss.waitForMigrationCriticalSectionSignal(opCtx); + } + } + } + LOG(1) << "Forcing routing table refresh for " << nss; ChunkVersion unusedShardVersion; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 7261283268f..f7c3c59f679 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -40,10 +40,12 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/migration_util.h" +#include "mongo/db/s/shard_metadata_util.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_shard_collection.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -59,6 +61,8 @@ namespace mongo { +using namespace shardmetadatautil; + namespace { // Wait at most this much time for the recipient to catch up sufficiently so critical section can be @@ -290,9 +294,29 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) { _critSecSignal = std::make_shared<Notification<void>>(); } + _state = kCriticalSection; + + // Persist a 
signal to secondaries that we've entered the critical section. This will cause + // secondaries to refresh their routing table when next accessed, which will block behind the + // critical section. This ensures causal consistency by preventing a stale mongos with a cluster + // time inclusive of the migration config commit update from accessing secondary data. + // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh + // will stall behind the flag. + Status signalStatus = + updateShardCollectionsEntry(opCtx, + BSON(ShardCollectionType::ns() << getNss().ns()), + BSONObj(), + BSON(ShardCollectionType::enterCriticalSectionCounter() << 1), + false /*upsert*/); + if (!signalStatus.isOK()) { + return { + ErrorCodes::OperationFailed, + str::stream() << "Failed to persist critical section signal for secondaries due to: " + << signalStatus.toString()}; + } + log() << "Migration successfully entered critical section"; - _state = kCriticalSection; scopedGuard.Dismiss(); return Status::OK(); } diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp index 533d955d575..6d929554f30 100644 --- a/src/mongo/db/s/shard_metadata_util.cpp +++ b/src/mongo/db/s/shard_metadata_util.cpp @@ -91,7 +91,7 @@ Status setPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString& // Set 'refreshing' to true. 
BSONObj update = BSON(ShardCollectionType::refreshing() << true); return updateShardCollectionsEntry( - opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update, false /*upsert*/); + opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update, BSONObj(), false /*upsert*/); } Status unsetPersistedRefreshFlags(OperationContext* opCtx, @@ -103,8 +103,11 @@ Status unsetPersistedRefreshFlags(OperationContext* opCtx, updateBuilder.appendTimestamp(ShardCollectionType::lastRefreshedCollectionVersion(), refreshedVersion.toLong()); - return updateShardCollectionsEntry( - opCtx, BSON(ShardCollectionType::ns() << nss.ns()), updateBuilder.obj(), false /*upsert*/); + return updateShardCollectionsEntry(opCtx, + BSON(ShardCollectionType::ns() << nss.ns()), + updateBuilder.obj(), + BSONObj(), + false /*upsert*/); } StatusWith<RefreshState> getPersistedRefreshFlags(OperationContext* opCtx, @@ -177,25 +180,35 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt Status updateShardCollectionsEntry(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, + const BSONObj& inc, const bool upsert) { invariant(query.hasField("_id")); if (upsert) { // If upserting, this should be an update from the config server that does not have shard - // refresh information. + // refresh / migration inc signal information. invariant(!update.hasField(ShardCollectionType::refreshing())); invariant(!update.hasField(ShardCollectionType::lastRefreshedCollectionVersion())); + invariant(inc.isEmpty()); } try { DBDirectClient client(opCtx); + BSONObjBuilder builder; + if (!update.isEmpty()) { + // Want to modify the document if it already exists, not replace it. 
+ builder.append("$set", update); + } + if (!inc.isEmpty()) { + builder.append("$inc", inc); + } + auto commandResponse = client.runCommand([&] { write_ops::Update updateOp(NamespaceString{ShardCollectionType::ConfigNS}); updateOp.setUpdates({[&] { write_ops::UpdateOpEntry entry; entry.setQ(query); - // Want to modify the document, not replace it - entry.setU(BSON("$set" << update)); + entry.setU(builder.obj()); entry.setUpsert(upsert); return entry; }()}); diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h index 4aadfc11ba3..9bd035a41b6 100644 --- a/src/mongo/db/s/shard_metadata_util.h +++ b/src/mongo/db/s/shard_metadata_util.h @@ -143,13 +143,17 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt * concern. * * Uses the $set operator on the update so that updates can be applied without resetting everything. + * 'inc' can be used to specify fields and their increments: it will be assigned to the $inc + * operator. * * If 'upsert' is true, expects neither 'refreshing' or 'lastRefreshedCollectionVersion' to be * present in the update: these refreshing fields should only be added to an existing document. + * Similarly, 'inc' must be empty when 'upsert' is true. 
*/ Status updateShardCollectionsEntry(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, + const BSONObj& inc, const bool upsert); /** diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp index 251d8f06cd9..fed6991ed32 100644 --- a/src/mongo/db/s/shard_metadata_util_test.cpp +++ b/src/mongo/db/s/shard_metadata_util_test.cpp @@ -73,6 +73,7 @@ struct ShardMetadataUtilTest : public ShardServerTestFixture { ASSERT_OK(updateShardCollectionsEntry(operationContext(), BSON(ShardCollectionType::ns(kNss.ns())), shardCollectionType.toBSON(), + BSONObj(), true /*upsert*/)); return shardCollectionType; } diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index eb3168ea4d7..e7ad1e78ff4 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -88,8 +88,11 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx, collAndChunks.shardKeyPattern, collAndChunks.defaultCollation, collAndChunks.shardKeyIsUnique); - Status status = updateShardCollectionsEntry( - opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update.toBSON(), true /*upsert*/); + Status status = updateShardCollectionsEntry(opCtx, + BSON(ShardCollectionType::ns() << nss.ns()), + update.toBSON(), + BSONObj(), + true /*upsert*/); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 5a54ef969c9..2f46a11552e 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -500,10 +500,8 @@ ChunkVersion ShardingState::_refreshMetadata(OperationContext* opCtx, const Name << " before shard name has been set", shardId.isValid()); - auto const catalogCache = Grid::get(opCtx)->catalogCache(); - catalogCache->invalidateShardedCollection(nss); - - const auto routingInfo = 
uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + const auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss)); const auto cm = routingInfo.cm(); if (!cm) { diff --git a/src/mongo/s/catalog/type_shard_collection.cpp b/src/mongo/s/catalog/type_shard_collection.cpp index 3a98a15f7ba..195606b0878 100644 --- a/src/mongo/s/catalog/type_shard_collection.cpp +++ b/src/mongo/s/catalog/type_shard_collection.cpp @@ -51,6 +51,8 @@ const BSONField<bool> ShardCollectionType::unique("unique"); const BSONField<bool> ShardCollectionType::refreshing("refreshing"); const BSONField<Date_t> ShardCollectionType::lastRefreshedCollectionVersion( "lastRefreshedCollectionVersion"); +const BSONField<int> ShardCollectionType::enterCriticalSectionCounter( + "enterCriticalSectionCounter"); ShardCollectionType::ShardCollectionType(NamespaceString nss, boost::optional<UUID> uuid, diff --git a/src/mongo/s/catalog/type_shard_collection.h b/src/mongo/s/catalog/type_shard_collection.h index 8129943ade0..3f1da52eb9c 100644 --- a/src/mongo/s/catalog/type_shard_collection.h +++ b/src/mongo/s/catalog/type_shard_collection.h @@ -61,10 +61,12 @@ class StatusWith; * "locale" : "fr_CA" * }, * "unique" : false, - * "refreshing" : true, // optional - * "lastRefreshedCollectionVersion" : Timestamp(1, 0) // optional + * "refreshing" : true, // optional + * "lastRefreshedCollectionVersion" : Timestamp(1, 0), // optional + * "enterCriticalSectionCounter" : 4 // optional * } * + * enterCriticalSectionCounter is currently just an OpObserver signal, thus otherwise ignored here. 
*/ class ShardCollectionType { public: @@ -79,6 +81,7 @@ public: static const BSONField<bool> unique; static const BSONField<bool> refreshing; static const BSONField<Date_t> lastRefreshedCollectionVersion; + static const BSONField<int> enterCriticalSectionCounter; ShardCollectionType(NamespaceString nss, boost::optional<UUID> uuid, diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 0c30441202a..d28c55cd573 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -193,6 +193,12 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( return getCollectionRoutingInfo(opCtx, NamespaceString(ns)); } +StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh( + OperationContext* opCtx, const NamespaceString& nss) { + invalidateShardedCollection(nss); + return getCollectionRoutingInfo(opCtx, nss); +} + StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss) { invalidateShardedCollection(nss); diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index a2506e1c217..44abcaaac92 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -83,8 +83,14 @@ public: StringData ns); /** - * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed - * and returns a NamespaceNotSharded error if the collection is not sharded. + * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed. + */ + StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoWithRefresh( + OperationContext* opCtx, const NamespaceString& nss); + + /** + * Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a + * NamespaceNotSharded error if the collection is not sharded. 
*/ StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss); |