diff options
author | jannaerin <golden.janna@gmail.com> | 2018-04-03 13:30:30 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2018-04-05 20:29:05 -0400 |
commit | 7a48a263485a585dac1e1289c830eafd35a3d54b (patch) | |
tree | 38544c586218bfa72bf7fd842c3ed6019e788ae3 /src/mongo/db | |
parent | 265a38952f11a5d9a6144a22f10dc59b138e0b69 (diff) | |
download | mongo-7a48a263485a585dac1e1289c830eafd35a3d54b.tar.gz |
SERVER-34145 Persist database version on shard
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/flush_database_cache_updates_command.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.h | 28 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 428 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 177 |
8 files changed, 672 insertions, 76 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index cdd87d63fff..2e59ef396bf 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -55,6 +55,7 @@ constexpr StringData NamespaceString::kLocalDb; constexpr StringData NamespaceString::kConfigDb; constexpr StringData NamespaceString::kSystemDotViewsCollectionName; constexpr StringData NamespaceString::kShardConfigCollectionsCollectionName; +constexpr StringData NamespaceString::kShardConfigDatabasesCollectionName; constexpr StringData NamespaceString::kSystemKeysCollectionName; const NamespaceString NamespaceString::kServerConfigurationNamespace(kServerConfiguration); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 3e5a0d07793..f9ee71c2637 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -65,6 +65,10 @@ public: static constexpr StringData kShardConfigCollectionsCollectionName = "config.cache.collections"_sd; + // Name for a shard's databases metadata collection, each document of which indicates the + // state of a specific database. + static constexpr StringData kShardConfigDatabasesCollectionName = "config.cache.databases"_sd; + // Name for causal consistency's key collection. 
static constexpr StringData kSystemKeysCollectionName = "admin.system.keys"_sd; diff --git a/src/mongo/db/s/flush_database_cache_updates_command.cpp b/src/mongo/db/s/flush_database_cache_updates_command.cpp index deae485cca9..362c45dc8e8 100644 --- a/src/mongo/db/s/flush_database_cache_updates_command.cpp +++ b/src/mongo/db/s/flush_database_cache_updates_command.cpp @@ -121,6 +121,12 @@ public: { AutoGetDb autoDb(opCtx, name, MODE_IS); + if (!autoDb.getDb()) { + uasserted(ErrorCodes::NamespaceNotFound, + str::stream() << "Can't issue _flushDatabaseCacheUpdates on the database " + << name + << " because it does not exist on this shard."); + } // If the primary is in the critical section, secondaries must wait for the commit to // finish on the primary in case a secondary's caller has an afterClusterTime inclusive diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp index ccbb8e1965e..4c3c317bf15 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp +++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp @@ -53,7 +53,7 @@ std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince( void ReadOnlyCatalogCacheLoader::getDatabase( StringData dbName, stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - // stub + return _configServerLoader.getDatabase(dbName, callbackFn); } } // namespace mongo diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp index de8140a76e1..21afbf7b306 100644 --- a/src/mongo/db/s/shard_metadata_util.cpp +++ b/src/mongo/db/s/shard_metadata_util.cpp @@ -40,6 +40,7 @@ #include "mongo/rpc/unique_message.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/catalog/type_shard_database.h" #include "mongo/s/chunk_version.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" @@ -175,6 +176,39 @@ 
StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt } } +StatusWith<ShardDatabaseType> readShardDatabasesEntry(OperationContext* opCtx, StringData dbName) { + Query fullQuery(BSON(ShardDatabaseType::name() << dbName.toString())); + + try { + DBDirectClient client(opCtx); + std::unique_ptr<DBClientCursor> cursor = + client.query(ShardDatabaseType::ConfigNS.ns(), fullQuery, 1); + if (!cursor) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "Failed to establish a cursor for reading " + << ShardDatabaseType::ConfigNS.ns() + << " from local storage"); + } + + if (!cursor->more()) { + // The database has been dropped. + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "database " << dbName.toString() << " not found"); + } + + BSONObj document = cursor->nextSafe(); + auto statusWithDatabaseEntry = ShardDatabaseType::fromBSON(document); + if (!statusWithDatabaseEntry.isOK()) { + return statusWithDatabaseEntry.getStatus(); + } + + return statusWithDatabaseEntry.getValue(); + } catch (const DBException& ex) { + return ex.toStatus(str::stream() << "Failed to read the '" << dbName.toString() + << "' entry locally from config.databases"); + } +} + Status updateShardCollectionsEntry(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, @@ -220,6 +254,49 @@ Status updateShardCollectionsEntry(OperationContext* opCtx, } } +Status updateShardDatabasesEntry(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update, + const BSONObj& inc, + const bool upsert) { + invariant(query.hasField("_id")); + if (upsert) { + // If upserting, this should be an update from the config server that does not have shard + // migration inc signal information. + invariant(inc.isEmpty()); + } + + try { + DBDirectClient client(opCtx); + + BSONObjBuilder builder; + if (!update.isEmpty()) { + // Want to modify the document if it already exists, not replace it. 
+ builder.append("$set", update); + } + if (!inc.isEmpty()) { + builder.append("$inc", inc); + } + + auto commandResponse = client.runCommand([&] { + write_ops::Update updateOp(NamespaceString{ShardDatabaseType::ConfigNS}); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(query); + entry.setU(builder.obj()); + entry.setUpsert(upsert); + return entry; + }()}); + return updateOp.serialize({}); + }()); + uassertStatusOK(getStatusFromWriteCommandResponse(commandResponse->getCommandReply())); + + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } +} + StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& query, @@ -368,5 +445,30 @@ Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const Namesp } } +Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName) { + try { + DBDirectClient client(opCtx); + + auto deleteCommandResponse = client.runCommand([&] { + write_ops::Delete deleteOp( + NamespaceString{NamespaceString::kShardConfigDatabasesCollectionName}); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(ShardDatabaseType::name << dbName.toString())); + entry.setMulti(false); + return entry; + }()}); + return deleteOp.serialize({}); + }()); + uassertStatusOK( + getStatusFromWriteCommandResponse(deleteCommandResponse->getCommandReply())); + + LOG(1) << "Successfully cleared persisted metadata for db '" << dbName.toString() << "'."; + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } +} + } // namespace shardmetadatautil } // namespace mongo diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h index 9bd035a41b6..62c6c68262e 100644 --- a/src/mongo/db/s/shard_metadata_util.h +++ b/src/mongo/db/s/shard_metadata_util.h @@ -43,6 +43,7 @@ class CollectionMetadata; class NamespaceString; class OperationContext; class 
ShardCollectionType; +class ShardDatabaseType; template <typename T> class StatusWith; @@ -139,6 +140,11 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt const NamespaceString& nss); /** + * Reads the shard server's databases collection entry identified by 'dbName'. + */ +StatusWith<ShardDatabaseType> readShardDatabasesEntry(OperationContext* opCtx, StringData dbName); + +/** * Updates the collections collection entry matching 'query' with 'update' using local write * concern. * @@ -157,6 +163,22 @@ Status updateShardCollectionsEntry(OperationContext* opCtx, const bool upsert); /** + * Updates the databases collection entry matching 'query' with 'update' using local write + * concern. + * + * Uses the $set operator on the update so that updates can be applied without resetting everything. + * 'inc' can be used to specify fields and their increments: it will be assigned to the $inc + * operator. + * + * 'inc' must be empty when 'upsert' is true. + */ +Status updateShardDatabasesEntry(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update, + const BSONObj& inc, + const bool upsert); + +/** * Reads the shard server's chunks collection corresponding to 'nss' for chunks matching 'query', * returning at most 'limit' chunks in 'sort' order. 'epoch' populates the returned chunks' version * fields, because we do not yet have UUIDs to replace epoches nor UUIDs associated with namespaces. @@ -201,5 +223,11 @@ Status updateShardChunks(OperationContext* opCtx, */ Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss); +/** + * Deletes locally persisted database metadata associated with 'dbName': removes the databases + * collection entry. 
+ */ +Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName); + } // namespace shardmetadatautil } // namespace mongo diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 23b9e85318d..d469bf78829 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -40,6 +40,7 @@ #include "mongo/db/s/shard_metadata_util.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/catalog/type_shard_database.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" @@ -120,6 +121,24 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx, } /** + * Takes a DatabaseType object and persists the changes to the shard's metadata + * collections. + */ +Status persistDbVersion(OperationContext* opCtx, const DatabaseType& dbt) { + // Update the databases collection entry for 'dbName' in case there are any new updates. + Status status = updateShardDatabasesEntry(opCtx, + BSON(ShardDatabaseType::name() << dbt.getName()), + dbt.toBSON(), + BSONObj(), + true /*upsert*/); + if (!status.isOK()) { + return status; + } + + return Status::OK(); +} + +/** * This function will throw on error! * * Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there @@ -130,7 +149,7 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx, * could be dropped and recreated between reading the collection epoch and retrieving the chunk, * which would make the returned ChunkVersion corrupt. */ -ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) { +ChunkVersion getPersistedMaxChunkVersion(OperationContext* opCtx, const NamespaceString& nss) { // Must read the collections entry to get the epoch to pass into ChunkType for shard's chunk // collection. 
auto statusWithCollection = readShardCollectionsEntry(opCtx, nss); @@ -168,6 +187,30 @@ ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceStri /** * This function will throw on error! * + * Retrieves the persisted max db version for 'dbName', if there are any persisted dbs. If there + * are none -- meaning there's no persisted metadata for 'dbName' --, returns boost::none. + */ +boost::optional<DatabaseVersion> getPersistedMaxDbVersion(OperationContext* opCtx, + StringData dbName) { + + auto statusWithDatabaseEntry = readShardDatabasesEntry(opCtx, dbName); + if (statusWithDatabaseEntry == ErrorCodes::NamespaceNotFound) { + // There is no persisted metadata. + return boost::none; + } + uassert(ErrorCodes::OperationFailed, + str::stream() << "Failed to read persisted database entry for db '" << dbName.toString() + << "' due to '" + << statusWithDatabaseEntry.getStatus().toString() + << "'.", + statusWithDatabaseEntry.isOK()); + + return statusWithDatabaseEntry.getValue().getDbVersion(); +} + +/** + * This function will throw on error! + * * Tries to find persisted chunk metadata with chunk versions GTE to 'version'. * * If 'version's epoch matches persisted metadata, returns persisted metadata GTE 'version'. 
@@ -397,8 +440,8 @@ void ShardServerCatalogCacheLoader::getDatabase( isPrimary = (_role == ReplicaSetRole::Primary); } - uassertStatusOK( - _threadPool.schedule([ this, dbName, callbackFn, isPrimary, currentTerm ]() noexcept { + uassertStatusOK(_threadPool.schedule( + [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept { auto context = _contexts.makeOperationContext(*Client::getCurrent()); { @@ -418,7 +461,8 @@ void ShardServerCatalogCacheLoader::getDatabase( try { if (isPrimary) { - _schedulePrimaryGetDatabase(context.opCtx(), dbName, currentTerm, callbackFn); + _schedulePrimaryGetDatabase( + context.opCtx(), StringData(name), currentTerm, callbackFn); } } catch (const DBException& ex) { callbackFn(context.opCtx(), ex.toStatus()); @@ -426,18 +470,6 @@ void ShardServerCatalogCacheLoader::getDatabase( })); } -void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase( - OperationContext* opCtx, - StringData dbName, - long long termScheduled, - stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - - auto remoteRefreshCallbackFn = [](OperationContext* opCtx, - StatusWith<DatabaseType> swDatabaseType) {}; - - _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn); -} - void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) { stdx::unique_lock<stdx::mutex> lg(_mutex); @@ -451,10 +483,10 @@ void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC << " because the node's replication role changed.", _role == ReplicaSetRole::Primary && _term == initialTerm); - auto it = _taskLists.find(nss); + auto it = _collAndChunkTaskLists.find(nss); // If there are no tasks for the specified namespace, everything must have been completed - if (it == _taskLists.end()) + if (it == _collAndChunkTaskLists.end()) return; auto& taskList = it->second; @@ -490,15 +522,54 @@ void 
ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC void ShardServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) { + stdx::unique_lock<stdx::mutex> lg(_mutex); const auto initialTerm = _term; - uassert(ErrorCodes::NotMaster, - str::stream() << "Unable to wait for database metadata flush for " << dbName.toString() - << " because the node's replication role changed.", - _role == ReplicaSetRole::Primary && _term == initialTerm); + boost::optional<uint64_t> taskNumToWait; + + while (true) { + uassert(ErrorCodes::NotMaster, + str::stream() << "Unable to wait for database metadata flush for " + << dbName.toString() + << " because the node's replication role changed.", + _role == ReplicaSetRole::Primary && _term == initialTerm); + + auto it = _dbTaskLists.find(dbName.toString()); + + // If there are no tasks for the specified namespace, everything must have been completed + if (it == _dbTaskLists.end()) + return; + + auto& taskList = it->second; + + if (!taskNumToWait) { + const auto& lastTask = taskList.back(); + taskNumToWait = lastTask.taskNum; + } else { + const auto& activeTask = taskList.front(); - return; + if (activeTask.taskNum > *taskNumToWait) { + auto secondTaskIt = std::next(taskList.begin()); + + // Because of an optimization where a namespace drop clears all tasks except the + // active it is possible that the task number we are waiting on will never actually + // be written. Because of this we move the task number to the drop which can only be + // in the active task or in the one after the active. + if (activeTask.dropped) { + taskNumToWait = activeTask.taskNum; + } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) { + taskNumToWait = secondTaskIt->taskNum; + } else { + return; + } + } + } + + // It is not safe to use taskList after this call, because it will unlock and lock the tasks + // mutex, so we just loop around. 
+ taskList.waitForActiveTaskCompletion(lg); + } } void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( @@ -526,9 +597,9 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( const ChunkVersion maxLoaderVersion = [&] { { stdx::lock_guard<stdx::mutex> lock(_mutex); - auto taskListIt = _taskLists.find(nss); + auto taskListIt = _collAndChunkTaskLists.find(nss); - if (taskListIt != _taskLists.end() && + if (taskListIt != _collAndChunkTaskLists.end() && taskListIt->second.hasTasksFromThisTerm(termScheduled)) { // Enqueued tasks have the latest metadata return taskListIt->second.getHighestVersionEnqueued(); @@ -536,7 +607,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( } // If there are no enqueued tasks, get the max persisted - return getPersistedMaxVersion(opCtx, nss); + return getPersistedMaxChunkVersion(opCtx, nss); }(); auto remoteRefreshCallbackFn = [this, @@ -550,8 +621,10 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) { if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { - Status scheduleStatus = _ensureMajorityPrimaryAndScheduleTask( - opCtx, nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); + Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask( + opCtx, + nss, + collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); if (!scheduleStatus.isOK()) { callbackFn(opCtx, scheduleStatus); notify->set(); @@ -577,10 +650,11 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( } else { if ((collAndChunks.epoch != maxLoaderVersion.epoch()) || (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) { - Status scheduleStatus = _ensureMajorityPrimaryAndScheduleTask( + Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask( opCtx, nss, - Task{swCollectionAndChangedChunks, maxLoaderVersion, 
termScheduled}); + collAndChunkTask{ + swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); if (!scheduleStatus.isOK()) { callbackFn(opCtx, scheduleStatus); notify->set(); @@ -616,6 +690,68 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( _configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn); } +void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase( + OperationContext* opCtx, + StringData dbName, + long long termScheduled, + stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { + + // Get the max version the loader has. + boost::optional<DatabaseVersion> maxLoaderVersion = [&] { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto taskListIt = _dbTaskLists.find(dbName.toString()); + + if (taskListIt != _dbTaskLists.end() && + taskListIt->second.hasTasksFromThisTerm(termScheduled)) { + // Enqueued tasks have the latest metadata + return taskListIt->second.getHighestVersionEnqueued(); + } + } + + return getPersistedMaxDbVersion(opCtx, dbName); + }(); + + auto remoteRefreshCallbackFn = + [ this, name = dbName.toString(), maxLoaderVersion, termScheduled, callbackFn ]( + OperationContext * opCtx, StatusWith<DatabaseType> swDatabaseType) { + + if (swDatabaseType == ErrorCodes::NamespaceNotFound) { + + Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask( + opCtx, name, dbTask{swDatabaseType, maxLoaderVersion, termScheduled}); + if (!scheduleStatus.isOK()) { + callbackFn(opCtx, scheduleStatus); + return; + } + + log() << "Cache loader remotely refreshed for database " << name + << " and found the database has been dropped."; + + } else if (swDatabaseType.isOK()) { + auto& dbType = swDatabaseType.getValue(); + + if (!bool(maxLoaderVersion) || (dbType.getVersion() > maxLoaderVersion.get())) { + + Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask( + opCtx, name, dbTask{swDatabaseType, maxLoaderVersion, termScheduled}); + if (!scheduleStatus.isOK()) 
{ + callbackFn(opCtx, scheduleStatus); + return; + } + } + + log() << "Cache loader remotely refreshed for database " << name << " and found " + << dbType.toBSON(); + } + + // Complete the callbackFn work. + callbackFn(opCtx, std::move(swDatabaseType)); + }; + + _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn); +} + StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata( OperationContext* opCtx, const NamespaceString& nss, @@ -696,9 +832,9 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE const ChunkVersion& catalogCacheSinceVersion, const long long term) { stdx::unique_lock<stdx::mutex> lock(_mutex); - auto taskListIt = _taskLists.find(nss); + auto taskListIt = _collAndChunkTaskLists.find(nss); - if (taskListIt == _taskLists.end()) { + if (taskListIt == _collAndChunkTaskLists.end()) { return std::make_pair(false, CollectionAndChangedChunks()); } else if (!taskListIt->second.hasTasksFromThisTerm(term)) { // If task list does not have a term that matches, there's no valid task data to collect. 
@@ -726,8 +862,8 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE return std::make_pair(true, collAndChunks); } -Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask( - OperationContext* opCtx, const NamespaceString& nss, Task task) { +Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask( + OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) { Status linearizableReadStatus = waitForLinearizableReadConcern(opCtx); if (!linearizableReadStatus.isOK()) { return linearizableReadStatus.withContext( @@ -737,18 +873,49 @@ Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask( stdx::lock_guard<stdx::mutex> lock(_mutex); - const bool wasEmpty = _taskLists[nss].empty(); - _taskLists[nss].addTask(std::move(task)); + const bool wasEmpty = _collAndChunkTaskLists[nss].empty(); + _collAndChunkTaskLists[nss].addTask(std::move(task)); if (wasEmpty) { - Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); }); + Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }); if (!status.isOK()) { log() << "Cache loader failed to schedule persisted metadata update" << " task for namespace '" << nss << "' due to '" << redact(status) << "'. 
Clearing task list so that scheduling" << " will be attempted by the next caller to refresh this namespace."; stdx::lock_guard<stdx::mutex> lock(_mutex); - _taskLists.erase(nss); + _collAndChunkTaskLists.erase(nss); + } + return status; + } + + return Status::OK(); +} + +Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask( + OperationContext* opCtx, StringData dbName, dbTask task) { + Status linearizableReadStatus = waitForLinearizableReadConcern(opCtx); + if (!linearizableReadStatus.isOK()) { + return linearizableReadStatus.withContext( + "Unable to schedule routing table update because this is not the majority primary and " + "may not have the latest data."); + } + + stdx::lock_guard<stdx::mutex> lock(_mutex); + + const bool wasEmpty = _dbTaskLists[dbName.toString()].empty(); + _dbTaskLists[dbName.toString()].addTask(std::move(task)); + + if (wasEmpty) { + Status status = _threadPool.schedule( + [ this, name = dbName.toString() ]() { _runDbTasks(StringData(name)); }); + if (!status.isOK()) { + log() << "Cache loader failed to schedule persisted metadata update" + << " task for db '" << dbName.toString() << "' due to '" << redact(status) + << "'. 
Clearing task list so that scheduling" + << " will be attempted by the next caller to refresh this namespace."; + stdx::lock_guard<stdx::mutex> lock(_mutex); + _dbTaskLists.erase(dbName.toString()); } return status; } @@ -756,12 +923,12 @@ Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask( return Status::OK(); } -void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) { +void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) { auto context = _contexts.makeOperationContext(*Client::getCurrent()); bool taskFinished = false; try { - _updatePersistedMetadata(context.opCtx(), nss); + _updatePersistedCollAndChunksMetadata(context.opCtx(), nss); taskFinished = true; } catch (const DBException& ex) { Status exceptionStatus = ex.toStatus(); @@ -780,29 +947,72 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) { // If task completed successfully, remove it from work queue if (taskFinished) { - _taskLists[nss].pop_front(); + _collAndChunkTaskLists[nss].pop_front(); } // Schedule more work if there is any - if (!_taskLists[nss].empty()) { - Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); }); + if (!_collAndChunkTaskLists[nss].empty()) { + Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }); if (!status.isOK()) { log() << "Cache loader failed to schedule a persisted metadata update" << " task for namespace '" << nss << "' due to '" << redact(status) << "'. 
Clearing task list so that scheduling will be attempted by the next" << " caller to refresh this namespace."; - _taskLists.erase(nss); + _collAndChunkTaskLists.erase(nss); } } else { - _taskLists.erase(nss); + _collAndChunkTaskLists.erase(nss); } } -void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* opCtx, - const NamespaceString& nss) { +void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { + auto context = _contexts.makeOperationContext(*Client::getCurrent()); + + bool taskFinished = false; + try { + _updatePersistedDbMetadata(context.opCtx(), dbName); + taskFinished = true; + } catch (const DBException& ex) { + Status exceptionStatus = ex.toStatus(); + + // This thread must stop if we are shutting down + if (ErrorCodes::isShutdownError(exceptionStatus.code())) { + log() << "Failed to persist metadata update for db '" << dbName.toString() + << "' due to shutdown."; + return; + } + + log() << redact(exceptionStatus); + } + + stdx::lock_guard<stdx::mutex> lock(_mutex); + + // If task completed successfully, remove it from work queue + if (taskFinished) { + _dbTaskLists[dbName.toString()].pop_front(); + } + + // Schedule more work if there is any + if (!_dbTaskLists[dbName.toString()].empty()) { + Status status = _threadPool.schedule( + [ this, name = dbName.toString() ]() { _runDbTasks(StringData(name)); }); + if (!status.isOK()) { + log() << "Cache loader failed to schedule a persisted metadata update" + << " task for namespace '" << dbName.toString() << "' due to '" << redact(status) + << "'. 
Clearing task list so that scheduling will be attempted by the next" + << " caller to refresh this namespace."; + _dbTaskLists.erase(dbName.toString()); + } + } else { + _dbTaskLists.erase(dbName.toString()); + } +} + +void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata( + OperationContext* opCtx, const NamespaceString& nss) { stdx::unique_lock<stdx::mutex> lock(_mutex); - const Task& task = _taskLists[nss].front(); + const collAndChunkTask& task = _collAndChunkTaskLists[nss].front(); invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty()); // If this task is from an old term and no longer valid, do not execute and return true so that @@ -837,6 +1047,38 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o << task.minQueryVersion << "' to collection version '" << task.maxQueryVersion << "'."; } +void ShardServerCatalogCacheLoader::_updatePersistedDbMetadata(OperationContext* opCtx, + StringData dbName) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + + const dbTask& task = _dbTaskLists[dbName.toString()].front(); + + // If this task is from an old term and no longer valid, do not execute and return true so that + // the task gets removed from the task list + if (task.termCreated != _term) { + return; + } + + lock.unlock(); + + // Check if this is a drop task + if (task.dropped) { + // The database was dropped. The persisted metadata for the collection must be cleared. + uassertStatusOKWithContext(deleteDatabasesEntry(opCtx, dbName), + str::stream() << "Failed to clear persisted metadata for db '" + << dbName.toString() + << "'. Will be retried."); + return; + } + + uassertStatusOKWithContext(persistDbVersion(opCtx, task.databaseType.get()), + str::stream() << "Failed to update the persisted metadata for db '" + << dbName.toString() + << "'. 
Will be retried."); + + log() << "Successfully updated persisted metadata for db '" << dbName.toString(); +} + CollectionAndChangedChunks ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVersion( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) { @@ -875,7 +1117,7 @@ ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVer } } -ShardServerCatalogCacheLoader::Task::Task( +ShardServerCatalogCacheLoader::collAndChunkTask::collAndChunkTask( StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, ChunkVersion minimumQueryVersion, long long currentTerm) @@ -893,10 +1135,28 @@ ShardServerCatalogCacheLoader::Task::Task( } } -ShardServerCatalogCacheLoader::TaskList::TaskList() +ShardServerCatalogCacheLoader::dbTask::dbTask(StatusWith<DatabaseType> swDatabaseType, + boost::optional<DatabaseVersion> minimumVersion, + long long currentTerm) + : taskNum(taskIdGenerator.fetchAndAdd(1)), + minVersion(minimumVersion), + termCreated(currentTerm) { + if (swDatabaseType.isOK()) { + databaseType = std::move(swDatabaseType.getValue()); + maxVersion = databaseType->getVersion(); + } else { + invariant(swDatabaseType == ErrorCodes::NamespaceNotFound); + dropped = true; + } +} + +ShardServerCatalogCacheLoader::CollAndChunkTaskList::CollAndChunkTaskList() + : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {} + +ShardServerCatalogCacheLoader::DbTaskList::DbTaskList() : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {} -void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) { +void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(collAndChunkTask task) { if (_tasks.empty()) { _tasks.emplace_back(std::move(task)); return; @@ -924,13 +1184,47 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) { } } -void ShardServerCatalogCacheLoader::TaskList::pop_front() { +void 
ShardServerCatalogCacheLoader::DbTaskList::addTask(dbTask task) { + if (_tasks.empty()) { + _tasks.emplace_back(std::move(task)); + + return; + } + + if (task.dropped) { + invariant(_tasks.back().maxVersion == task.minVersion); + + // As an optimization, on database drop, clear any pending tasks in order to prevent any + // throw-away work from executing. Because we have no way to differentiate whether the + // active task is currently being operated on by a thread or not, we must leave the front + // intact. + _tasks.erase(std::next(_tasks.begin()), _tasks.end()); + + // No need to schedule a drop if one is already currently active. + if (!_tasks.front().dropped) { + _tasks.emplace_back(std::move(task)); + } + } else { + // Tasks must have contiguous versions, unless a complete reload occurs. + invariant(!bool(task.minVersion) || + (_tasks.back().maxVersion.get() == task.minVersion.get())); + _tasks.emplace_back(std::move(task)); + } +} + +void ShardServerCatalogCacheLoader::CollAndChunkTaskList::pop_front() { + invariant(!_tasks.empty()); + _tasks.pop_front(); + _activeTaskCompletedCondVar->notify_all(); +} + +void ShardServerCatalogCacheLoader::DbTaskList::pop_front() { invariant(!_tasks.empty()); _tasks.pop_front(); _activeTaskCompletedCondVar->notify_all(); } -void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion( +void ShardServerCatalogCacheLoader::CollAndChunkTaskList::waitForActiveTaskCompletion( stdx::unique_lock<stdx::mutex>& lg) { // Increase the use_count of the condition variable shared pointer, because the entire task list // might get deleted during the unlocked interval @@ -938,17 +1232,39 @@ void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion( condVar->wait(lg); } -bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const { +void ShardServerCatalogCacheLoader::DbTaskList::waitForActiveTaskCompletion( + stdx::unique_lock<stdx::mutex>& lg) { + // Increase the use_count of 
the condition variable shared pointer, because the entire task list + // might get deleted during the unlocked interval + auto condVar = _activeTaskCompletedCondVar; + condVar->wait(lg); +} + +bool ShardServerCatalogCacheLoader::CollAndChunkTaskList::hasTasksFromThisTerm( + long long term) const { invariant(!_tasks.empty()); return _tasks.back().termCreated == term; } -ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const { +bool ShardServerCatalogCacheLoader::DbTaskList::hasTasksFromThisTerm(long long term) const { + invariant(!_tasks.empty()); + return _tasks.back().termCreated == term; +} + +ChunkVersion ShardServerCatalogCacheLoader::CollAndChunkTaskList::getHighestVersionEnqueued() + const { invariant(!_tasks.empty()); return _tasks.back().maxQueryVersion; } -CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadataForTerm( +boost::optional<DatabaseVersion> +ShardServerCatalogCacheLoader::DbTaskList::getHighestVersionEnqueued() const { + invariant(!_tasks.empty()); + return _tasks.back().maxVersion; +} + +CollectionAndChangedChunks +ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm( const long long term) const { CollectionAndChangedChunks collAndChunks; for (const auto& task : _tasks) { diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 4ce23b39dc0..87bd3dea31c 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -107,9 +107,9 @@ private: * apply a set up updated chunks to the shard persisted metadata store or to drop the persisted * metadata for a specific collection. 
*/ - struct Task { - MONGO_DISALLOW_COPYING(Task); - Task(Task&&) = default; + struct collAndChunkTask { + MONGO_DISALLOW_COPYING(collAndChunkTask); + collAndChunkTask(collAndChunkTask&&) = default; /** * Initializes a task for either dropping or updating the persisted metadata for the @@ -124,9 +124,10 @@ private: * 'maxQueryVersion' is either set to the highest chunk version in * 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED(). */ - Task(StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, - ChunkVersion minimumQueryVersion, - long long currentTerm); + collAndChunkTask( + StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, + ChunkVersion minimumQueryVersion, + long long currentTerm); // Always-incrementing task number to uniquely identify different tasks uint64_t taskNum; @@ -136,12 +137,13 @@ private: // The highest version that the loader had before going to the config server's metadata // store for updated chunks. - // Used by the TaskList below to enforce consistent updates are applied. + // Used by the CollAndChunkTaskList below to enforce consistent updates are applied. ChunkVersion minQueryVersion; // Either the highest chunk version in 'collectionAndChangedChunks' or the same as // 'minQueryVersion' if 'dropped' is true. - // Used by the TaskList below to enforce consistent updates are applied. + // Used by the CollAndChunkTaskList below to enforce consistent updates are + // applied. ChunkVersion maxQueryVersion; // Indicates whether the collection metadata must be cleared. @@ -152,6 +154,55 @@ private: }; /** + * This represents an update task for the persisted database metadata. The task will either be + * to + * persist an update to the shard persisted metadata store or to drop the persisted + * metadata for a specific database. 
+ */ + struct dbTask { + MONGO_DISALLOW_COPYING(dbTask); + dbTask(dbTask&&) = default; + + /** + * Initializes a task for either dropping or updating the persisted metadata for the + * associated database. Which type of task is determined by the Status of 'swDatabaseType', + * whether it is NamespaceNotFound or OK. + * + * Note: swDatabaseType must always be NamespaceNotFound or OK, otherwise the constructor + * will invariant because there is no task to complete. + * + * 'databaseType' is only initialized if 'dropped' is false. + * 'minimumVersion' sets 'minVersion'. + * 'maxVersion' is either set to the db version in 'databaseType' or boost::none. + */ + dbTask(StatusWith<DatabaseType> swDatabaseType, + boost::optional<DatabaseVersion> minimumVersion, + long long currentTerm); + + // Always-incrementing task number to uniquely identify different tasks + uint64_t taskNum; + + // Database metadata update to be applied to the shard persisted metadata store. + boost::optional<DatabaseType> databaseType; + + // The highest version that the loader had before going to the config server's metadata + // store for the updated db version. + // Used by the DbTaskList below to enforce consistent updates are applied. + boost::optional<DatabaseVersion> minVersion; + + // Either the database version in 'databaseType', or boost::none if 'dropped' is true + // (the constructor leaves it unset for drop tasks). + // Used by the DbTaskList below to enforce consistent updates are applied. + boost::optional<DatabaseVersion> maxVersion; + + // Indicates whether the database metadata must be cleared. + bool dropped{false}; + + // The term in which the loader scheduled this task. Stored as long long, the same type as + // the constructor's 'currentTerm', so the value is never narrowed. + long long termCreated; + }; + + /** * A list (work queue) of updates to apply to the shard persisted metadata store for a specific * collection. Enforces that tasks that are added to the list are either consistent: * @@ -161,9 +212,9 @@ private: * * minQueryVersion == ChunkVersion::UNSHARDED(). 
*/ - class TaskList { + class CollAndChunkTaskList { public: - TaskList(); + CollAndChunkTaskList(); /** * Adds 'task' to the back of the 'tasks' list. @@ -172,7 +223,7 @@ private: * don't waste time applying changes we will just delete. If the one remaining task in the * list is already a drop task, the new one isn't added because it is redundant. */ - void addTask(Task task); + void addTask(collAndChunkTask task); auto& front() { invariant(!_tasks.empty()); @@ -229,14 +280,94 @@ private: CollectionAndChangedChunks getEnqueuedMetadataForTerm(const long long term) const; private: - std::list<Task> _tasks{}; + std::list<collAndChunkTask> _tasks{}; // Condition variable which will be signaled whenever the active task from the tasks list is // completed. Must be used in conjunction with the loader's mutex. std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar; }; - typedef std::map<NamespaceString, TaskList> TaskLists; + /** + * A list (work queue) of updates to apply to the shard persisted metadata store for a specific + * database. Enforces that tasks that are added to the list are either consistent: + * + * tasks[i].minVersion == tasks[i-1].maxVersion. + * + * or applying a complete update from the minimum version, where + * + * minVersion == boost::none. + */ + class DbTaskList { + public: + DbTaskList(); + + /** + * Adds 'task' to the back of the 'tasks' list. + * + * If 'task' is a drop task, clears 'tasks' except for the front active task, so that we + * don't waste time applying changes we will just delete. If the one remaining task in the + * list is already a drop task, the new one isn't added because it is redundant. 
+ */ + void addTask(dbTask task); + + auto& front() { + invariant(!_tasks.empty()); + return _tasks.front(); + } + + auto& back() { + invariant(!_tasks.empty()); + return _tasks.back(); + } + + auto begin() { + invariant(!_tasks.empty()); + return _tasks.begin(); + } + + auto end() { + invariant(!_tasks.empty()); + return _tasks.end(); + } + + void pop_front(); + + bool empty() const { + return _tasks.empty(); + } + + /** + * Must only be called if there is an active task. Behaves like a condition variable and + * will be signaled when the active task has been completed. + * + * NOTE: Because this call unlocks and locks the provided mutex, it is not safe to use the + * same task object on which it was called because it might have been deleted during the + * unlocked period. + */ + void waitForActiveTaskCompletion(stdx::unique_lock<stdx::mutex>& lg); + + /** + * Checks whether 'term' matches the term of the latest task in the task list. This is + * useful to check whether the task list has outdated data that's no longer valid to use in + * the current/new term specified by 'term'. + */ + bool hasTasksFromThisTerm(long long term) const; + + /** + * Gets the last task's highest version -- this is the most up to date version. + */ + boost::optional<DatabaseVersion> getHighestVersionEnqueued() const; + + private: + std::list<dbTask> _tasks{}; + + // Condition variable which will be signaled whenever the active task from the tasks list is + // completed. Must be used in conjunction with the loader's mutex. + std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar; + }; + + typedef std::map<NamespaceString, CollAndChunkTaskList> CollAndChunkTaskLists; + typedef std::map<std::string, DbTaskList> DbTaskLists; /** * Forces the primary to refresh its metadata for 'nss' and waits until this node's metadata @@ -327,16 +458,21 @@ private: * * Only run on the shard primary. 
*/ - Status _ensureMajorityPrimaryAndScheduleTask(OperationContext* opCtx, - const NamespaceString& nss, - Task task); + Status _ensureMajorityPrimaryAndScheduleCollAndChunksTask(OperationContext* opCtx, + const NamespaceString& nss, + collAndChunkTask task); + Status _ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx, + StringData dbName, + dbTask task); /** * Schedules tasks in the 'nss' task list to execute until the task list is depleted. * * Only run on the shard primary. */ - void _runTasks(const NamespaceString& nss); + void _runCollAndChunksTasks(const NamespaceString& nss); + + void _runDbTasks(StringData dbName); /** * Executes the task at the front of the task list for 'nss'. The task will either drop 'nss's @@ -344,7 +480,9 @@ private: * * Only run on the shard primary. */ - void _updatePersistedMetadata(OperationContext* opCtx, const NamespaceString& nss); + void _updatePersistedCollAndChunksMetadata(OperationContext* opCtx, const NamespaceString& nss); + + void _updatePersistedDbMetadata(OperationContext* opCtx, StringData dbName); /** * Attempts to read the collection and chunk metadata since 'version' from the shard persisted @@ -370,7 +508,8 @@ private: stdx::mutex _mutex; // Map to track in progress persisted cache updates on the shard primary. - TaskLists _taskLists; + CollAndChunkTaskLists _collAndChunkTaskLists; + DbTaskLists _dbTaskLists; // This value is bumped every time the set of currently scheduled tasks should no longer be // running. This includes, replica set state transitions and shutdown. |