summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp95
-rw-r--r--src/mongo/db/s/shard_metadata_util.cpp15
-rw-r--r--src/mongo/db/s/shard_metadata_util.h11
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.cpp191
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.h25
5 files changed, 200 insertions, 137 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 348d76e6886..27d00b58b91 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -108,18 +108,6 @@ private:
const ChunkVersion _updatedVersion;
};
-/**
- * Checks via the ReplicationCoordinator whether this server is a primary/standalone that can do
- * writes. This function may return false if the server is primary but in drain mode.
- *
- * Note: expects the global lock to be held so that a meaningful answer is returned -- replica set
- * state cannot change under a lock.
- */
-bool isPrimary(OperationContext* opCtx, const NamespaceString& nss) {
- // If the server can execute writes, then it is either a primary or standalone.
- return repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss);
-}
-
} // unnamed namespace
CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
@@ -412,43 +400,37 @@ void CollectionShardingState::_onConfigRefreshCompleteInvalidateCachedMetadataAn
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
- if (!isPrimary(opCtx, _nss)) {
- // Extract which collection entry is being updated
- std::string refreshCollection;
- fassertStatusOK(
- 40477,
- bsonExtractStringField(query, ShardCollectionType::uuid.name(), &refreshCollection));
-
- // Parse the '$set' update, which will contain the 'lastRefreshedCollectionVersion' if it is
- // present.
- BSONElement updateElement;
- fassertStatusOK(40478,
- bsonExtractTypedField(update, StringData("$set"), Object, &updateElement));
- BSONObj setField = updateElement.Obj();
-
- // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog
- // cache must be invalidated and the catalog cache loader notified of the new version.
- if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
- // Get the version epoch from the 'updatedDoc', since it didn't get updated and won't be
- // in 'update'.
- BSONElement oidElem;
- fassert(40513,
- bsonExtractTypedField(
- updatedDoc, ShardCollectionType::epoch(), BSONType::jstOID, &oidElem));
-
- // Get the new collection version.
- auto statusWithLastRefreshedChunkVersion =
- ChunkVersion::parseFromBSONWithFieldAndSetEpoch(
- updatedDoc,
- ShardCollectionType::lastRefreshedCollectionVersion(),
- oidElem.OID());
- fassert(40514, statusWithLastRefreshedChunkVersion.isOK());
-
- opCtx->recoveryUnit()->registerChange(
- new CollectionVersionLogOpHandler(opCtx,
- NamespaceString(refreshCollection),
- statusWithLastRefreshedChunkVersion.getValue()));
- }
+ // Extract which collection entry is being updated
+ std::string refreshCollection;
+ fassertStatusOK(
+ 40477, bsonExtractStringField(query, ShardCollectionType::uuid.name(), &refreshCollection));
+
+ // Parse the '$set' update, which will contain the 'lastRefreshedCollectionVersion' if it is
+ // present.
+ BSONElement updateElement;
+ fassertStatusOK(40478,
+ bsonExtractTypedField(update, StringData("$set"), Object, &updateElement));
+ BSONObj setField = updateElement.Obj();
+
+ // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog
+ // cache must be invalidated and the catalog cache loader notified of the new version.
+ if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
+ // Get the version epoch from the 'updatedDoc', since it didn't get updated and won't be
+ // in 'update'.
+ BSONElement oidElem;
+ fassert(40513,
+ bsonExtractTypedField(
+ updatedDoc, ShardCollectionType::epoch(), BSONType::jstOID, &oidElem));
+
+ // Get the new collection version.
+ auto statusWithLastRefreshedChunkVersion = ChunkVersion::parseFromBSONWithFieldAndSetEpoch(
+ updatedDoc, ShardCollectionType::lastRefreshedCollectionVersion(), oidElem.OID());
+ fassert(40514, statusWithLastRefreshedChunkVersion.isOK());
+
+ opCtx->recoveryUnit()->registerChange(
+ new CollectionVersionLogOpHandler(opCtx,
+ NamespaceString(refreshCollection),
+ statusWithLastRefreshedChunkVersion.getValue()));
}
}
@@ -457,16 +439,13 @@ void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify(
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
- if (!isPrimary(opCtx, _nss)) {
- // Extract which collection entry is being deleted from the _id field.
- std::string deletedCollection;
- fassertStatusOK(
- 40479,
- bsonExtractStringField(query, ShardCollectionType::uuid.name(), &deletedCollection));
+ // Extract which collection entry is being deleted from the _id field.
+ std::string deletedCollection;
+ fassertStatusOK(
+ 40479, bsonExtractStringField(query, ShardCollectionType::uuid.name(), &deletedCollection));
- opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(
- opCtx, NamespaceString(deletedCollection), ChunkVersion::UNSHARDED()));
- }
+ opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(
+ opCtx, NamespaceString(deletedCollection), ChunkVersion::UNSHARDED()));
}
bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index 937528403e7..cb614accc31 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -72,11 +72,18 @@ QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion) {
BSON(ChunkType::lastmod() << 1)};
}
-bool RefreshState::operator==(RefreshState& other) {
+bool RefreshState::operator==(RefreshState& other) const {
return (other.epoch == epoch) && (other.refreshing == refreshing) &&
(other.lastRefreshedCollectionVersion == lastRefreshedCollectionVersion);
}
+std::string RefreshState::toString() const {
+ return str::stream() << "epoch: " << epoch
+ << ", refreshing: " << (refreshing ? "true" : "false")
+ << ", lastRefreshedCollectionVersion: "
+ << lastRefreshedCollectionVersion.toString();
+}
+
Status setPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString& nss) {
// Set 'refreshing' to true.
BSONObj update = BSON(ShardCollectionType::refreshing() << true);
@@ -119,7 +126,11 @@ StatusWith<RefreshState> getPersistedRefreshFlags(OperationContext* opCtx,
}
return RefreshState{entry.getEpoch(),
- entry.hasRefreshing() ? entry.getRefreshing() : false,
+ // If the refreshing field has not yet been added, this means that the first
+ // refresh has started, but no chunks have ever yet been applied, around
+ // which these flags are set. So default to refreshing true because the
+ // chunk metadata is being updated and is not yet ready to be read.
+ entry.hasRefreshing() ? entry.getRefreshing() : true,
entry.hasLastRefreshedCollectionVersion()
? entry.getLastRefreshedCollectionVersion()
: ChunkVersion(0, 0, entry.getEpoch())};
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 6a742f8d75a..6a1b8cb5cd6 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -60,13 +60,20 @@ struct QueryAndSort {
};
/**
- * Subset of the shard's collections collection related to refresh state.
+ * Subset of the shard's collections collection document that relates to refresh state.
*/
struct RefreshState {
- bool operator==(RefreshState& other);
+ bool operator==(RefreshState& other) const;
+ std::string toString() const;
+ // The current epoch of the collection metadata.
OID epoch;
+
+ // Whether a refresh is currently in progress.
bool refreshing;
+
+ // The collection version after the last complete refresh. Indicates change if refreshing has
+ // started and finished since last loaded.
ChunkVersion lastRefreshedCollectionVersion;
};
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index edc41a3c66d..52385c7cc8d 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -113,11 +113,15 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
}
/**
+ * This function will throw on error!
+ *
* Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there
* are none -- meaning there's no persisted metadata for 'nss' --, returns a
* ChunkVersion::UNSHARDED() version.
*
- * It is unsafe to call this when a task for 'nss' is running concurrently.
+ * It is unsafe to call this when a task for 'nss' is running concurrently because the collection
+ * could be dropped and recreated between reading the collection epoch and retrieving the chunk,
+ * which would make the returned ChunkVersion corrupt.
*/
ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) {
// Must read the collections entry to get the epoch to pass into ChunkType for shard's chunk
@@ -155,28 +159,22 @@ ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceStri
}
/**
- * Tries to find persisted chunk metadata with chunk versions GTE to 'version'. Should always
- * return metadata if the collection exists.
+ * This function will throw on error!
+ *
+ * Tries to find persisted chunk metadata with chunk versions GTE to 'version'.
+ *
*
* If 'version's epoch matches persisted metadata, returns persisted metadata GTE 'version'.
* If 'version's epoch doesn't match persisted metadata, returns all persisted metadata.
- * If there is no persisted metadata, returns an empty CollectionAndChangedChunks object.
+ * If collections entry does not exist, throws NamespaceNotFound error. Can return an empty
+ * chunks vector in CollectionAndChangedChunks without erroring, if collections entry IS found.
*/
-StatusWith<CollectionAndChangedChunks> getPersistedMetadataSinceVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- ChunkVersion version) {
- auto swShardCollectionEntry = readShardCollectionsEntry(opCtx, nss);
- if (swShardCollectionEntry == ErrorCodes::NamespaceNotFound) {
- // If there is no metadata, collection does not exist. Return empty results.
- return CollectionAndChangedChunks();
- } else if (!swShardCollectionEntry.isOK()) {
- return StatusWith<CollectionAndChangedChunks>(
- ErrorCodes::OperationFailed,
- str::stream() << "Failed to load local collection metadata due to '"
- << swShardCollectionEntry.getStatus().toString()
- << "'.");
- }
- auto shardCollectionEntry = std::move(swShardCollectionEntry.getValue());
+CollectionAndChangedChunks getPersistedMetadataSinceVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ ChunkVersion version,
+ const bool okToReadWhileRefreshing) {
+ ShardCollectionType shardCollectionEntry =
+ uassertStatusOK(readShardCollectionsEntry(opCtx, nss));
// If the persisted epoch doesn't match what the CatalogCache requested, read everything.
ChunkVersion startingVersion = (shardCollectionEntry.getEpoch() == version.epoch())
@@ -185,44 +183,14 @@ StatusWith<CollectionAndChangedChunks> getPersistedMetadataSinceVersion(Operatio
QueryAndSort diff = createShardChunkDiffQuery(startingVersion);
- auto swChangedChunks =
- readShardChunks(opCtx, nss, diff.query, diff.sort, boost::none, startingVersion.epoch());
- if (!swChangedChunks.isOK()) {
- return StatusWith<CollectionAndChangedChunks>(
- ErrorCodes::OperationFailed,
- str::stream() << "Failed to load local collection metadata due to '"
- << swChangedChunks.getStatus().toString()
- << "'.");
- } else if (swChangedChunks.getValue().empty()) {
- // No chunks were found, collection was dropped. Return empty results.
- return CollectionAndChangedChunks();
- }
-
- // Make sure the collections entry epoch has not changed. Otherwise an epoch changing update was
- // applied after we originally read the entry and the chunks may not match the original epoch.
-
- auto swAfterShardCollectionEntry = readShardCollectionsEntry(opCtx, nss);
- if (swAfterShardCollectionEntry == ErrorCodes::NamespaceNotFound) {
- // The collection has been dropped since we began loading, return empty results.
- return CollectionAndChangedChunks();
- } else if (!swAfterShardCollectionEntry.isOK()) {
- return StatusWith<CollectionAndChangedChunks>(
- ErrorCodes::OperationFailed,
- str::stream() << "Failed to reload local collection metadata due to '"
- << swAfterShardCollectionEntry.getStatus().toString()
- << "'.");
- }
-
- if (shardCollectionEntry.getEpoch() != swAfterShardCollectionEntry.getValue().getEpoch()) {
- // The collection was dropped and recreated since we began. Return empty results.
- return CollectionAndChangedChunks();
- }
+ auto changedChunks = uassertStatusOK(
+ readShardChunks(opCtx, nss, diff.query, diff.sort, boost::none, startingVersion.epoch()));
return CollectionAndChangedChunks{shardCollectionEntry.getEpoch(),
shardCollectionEntry.getKeyPattern().toBSON(),
shardCollectionEntry.getDefaultCollation(),
shardCollectionEntry.getUnique(),
- std::move(swChangedChunks.getValue())};
+ std::move(changedChunks)};
}
} // namespace
@@ -282,7 +250,6 @@ void ShardServerCatalogCacheLoader::waitForVersion(OperationContext* opCtx,
}
if (refreshState.lastRefreshedCollectionVersion <= version) {
-
opCtx->waitForConditionOrInterrupt(*(notification.condVar()), lock, [&]() -> bool {
return notification.version().epoch() != version.epoch() ||
notification.version() >= version;
@@ -344,27 +311,37 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
isPrimary = (_role == ReplicaSetRole::Primary);
}
- // TODO: add and plug in secondary machinery
-
auto notify = std::make_shared<Notification<void>>();
- if (isPrimary) {
- uassertStatusOK(_threadPool.schedule(
- [ this, nss, version, currentTerm, callbackFn, notify ]() noexcept {
- auto opCtx = Client::getCurrent()->makeOperationContext();
- try {
+ uassertStatusOK(_threadPool.schedule(
+ [ this, nss, version, callbackFn, notify, isPrimary, currentTerm ]() noexcept {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ try {
+ if (isPrimary) {
_schedulePrimaryGetChunksSince(
opCtx.get(), nss, version, currentTerm, callbackFn, notify);
- } catch (const DBException& ex) {
- callbackFn(opCtx.get(), ex.toStatus());
- notify->set();
+ } else {
+ _runSecondaryGetChunksSince(opCtx.get(), nss, version, callbackFn);
}
- }));
- }
+ } catch (const DBException& ex) {
+ callbackFn(opCtx.get(), ex.toStatus());
+ notify->set();
+ }
+ }));
return notify;
}
+void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+ auto swCollAndChunks =
+ _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion);
+ callbackFn(opCtx, std::move(swCollAndChunks));
+}
+
void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -404,15 +381,25 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
swCollectionAndChangedChunks != ErrorCodes::NamespaceNotFound) {
// No updates to apply. Do nothing.
} else {
- // Enqueue a Task to apply the update retrieved from the config server.
- Status scheduleStatus = _scheduleTask(
- nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
- if (!scheduleStatus.isOK()) {
- callbackFn(opCtx, scheduleStatus);
- notify->set();
- return;
+ // Enqueue a Task to apply the update retrieved from the config server, if new data was
+ // retrieved.
+ if (!swCollectionAndChangedChunks.isOK() ||
+ (swCollectionAndChangedChunks.getValue()
+ .changedChunks.back()
+ .getVersion()
+ .epoch() != maxLoaderVersion.epoch()) ||
+ (swCollectionAndChangedChunks.getValue().changedChunks.back().getVersion() >
+ maxLoaderVersion)) {
+ Status scheduleStatus = _scheduleTask(
+ nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, scheduleStatus);
+ notify->set();
+ return;
+ }
}
+
if (swCollectionAndChangedChunks.isOK()) {
log() << "Cache loader remotely refreshed for collection " << nss
<< " from collection version " << maxLoaderVersion
@@ -460,11 +447,20 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
bool tasksAreEnqueued = std::move(enqueuedRes.first);
CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second);
- auto swPersisted = getPersistedMetadataSinceVersion(opCtx, nss, catalogCacheSinceVersion);
- if (!swPersisted.isOK()) {
+ // TODO: a primary can load metadata while updates are being applied once we have indexes on the
+ // chunk collections that ensure new data is seen after query yields. This function keeps
+ // retrying until no updates are applied concurrently. Waiting on SERVER-27714 to add indexes.
+ auto swPersisted =
+ _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion);
+ CollectionAndChangedChunks persisted;
+ if (swPersisted == ErrorCodes::NamespaceNotFound) {
+ // No persisted metadata found, create an empty object.
+ persisted = CollectionAndChangedChunks();
+ } else if (!swPersisted.isOK()) {
return swPersisted;
+ } else {
+ persisted = std::move(swPersisted.getValue());
}
- CollectionAndChangedChunks persisted = std::move(swPersisted.getValue());
log() << "Cache loader found "
<< (enqueued.changedChunks.empty()
@@ -683,6 +679,51 @@ bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
return true;
}
+StatusWith<CollectionAndChangedChunks>
+ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVersion(
+ OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) {
+ try {
+ // Keep trying to load the metadata until we get a complete view without updates being
+ // concurrently applied.
+ while (true) {
+ // Wait until a point when updates are not being applied to the 'nss' metadata.
+ RefreshState beginRefreshState = uassertStatusOK(getPersistedRefreshFlags(opCtx, nss));
+ while (beginRefreshState.refreshing) {
+ waitForVersion(opCtx, nss, beginRefreshState.lastRefreshedCollectionVersion, true);
+ beginRefreshState = uassertStatusOK(getPersistedRefreshFlags(opCtx, nss));
+ }
+
+ // Load the metadata.
+ CollectionAndChangedChunks collAndChangedChunks =
+ getPersistedMetadataSinceVersion(opCtx, nss, version, true);
+
+ // Check that no updates were concurrently applied while we were loading the metadata:
+ // this could cause the loaded metadata to provide an incomplete view of the chunk
+ // ranges.
+ RefreshState endRefreshState = uassertStatusOK(getPersistedRefreshFlags(opCtx, nss));
+ if (beginRefreshState == endRefreshState) {
+ return collAndChangedChunks;
+ }
+ LOG(1) << "Cache loader read meatadata while updates were being applied: this"
+ << " metadata may be incomplete. Retrying. Refresh state before read: "
+ << beginRefreshState << ". Current refresh state: '" << endRefreshState << "'.";
+ }
+ } catch (const DBException& ex) {
+ Status status = ex.toStatus();
+
+ // NamespaceNotFound errors are expected and must be returned.
+ if (status == ErrorCodes::NamespaceNotFound) {
+ return status;
+ }
+
+ // All other errors are unhandled.
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to load local metadata due to '"
+ << ex.toStatus().toString()
+ << "'.");
+ }
+}
+
ShardServerCatalogCacheLoader::Task::Task(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index b193ab6d422..43593811e42 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -218,6 +218,16 @@ private:
typedef std::map<NamespaceString, TaskList> TaskLists;
/**
+ * Retrieves chunk metadata from the shard's persisted metadata store, then passes the results
+ * to the 'callbackFn'.
+ */
+ void _runSecondaryGetChunksSince(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn);
+
+ /**
* Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
* of the shard's persisted metadata store with the latest updates retrieved from the config
* server.
@@ -292,6 +302,21 @@ private:
*/
bool _updatePersistedMetadata(OperationContext* opCtx, const NamespaceString& nss);
+ /**
+ * Attempt to read the collection and chunk metadata since version 'sinceVersion' from the shard
+ * persisted metadata store.
+ *
+ * Retries reading the metadata if the shard persisted metadata store becomes imcomplete due to
+ * concurrent updates being applied -- complete here means that every chunk range is accounted
+ * for.
+ *
+ * May return: a complete metadata update, which when applied to a complete metadata store up to
+ * 'sinceVersion' again produces a complete metadata store; or a NamespaceNotFound error, which
+ * means no metadata was found and the collection was dropped.
+ */
+ StatusWith<CollectionAndChangedChunks> _getCompletePersistedMetadataForSecondarySinceVersion(
+ OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version);
+
// Used by the shard primary to retrieve chunk metadata from the config server.
const std::unique_ptr<CatalogCacheLoader> _configServerLoader;