summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/safe_secondary_reads_drop_recreate.js14
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js7
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js26
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp91
-rw-r--r--src/mongo/db/s/collection_sharding_state.h40
-rw-r--r--src/mongo/db/s/config/configsvr_shard_collection_command.cpp3
-rw-r--r--src/mongo/db/s/force_routing_table_refresh_command.cpp24
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp26
-rw-r--r--src/mongo/db/s/shard_metadata_util.cpp25
-rw-r--r--src/mongo/db/s/shard_metadata_util.h4
-rw-r--r--src/mongo/db/s/shard_metadata_util_test.cpp1
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.cpp7
-rw-r--r--src/mongo/db/s/sharding_state.cpp6
-rw-r--r--src/mongo/s/catalog/type_shard_collection.cpp2
-rw-r--r--src/mongo/s/catalog/type_shard_collection.h7
-rw-r--r--src/mongo/s/catalog_cache.cpp6
-rw-r--r--src/mongo/s/catalog_cache.h10
17 files changed, 220 insertions, 79 deletions
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index d5be9f69efd..4e43675e86f 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -562,13 +562,21 @@
// Do any test-specific setup.
test.setUp(staleMongos);
+ // Do dummy read from the stale mongos so it loads the routing table into memory once.
+ // Additionally, do a secondary read to ensure that the secondary has loaded the initial
+ // routing table -- the first read to the primary will refresh the mongos' shardVersion,
+ // which will then be used against the secondary to ensure the secondary is fresh.
+ assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll}));
+ assert.commandWorked(freshMongos.getDB(db).runCommand({
+ find: coll,
+ $readPreference: {mode: 'secondary'},
+ readConcern: {'level': 'local'}
+ }));
+
// Turn on system profiler on both secondaries.
assert.commandWorked(st.rs0.getSecondary().getDB(db).setProfilingLevel(2));
assert.commandWorked(st.rs1.getSecondary().getDB(db).setProfilingLevel(2));
- // Do dummy read from the stale mongos so it loads the routing table into memory once.
- assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll}));
-
scenarios[scenario](staleMongos, freshMongos, test, commandProfile);
// Clean up the database by dropping it; this is the only way to drop the profiler
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 6d0248e60b6..aa9ca818d8c 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -391,8 +391,13 @@
assert.commandWorked(freshMongos.adminCommand({shardCollection: nss, key: {x: 1}}));
assert.commandWorked(freshMongos.adminCommand({split: nss, middle: {x: 0}}));
- // Do dummy read from the stale mongos so that it loads the routing table into memory once.
+ // Do dummy read from the stale mongos so it loads the routing table into memory once.
+ // Additionally, do a secondary read to ensure that the secondary has loaded the initial
+ // routing table -- the first read to the primary will refresh the mongos' shardVersion,
+ // which will then be used against the secondary to ensure the secondary is fresh.
assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll}));
+ assert.commandWorked(freshMongos.getDB(db).runCommand(
+ {find: coll, $readPreference: {mode: 'secondary'}, readConcern: {'level': 'local'}}));
// Do any test-specific setup.
test.setUp(staleMongos);
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index 7204a4b83c7..129197b42a8 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -332,13 +332,6 @@
let freshMongos = st.s0;
let staleMongos = st.s1;
- assert.commandWorked(staleMongos.adminCommand({enableSharding: db}));
- st.ensurePrimaryShard(db, st.shard0.shardName);
-
- // Turn on system profiler on secondaries to collect data on all database operations.
- assert.commandWorked(donorShardSecondary.getDB(db).setProfilingLevel(2));
- assert.commandWorked(recipientShardSecondary.getDB(db).setProfilingLevel(2));
-
let res = st.s.adminCommand({listCommands: 1});
assert.commandWorked(res);
@@ -356,15 +349,26 @@
jsTest.log("testing command " + tojson(test.command));
+ assert.commandWorked(staleMongos.adminCommand({enableSharding: db}));
+ st.ensurePrimaryShard(db, st.shard0.shardName);
assert.commandWorked(staleMongos.adminCommand({shardCollection: nss, key: {x: 1}}));
assert.commandWorked(staleMongos.adminCommand({split: nss, middle: {x: 0}}));
- // Do dummy read from the stale mongos so that it loads the routing table into memory once.
+ // Do dummy read from the stale mongos so it loads the routing table into memory once.
+ // Additionally, do a secondary read to ensure that the secondary has loaded the initial
+ // routing table -- the first read to the primary will refresh the mongos' shardVersion,
+ // which will then be used against the secondary to ensure the secondary is fresh.
assert.commandWorked(staleMongos.getDB(db).runCommand({find: coll}));
+ assert.commandWorked(freshMongos.getDB(db).runCommand(
+ {find: coll, $readPreference: {mode: 'secondary'}, readConcern: {'level': 'local'}}));
// Do any test-specific setup.
test.setUp(staleMongos);
+ // Turn on system profiler on secondaries to collect data on all database operations.
+ assert.commandWorked(donorShardSecondary.getDB(db).setProfilingLevel(2));
+ assert.commandWorked(recipientShardSecondary.getDB(db).setProfilingLevel(2));
+
// Do a moveChunk from the fresh mongos to make the other mongos stale.
// Use {w:2} (all) write concern so the metadata change gets persisted to the secondary
// before stalely versioned commands are sent against the secondary.
@@ -450,8 +454,10 @@
});
}
- // Clean up the collection by dropping it. This also drops all associated indexes.
- assert.commandWorked(freshMongos.getDB(db).runCommand({drop: coll}));
+ // Clean up the database by dropping it; this is the only way to drop the profiler
+ // collection on secondaries. This also drops all associated indexes.
+ // Do this from staleMongos, so staleMongos purges the database entry from its cache.
+ assert.commandWorked(staleMongos.getDB(db).runCommand({dropDatabase: 1}));
}
st.stop();
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 25f2727ff55..29ef4caf2b8 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -91,7 +91,8 @@ private:
};
/**
- * Used to notify the catalog cache loader of a new collection version.
+ * Used to notify the catalog cache loader of a new collection version and invalidate the in-memory
+ * routing table cache once the oplog updates are committed and become visible.
*/
class CollectionVersionLogOpHandler final : public RecoveryUnit::Change {
public:
@@ -99,8 +100,13 @@ public:
: _opCtx(opCtx), _nss(nss) {}
void commit() override {
- Grid::get(_opCtx)->catalogCache()->invalidateShardedCollection(_nss);
CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss);
+
+ invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X
+ // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until
+ // SERVER-31595 removes the X lock requirement.
+ CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown();
}
void rollback() override {}
@@ -110,6 +116,17 @@ private:
const NamespaceString _nss;
};
+/**
+ * Caller must hold the global lock in some mode other than MODE_NONE.
+ */
+bool isStandaloneOrPrimary(OperationContext* opCtx) {
+ dassert(opCtx->lockState()->isLocked());
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+ return !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+ repl::MemberState::RS_PRIMARY);
+}
+
} // unnamed namespace
CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
@@ -307,8 +324,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
if (_nss == NamespaceString::kShardConfigCollectionsCollectionName) {
- _onConfigRefreshCompleteInvalidateCachedMetadataAndNotify(
- opCtx, query, update, updatedDoc);
+ _onConfigCollectionsUpdateOp(opCtx, query, update, updatedDoc);
}
if (ShardingState::get(opCtx)->enabled()) {
@@ -407,31 +423,44 @@ void CollectionShardingState::onDropCollection(OperationContext* opCtx,
}
}
-void CollectionShardingState::_onConfigRefreshCompleteInvalidateCachedMetadataAndNotify(
- OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& update,
- const BSONObj& updatedDoc) {
+void CollectionShardingState::_onConfigCollectionsUpdateOp(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update,
+ const BSONObj& updatedDoc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
- // Extract which collection entry is being updated
- std::string refreshCollection;
+ // Notification of routing table changes are only needed on secondaries.
+ if (isStandaloneOrPrimary(opCtx)) {
+ return;
+ }
+
+ // Extract which user collection was updated.
+ std::string updatedCollection;
fassertStatusOK(
- 40477, bsonExtractStringField(query, ShardCollectionType::ns.name(), &refreshCollection));
-
- // Parse the '$set' update, which will contain the 'lastRefreshedCollectionVersion' if it is
- // present.
- BSONElement updateElement;
- fassertStatusOK(40478,
- bsonExtractTypedField(update, StringData("$set"), Object, &updateElement));
- BSONObj setField = updateElement.Obj();
-
- // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog
- // cache must be invalidated and the catalog cache loader notified of the new version.
- if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
- opCtx->recoveryUnit()->registerChange(
- new CollectionVersionLogOpHandler(opCtx, NamespaceString(refreshCollection)));
+ 40477, bsonExtractStringField(query, ShardCollectionType::ns.name(), &updatedCollection));
+
+ // Parse the '$set' update.
+ BSONElement setElement;
+ Status setStatus = bsonExtractTypedField(update, StringData("$set"), Object, &setElement);
+ if (setStatus.isOK()) {
+ BSONObj setField = setElement.Obj();
+ const NamespaceString updatedNss(updatedCollection);
+
+ // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
+ AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX);
+
+ if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
+ opCtx->recoveryUnit()->registerChange(
+ new CollectionVersionLogOpHandler(opCtx, updatedNss));
+ }
+
+ if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) {
+ // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the
+ // X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until
+ // SERVER-31595 removes the X lock requirement.
+ CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown();
+ }
}
}
@@ -440,13 +469,21 @@ void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify(
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+ // Notification of routing table changes are only needed on secondaries.
+ if (isStandaloneOrPrimary(opCtx)) {
+ return;
+ }
+
// Extract which collection entry is being deleted from the _id field.
std::string deletedCollection;
fassertStatusOK(
40479, bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection));
+ const NamespaceString deletedNss(deletedCollection);
+
+ // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
+ AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX);
- opCtx->recoveryUnit()->registerChange(
- new CollectionVersionLogOpHandler(opCtx, NamespaceString(deletedCollection)));
+ opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss));
}
bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index c70a032918b..3ce7e8004bd 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -241,38 +241,42 @@ public:
private:
/**
- * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible
- * to reads -- to notify the catalog cache loader of a new collection version. The catalog
- * cache's routing table for the collection will also be invalidated at that time so that the
- * next caller to the catalog cache for routing information will provoke a routing table
- * refresh.
+ * This runs on updates to the shard's persisted cache of the config server's
+ * config.collections collection.
+ *
+ * If an update occurs to the 'lastRefreshedCollectionVersion' field, registers a task on the
+ * opCtx -- to run after writes from the oplog are committed and visible to reads -- to notify
+ * the catalog cache loader of a new collection version and clear the routing table so the next
+ * caller with routing information will provoke a routing table refresh. When
+ * 'lastRefreshedCollectionVersion' is in 'update', it means that a chunk metadata refresh
+ * finished being applied to the collection's locally persisted metadata store.
*
- * This only runs on secondaries, and only when 'lastRefreshedCollectionVersion' is in 'update',
- * meaning a chunk metadata refresh finished being applied to the collection's locally persisted
- * metadata store.
+ * If an update occurs to the 'enterCriticalSectionSignal' field, simply clear the routing table
+ * immediately. This will provoke the next secondary caller to refresh through the primary,
+ * blocking behind the critical section.
*
* query - BSON with an _id that identifies which collections entry is being updated.
* update - the update being applied to the collections entry.
* updatedDoc - the document identified by 'query' with the 'update' applied.
*
+ * This only runs on secondaries.
* The global exclusive lock is expected to be held by the caller.
*/
- void _onConfigRefreshCompleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& update,
- const BSONObj& updatedDoc);
+ void _onConfigCollectionsUpdateOp(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update,
+ const BSONObj& updatedDoc);
/**
- * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible
- * to reads -- to notify the catalog cache loader of a new collection version. The catalog
- * cache's routing table for the collection will also be invalidated at that time so that the
- * next caller to the catalog cache for routing information will provoke a routing table
- * refresh.
+ * Invalidates the in-memory routing table cache when a collection is dropped, so the next
+ * caller with routing information will provoke a routing table refresh and see the drop.
*
- * This only runs on secondaries
+ * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible
+ * to reads.
*
* query - BSON with an _id field that identifies which collections entry is being updated.
*
+ * This only runs on secondaries.
* The global exclusive lock is expected to be held by the caller.
*/
void _onConfigDeleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx,
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index 65fee2ae792..49d09e326e5 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -593,8 +593,7 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
}
// Reload the config info, after all the migrations
- catalogCache->invalidateShardedCollection(nss);
- routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfoWithRefresh(opCtx, nss));
uassert(ErrorCodes::ConflictingOperationInProgress,
"Collection was successfully written as sharded but got dropped before it "
"could be evenly distributed",
diff --git a/src/mongo/db/s/force_routing_table_refresh_command.cpp b/src/mongo/db/s/force_routing_table_refresh_command.cpp
index b58cdeb8e0c..16db83b4d49 100644
--- a/src/mongo/db/s/force_routing_table_refresh_command.cpp
+++ b/src/mongo/db/s/force_routing_table_refresh_command.cpp
@@ -37,8 +37,11 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/grid.h"
@@ -63,7 +66,7 @@ public:
}
bool slaveOk() const override {
- return true;
+ return false;
}
bool supportsWriteConcern(const BSONObj& cmd) const override {
@@ -105,6 +108,25 @@ public:
const NamespaceString nss(parseNs(dbname, cmdObj));
+ {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+
+ // If the primary is in the critical section, secondaries must wait for the commit to
+ // finish on the primary in case a secondary's caller has an afterClusterTime inclusive
+ // of the commit (and new writes to the committed chunk) that hasn't yet propagated back
+ // to this shard. This ensures the read your own writes causal consistency guarantee.
+ auto css = CollectionShardingState::get(opCtx, nss);
+ if (css && css->getMigrationSourceManager()) {
+ auto criticalSectionSignal =
+ css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(true);
+ if (criticalSectionSignal) {
+ auto& oss = OperationShardingState::get(opCtx);
+ oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
+ oss.waitForMigrationCriticalSectionSignal(opCtx);
+ }
+ }
+ }
+
LOG(1) << "Forcing routing table refresh for " << nss;
ChunkVersion unusedShardVersion;
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 7261283268f..f7c3c59f679 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -40,10 +40,12 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_util.h"
+#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_shard_collection.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
@@ -59,6 +61,8 @@
namespace mongo {
+using namespace shardmetadatautil;
+
namespace {
// Wait at most this much time for the recipient to catch up sufficiently so critical section can be
@@ -290,9 +294,29 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
_critSecSignal = std::make_shared<Notification<void>>();
}
+ _state = kCriticalSection;
+
+ // Persist a signal to secondaries that we've entered the critical section. This is will cause
+ // secondaries to refresh their routing table when next accessed, which will block behind the
+ // critical section. This ensures causal consistency by preventing a stale mongos with a cluster
+ // time inclusive of the migration config commit update from accessing secondary data.
+ // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh
+ // will stall behind the flag.
+ Status signalStatus =
+ updateShardCollectionsEntry(opCtx,
+ BSON(ShardCollectionType::ns() << getNss().ns()),
+ BSONObj(),
+ BSON(ShardCollectionType::enterCriticalSectionCounter() << 1),
+ false /*upsert*/);
+ if (!signalStatus.isOK()) {
+ return {
+ ErrorCodes::OperationFailed,
+ str::stream() << "Failed to persist critical section signal for secondaries due to: "
+ << signalStatus.toString()};
+ }
+
log() << "Migration successfully entered critical section";
- _state = kCriticalSection;
scopedGuard.Dismiss();
return Status::OK();
}
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index 533d955d575..6d929554f30 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -91,7 +91,7 @@ Status setPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString&
// Set 'refreshing' to true.
BSONObj update = BSON(ShardCollectionType::refreshing() << true);
return updateShardCollectionsEntry(
- opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update, false /*upsert*/);
+ opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update, BSONObj(), false /*upsert*/);
}
Status unsetPersistedRefreshFlags(OperationContext* opCtx,
@@ -103,8 +103,11 @@ Status unsetPersistedRefreshFlags(OperationContext* opCtx,
updateBuilder.appendTimestamp(ShardCollectionType::lastRefreshedCollectionVersion(),
refreshedVersion.toLong());
- return updateShardCollectionsEntry(
- opCtx, BSON(ShardCollectionType::ns() << nss.ns()), updateBuilder.obj(), false /*upsert*/);
+ return updateShardCollectionsEntry(opCtx,
+ BSON(ShardCollectionType::ns() << nss.ns()),
+ updateBuilder.obj(),
+ BSONObj(),
+ false /*upsert*/);
}
StatusWith<RefreshState> getPersistedRefreshFlags(OperationContext* opCtx,
@@ -177,25 +180,35 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt
Status updateShardCollectionsEntry(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
+ const BSONObj& inc,
const bool upsert) {
invariant(query.hasField("_id"));
if (upsert) {
// If upserting, this should be an update from the config server that does not have shard
- // refresh information.
+ // refresh / migration inc signal information.
invariant(!update.hasField(ShardCollectionType::refreshing()));
invariant(!update.hasField(ShardCollectionType::lastRefreshedCollectionVersion()));
+ invariant(inc.isEmpty());
}
try {
DBDirectClient client(opCtx);
+ BSONObjBuilder builder;
+ if (!update.isEmpty()) {
+ // Want to modify the document if it already exists, not replace it.
+ builder.append("$set", update);
+ }
+ if (!inc.isEmpty()) {
+ builder.append("$inc", inc);
+ }
+
auto commandResponse = client.runCommand([&] {
write_ops::Update updateOp(NamespaceString{ShardCollectionType::ConfigNS});
updateOp.setUpdates({[&] {
write_ops::UpdateOpEntry entry;
entry.setQ(query);
- // Want to modify the document, not replace it
- entry.setU(BSON("$set" << update));
+ entry.setU(builder.obj());
entry.setUpsert(upsert);
return entry;
}()});
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 4aadfc11ba3..9bd035a41b6 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -143,13 +143,17 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt
* concern.
*
* Uses the $set operator on the update so that updates can be applied without resetting everything.
+ * 'inc' can be used to specify fields and their increments: it will be assigned to the $inc
+ * operator.
*
* If 'upsert' is true, expects neither 'refreshing' or 'lastRefreshedCollectionVersion' to be
* present in the update: these refreshing fields should only be added to an existing document.
+ * Similarly, 'inc' should not specify 'upsert' true.
*/
Status updateShardCollectionsEntry(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
+ const BSONObj& inc,
const bool upsert);
/**
diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp
index 251d8f06cd9..fed6991ed32 100644
--- a/src/mongo/db/s/shard_metadata_util_test.cpp
+++ b/src/mongo/db/s/shard_metadata_util_test.cpp
@@ -73,6 +73,7 @@ struct ShardMetadataUtilTest : public ShardServerTestFixture {
ASSERT_OK(updateShardCollectionsEntry(operationContext(),
BSON(ShardCollectionType::ns(kNss.ns())),
shardCollectionType.toBSON(),
+ BSONObj(),
true /*upsert*/));
return shardCollectionType;
}
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index eb3168ea4d7..e7ad1e78ff4 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -88,8 +88,11 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
collAndChunks.shardKeyPattern,
collAndChunks.defaultCollation,
collAndChunks.shardKeyIsUnique);
- Status status = updateShardCollectionsEntry(
- opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update.toBSON(), true /*upsert*/);
+ Status status = updateShardCollectionsEntry(opCtx,
+ BSON(ShardCollectionType::ns() << nss.ns()),
+ update.toBSON(),
+ BSONObj(),
+ true /*upsert*/);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 5a54ef969c9..2f46a11552e 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -500,10 +500,8 @@ ChunkVersion ShardingState::_refreshMetadata(OperationContext* opCtx, const Name
<< " before shard name has been set",
shardId.isValid());
- auto const catalogCache = Grid::get(opCtx)->catalogCache();
- catalogCache->invalidateShardedCollection(nss);
-
- const auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ const auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
const auto cm = routingInfo.cm();
if (!cm) {
diff --git a/src/mongo/s/catalog/type_shard_collection.cpp b/src/mongo/s/catalog/type_shard_collection.cpp
index 3a98a15f7ba..195606b0878 100644
--- a/src/mongo/s/catalog/type_shard_collection.cpp
+++ b/src/mongo/s/catalog/type_shard_collection.cpp
@@ -51,6 +51,8 @@ const BSONField<bool> ShardCollectionType::unique("unique");
const BSONField<bool> ShardCollectionType::refreshing("refreshing");
const BSONField<Date_t> ShardCollectionType::lastRefreshedCollectionVersion(
"lastRefreshedCollectionVersion");
+const BSONField<int> ShardCollectionType::enterCriticalSectionCounter(
+ "enterCriticalSectionCounter");
ShardCollectionType::ShardCollectionType(NamespaceString nss,
boost::optional<UUID> uuid,
diff --git a/src/mongo/s/catalog/type_shard_collection.h b/src/mongo/s/catalog/type_shard_collection.h
index 8129943ade0..3f1da52eb9c 100644
--- a/src/mongo/s/catalog/type_shard_collection.h
+++ b/src/mongo/s/catalog/type_shard_collection.h
@@ -61,10 +61,12 @@ class StatusWith;
* "locale" : "fr_CA"
* },
* "unique" : false,
- * "refreshing" : true, // optional
- * "lastRefreshedCollectionVersion" : Timestamp(1, 0) // optional
+ * "refreshing" : true, // optional
+ * "lastRefreshedCollectionVersion" : Timestamp(1, 0), // optional
+ * "enterCriticalSectionCounter" : 4 // optional
* }
*
+ * enterCriticalSectionCounter is currently just an OpObserver signal, thus otherwise ignored here.
*/
class ShardCollectionType {
public:
@@ -79,6 +81,7 @@ public:
static const BSONField<bool> unique;
static const BSONField<bool> refreshing;
static const BSONField<Date_t> lastRefreshedCollectionVersion;
+ static const BSONField<int> enterCriticalSectionCounter;
ShardCollectionType(NamespaceString nss,
boost::optional<UUID> uuid,
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 0c30441202a..d28c55cd573 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -193,6 +193,12 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
return getCollectionRoutingInfo(opCtx, NamespaceString(ns));
}
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ invalidateShardedCollection(nss);
+ return getCollectionRoutingInfo(opCtx, nss);
+}
+
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss) {
invalidateShardedCollection(nss);
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index a2506e1c217..44abcaaac92 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -83,8 +83,14 @@ public:
StringData ns);
/**
- * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed
- * and returns a NamespaceNotSharded error if the collection is not sharded.
+ * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed.
+ */
+ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoWithRefresh(
+ OperationContext* opCtx, const NamespaceString& nss);
+
+ /**
+ * Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a
+ * NamespaceNotSharded error if the collection is not sharded.
*/
StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss);