author     jannaerin <golden.janna@gmail.com>    2018-04-03 13:30:30 -0400
committer  jannaerin <golden.janna@gmail.com>    2018-04-05 20:29:05 -0400
commit     7a48a263485a585dac1e1289c830eafd35a3d54b (patch)
tree       38544c586218bfa72bf7fd842c3ed6019e788ae3 /src
parent     265a38952f11a5d9a6144a22f10dc59b138e0b69 (diff)
download   mongo-7a48a263485a585dac1e1289c830eafd35a3d54b.tar.gz
SERVER-34145 Persist database version on shard
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/namespace_string.cpp                         1
-rw-r--r--  src/mongo/db/namespace_string.h                           4
-rw-r--r--  src/mongo/db/s/flush_database_cache_updates_command.cpp   6
-rw-r--r--  src/mongo/db/s/read_only_catalog_cache_loader.cpp         2
-rw-r--r--  src/mongo/db/s/shard_metadata_util.cpp                    102
-rw-r--r--  src/mongo/db/s/shard_metadata_util.h                      28
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp      428
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.h        177
-rw-r--r--  src/mongo/s/SConscript                                    1
-rw-r--r--  src/mongo/s/catalog/type_shard_database.cpp               132
-rw-r--r--  src/mongo/s/catalog/type_shard_database.h                 121
-rw-r--r--  src/mongo/s/config_server_catalog_cache_loader.cpp        16
12 files changed, 938 insertions, 80 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index cdd87d63fff..2e59ef396bf 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -55,6 +55,7 @@ constexpr StringData NamespaceString::kLocalDb;
constexpr StringData NamespaceString::kConfigDb;
constexpr StringData NamespaceString::kSystemDotViewsCollectionName;
constexpr StringData NamespaceString::kShardConfigCollectionsCollectionName;
+constexpr StringData NamespaceString::kShardConfigDatabasesCollectionName;
constexpr StringData NamespaceString::kSystemKeysCollectionName;
const NamespaceString NamespaceString::kServerConfigurationNamespace(kServerConfiguration);
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 3e5a0d07793..f9ee71c2637 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -65,6 +65,10 @@ public:
static constexpr StringData kShardConfigCollectionsCollectionName =
"config.cache.collections"_sd;
+ // Name for a shard's databases metadata collection, each document of which indicates the
+ // state of a specific database.
+ static constexpr StringData kShardConfigDatabasesCollectionName = "config.cache.databases"_sd;
+
// Name for causal consistency's key collection.
static constexpr StringData kSystemKeysCollectionName = "admin.system.keys"_sd;
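For orientation, here is a rough sketch (not part of this patch) of the kind of document the shard will now persist in config.cache.databases. The field names come from the ShardDatabaseType class added later in this commit; the "uuid" value is shown as a placeholder string rather than a real UUID.

// Hypothetical example of a shard-local database cache entry.
BSONObj exampleEntry = BSON("_id"
                            << "foo"                     // database name
                            << "primary"
                            << "shard0000"               // primary shard id
                            << "partitioned" << true
                            << "version"
                            << BSON("uuid"
                                    << "<uuid placeholder>"
                                    << "lastMod" << 1)); // DatabaseVersion {uuid, lastMod}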
diff --git a/src/mongo/db/s/flush_database_cache_updates_command.cpp b/src/mongo/db/s/flush_database_cache_updates_command.cpp
index deae485cca9..362c45dc8e8 100644
--- a/src/mongo/db/s/flush_database_cache_updates_command.cpp
+++ b/src/mongo/db/s/flush_database_cache_updates_command.cpp
@@ -121,6 +121,12 @@ public:
{
AutoGetDb autoDb(opCtx, name, MODE_IS);
+ if (!autoDb.getDb()) {
+ uasserted(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Can't issue _flushDatabaseCacheUpdates on the database "
+ << name
+ << " because it does not exist on this shard.");
+ }
// If the primary is in the critical section, secondaries must wait for the commit to
// finish on the primary in case a secondary's caller has an afterClusterTime inclusive
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index ccbb8e1965e..4c3c317bf15 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -53,7 +53,7 @@ std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince(
void ReadOnlyCatalogCacheLoader::getDatabase(
StringData dbName,
stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- // stub
+ return _configServerLoader.getDatabase(dbName, callbackFn);
}
} // namespace mongo
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index de8140a76e1..21afbf7b306 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -40,6 +40,7 @@
#include "mongo/rpc/unique_message.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/catalog/type_shard_database.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
@@ -175,6 +176,39 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt
}
}
+StatusWith<ShardDatabaseType> readShardDatabasesEntry(OperationContext* opCtx, StringData dbName) {
+ Query fullQuery(BSON(ShardDatabaseType::name() << dbName.toString()));
+
+ try {
+ DBDirectClient client(opCtx);
+ std::unique_ptr<DBClientCursor> cursor =
+ client.query(ShardDatabaseType::ConfigNS.ns(), fullQuery, 1);
+ if (!cursor) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to establish a cursor for reading "
+ << ShardDatabaseType::ConfigNS.ns()
+ << " from local storage");
+ }
+
+ if (!cursor->more()) {
+ // The database has been dropped.
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "database " << dbName.toString() << " not found");
+ }
+
+ BSONObj document = cursor->nextSafe();
+ auto statusWithDatabaseEntry = ShardDatabaseType::fromBSON(document);
+ if (!statusWithDatabaseEntry.isOK()) {
+ return statusWithDatabaseEntry.getStatus();
+ }
+
+ return statusWithDatabaseEntry.getValue();
+ } catch (const DBException& ex) {
+ return ex.toStatus(str::stream() << "Failed to read the '" << dbName.toString()
+ << "' entry locally from config.databases");
+ }
+}
+
Status updateShardCollectionsEntry(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
@@ -220,6 +254,49 @@ Status updateShardCollectionsEntry(OperationContext* opCtx,
}
}
+Status updateShardDatabasesEntry(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update,
+ const BSONObj& inc,
+ const bool upsert) {
+ invariant(query.hasField("_id"));
+ if (upsert) {
+ // If upserting, this should be an update from the config server that does not have shard
+ // migration inc signal information.
+ invariant(inc.isEmpty());
+ }
+
+ try {
+ DBDirectClient client(opCtx);
+
+ BSONObjBuilder builder;
+ if (!update.isEmpty()) {
+ // Want to modify the document if it already exists, not replace it.
+ builder.append("$set", update);
+ }
+ if (!inc.isEmpty()) {
+ builder.append("$inc", inc);
+ }
+
+ auto commandResponse = client.runCommand([&] {
+ write_ops::Update updateOp(NamespaceString{ShardDatabaseType::ConfigNS});
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(query);
+ entry.setU(builder.obj());
+ entry.setUpsert(upsert);
+ return entry;
+ }()});
+ return updateOp.serialize({});
+ }());
+ uassertStatusOK(getStatusFromWriteCommandResponse(commandResponse->getCommandReply()));
+
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
@@ -368,5 +445,30 @@ Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const Namesp
}
}
+Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName) {
+ try {
+ DBDirectClient client(opCtx);
+
+ auto deleteCommandResponse = client.runCommand([&] {
+ write_ops::Delete deleteOp(
+ NamespaceString{NamespaceString::kShardConfigDatabasesCollectionName});
+ deleteOp.setDeletes({[&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(BSON(ShardDatabaseType::name << dbName.toString()));
+ entry.setMulti(false);
+ return entry;
+ }()});
+ return deleteOp.serialize({});
+ }());
+ uassertStatusOK(
+ getStatusFromWriteCommandResponse(deleteCommandResponse->getCommandReply()));
+
+ LOG(1) << "Successfully cleared persisted metadata for db '" << dbName.toString() << "'.";
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
} // namespace shardmetadatautil
} // namespace mongo
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 9bd035a41b6..62c6c68262e 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -43,6 +43,7 @@ class CollectionMetadata;
class NamespaceString;
class OperationContext;
class ShardCollectionType;
+class ShardDatabaseType;
template <typename T>
class StatusWith;
@@ -139,6 +140,11 @@ StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCt
const NamespaceString& nss);
/**
+ * Reads the shard server's databases collection entry identified by 'dbName'.
+ */
+StatusWith<ShardDatabaseType> readShardDatabasesEntry(OperationContext* opCtx, StringData dbName);
+
+/**
* Updates the collections collection entry matching 'query' with 'update' using local write
* concern.
*
@@ -157,6 +163,22 @@ Status updateShardCollectionsEntry(OperationContext* opCtx,
const bool upsert);
/**
+ * Updates the databases collection entry matching 'query' with 'update' using local write
+ * concern.
+ *
+ * Uses the $set operator on the update so that updates can be applied without resetting everything.
+ * 'inc' can be used to specify fields and their increments: it will be assigned to the $inc
+ * operator.
+ *
+ * 'inc' must be empty when 'upsert' is true.
+ */
+Status updateShardDatabasesEntry(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update,
+ const BSONObj& inc,
+ const bool upsert);
+
+/**
* Reads the shard server's chunks collection corresponding to 'nss' for chunks matching 'query',
* returning at most 'limit' chunks in 'sort' order. 'epoch' populates the returned chunks' version
* fields, because we do not yet have UUIDs to replace epoches nor UUIDs associated with namespaces.
@@ -201,5 +223,11 @@ Status updateShardChunks(OperationContext* opCtx,
*/
Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss);
+/**
+ * Deletes locally persisted database metadata associated with 'dbName': removes the databases
+ * collection entry.
+ */
+Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName);
+
} // namespace shardmetadatautil
} // namespace mongo
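To make the intended calling pattern concrete, here is a hedged sketch (not in this patch) of the two ways updateShardDatabasesEntry is expected to be driven, mirroring the invariant in the .cpp above; 'opCtx' and 'dbt' are assumed to be an OperationContext* and a DatabaseType fetched from the config server, and the migration-signal call is an assumption about how the 'inc' parameter is meant to be used.

// Refresh path: upsert the whole entry from the config server. 'inc' must be empty
// when upserting, per the invariant in updateShardDatabasesEntry.
Status refreshStatus = shardmetadatautil::updateShardDatabasesEntry(
    opCtx,
    BSON(ShardDatabaseType::name() << dbt.getName()),  // query on _id
    dbt.toBSON(),                                      // applied via $set
    BSONObj(),                                         // no $inc
    true /*upsert*/);

// Migration-signal path (hypothetical usage): bump only the enterCriticalSectionCounter
// field of an existing entry, without upserting.
Status signalStatus = shardmetadatautil::updateShardDatabasesEntry(
    opCtx,
    BSON(ShardDatabaseType::name() << dbt.getName()),
    BSONObj(),                                         // nothing to $set
    BSON(ShardDatabaseType::enterCriticalSectionCounter() << 1),
    false /*upsert*/);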
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 23b9e85318d..d469bf78829 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/catalog/type_shard_database.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
@@ -120,6 +121,24 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
}
/**
+ * Takes a DatabaseType object and persists the changes to the shard's metadata
+ * collections.
+ */
+Status persistDbVersion(OperationContext* opCtx, const DatabaseType& dbt) {
+ // Update the databases collection entry for 'dbName' in case there are any new updates.
+ Status status = updateShardDatabasesEntry(opCtx,
+ BSON(ShardDatabaseType::name() << dbt.getName()),
+ dbt.toBSON(),
+ BSONObj(),
+ true /*upsert*/);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status::OK();
+}
+
+/**
* This function will throw on error!
*
* Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there
@@ -130,7 +149,7 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
* could be dropped and recreated between reading the collection epoch and retrieving the chunk,
* which would make the returned ChunkVersion corrupt.
*/
-ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) {
+ChunkVersion getPersistedMaxChunkVersion(OperationContext* opCtx, const NamespaceString& nss) {
// Must read the collections entry to get the epoch to pass into ChunkType for shard's chunk
// collection.
auto statusWithCollection = readShardCollectionsEntry(opCtx, nss);
@@ -168,6 +187,30 @@ ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceStri
/**
* This function will throw on error!
*
+ * Retrieves the persisted max db version for 'dbName', if there are any persisted dbs. If there
+ * are none -- meaning there is no persisted metadata for 'dbName' -- returns boost::none.
+ */
+boost::optional<DatabaseVersion> getPersistedMaxDbVersion(OperationContext* opCtx,
+ StringData dbName) {
+
+ auto statusWithDatabaseEntry = readShardDatabasesEntry(opCtx, dbName);
+ if (statusWithDatabaseEntry == ErrorCodes::NamespaceNotFound) {
+ // There is no persisted metadata.
+ return boost::none;
+ }
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to read persisted database entry for db '" << dbName.toString()
+ << "' due to '"
+ << statusWithDatabaseEntry.getStatus().toString()
+ << "'.",
+ statusWithDatabaseEntry.isOK());
+
+ return statusWithDatabaseEntry.getValue().getDbVersion();
+}
+
+/**
+ * This function will throw on error!
+ *
* Tries to find persisted chunk metadata with chunk versions GTE to 'version'.
*
* If 'version's epoch matches persisted metadata, returns persisted metadata GTE 'version'.
@@ -397,8 +440,8 @@ void ShardServerCatalogCacheLoader::getDatabase(
isPrimary = (_role == ReplicaSetRole::Primary);
}
- uassertStatusOK(
- _threadPool.schedule([ this, dbName, callbackFn, isPrimary, currentTerm ]() noexcept {
+ uassertStatusOK(_threadPool.schedule(
+ [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
{
@@ -418,7 +461,8 @@ void ShardServerCatalogCacheLoader::getDatabase(
try {
if (isPrimary) {
- _schedulePrimaryGetDatabase(context.opCtx(), dbName, currentTerm, callbackFn);
+ _schedulePrimaryGetDatabase(
+ context.opCtx(), StringData(name), currentTerm, callbackFn);
}
} catch (const DBException& ex) {
callbackFn(context.opCtx(), ex.toStatus());
@@ -426,18 +470,6 @@ void ShardServerCatalogCacheLoader::getDatabase(
}));
}
-void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
- OperationContext* opCtx,
- StringData dbName,
- long long termScheduled,
- stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
-
- auto remoteRefreshCallbackFn = [](OperationContext* opCtx,
- StatusWith<DatabaseType> swDatabaseType) {};
-
- _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn);
-}
-
void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
const NamespaceString& nss) {
stdx::unique_lock<stdx::mutex> lg(_mutex);
@@ -451,10 +483,10 @@ void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC
<< " because the node's replication role changed.",
_role == ReplicaSetRole::Primary && _term == initialTerm);
- auto it = _taskLists.find(nss);
+ auto it = _collAndChunkTaskLists.find(nss);
// If there are no tasks for the specified namespace, everything must have been completed
- if (it == _taskLists.end())
+ if (it == _collAndChunkTaskLists.end())
return;
auto& taskList = it->second;
@@ -490,15 +522,54 @@ void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC
void ShardServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx,
StringData dbName) {
+
stdx::unique_lock<stdx::mutex> lg(_mutex);
const auto initialTerm = _term;
- uassert(ErrorCodes::NotMaster,
- str::stream() << "Unable to wait for database metadata flush for " << dbName.toString()
- << " because the node's replication role changed.",
- _role == ReplicaSetRole::Primary && _term == initialTerm);
+ boost::optional<uint64_t> taskNumToWait;
+
+ while (true) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Unable to wait for database metadata flush for "
+ << dbName.toString()
+ << " because the node's replication role changed.",
+ _role == ReplicaSetRole::Primary && _term == initialTerm);
+
+ auto it = _dbTaskLists.find(dbName.toString());
+
+ // If there are no tasks for the specified database, everything must have been completed
+ if (it == _dbTaskLists.end())
+ return;
+
+ auto& taskList = it->second;
+
+ if (!taskNumToWait) {
+ const auto& lastTask = taskList.back();
+ taskNumToWait = lastTask.taskNum;
+ } else {
+ const auto& activeTask = taskList.front();
- return;
+ if (activeTask.taskNum > *taskNumToWait) {
+ auto secondTaskIt = std::next(taskList.begin());
+
+ // Because of an optimization where a database drop clears all tasks except the
+ // active one, it is possible that the task number we are waiting on will never
+ // actually be written. Because of this, we move the task number to the drop task,
+ // which can only be the active task or the one right after it.
+ if (activeTask.dropped) {
+ taskNumToWait = activeTask.taskNum;
+ } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) {
+ taskNumToWait = secondTaskIt->taskNum;
+ } else {
+ return;
+ }
+ }
+ }
+
+ // It is not safe to use taskList after this call, because it will unlock and lock the tasks
+ // mutex, so we just loop around.
+ taskList.waitForActiveTaskCompletion(lg);
+ }
}
void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
@@ -526,9 +597,9 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
const ChunkVersion maxLoaderVersion = [&] {
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- auto taskListIt = _taskLists.find(nss);
+ auto taskListIt = _collAndChunkTaskLists.find(nss);
- if (taskListIt != _taskLists.end() &&
+ if (taskListIt != _collAndChunkTaskLists.end() &&
taskListIt->second.hasTasksFromThisTerm(termScheduled)) {
// Enqueued tasks have the latest metadata
return taskListIt->second.getHighestVersionEnqueued();
@@ -536,7 +607,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
}
// If there are no enqueued tasks, get the max persisted
- return getPersistedMaxVersion(opCtx, nss);
+ return getPersistedMaxChunkVersion(opCtx, nss);
}();
auto remoteRefreshCallbackFn = [this,
@@ -550,8 +621,10 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleTask(
- opCtx, nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ opCtx,
+ nss,
+ collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
if (!scheduleStatus.isOK()) {
callbackFn(opCtx, scheduleStatus);
notify->set();
@@ -577,10 +650,11 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
} else {
if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
(collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleTask(
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
opCtx,
nss,
- Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ collAndChunkTask{
+ swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
if (!scheduleStatus.isOK()) {
callbackFn(opCtx, scheduleStatus);
notify->set();
@@ -616,6 +690,68 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
_configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn);
}
+void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
+ OperationContext* opCtx,
+ StringData dbName,
+ long long termScheduled,
+ stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
+
+ // Get the max version the loader has.
+ boost::optional<DatabaseVersion> maxLoaderVersion = [&] {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto taskListIt = _dbTaskLists.find(dbName.toString());
+
+ if (taskListIt != _dbTaskLists.end() &&
+ taskListIt->second.hasTasksFromThisTerm(termScheduled)) {
+ // Enqueued tasks have the latest metadata
+ return taskListIt->second.getHighestVersionEnqueued();
+ }
+ }
+
+ return getPersistedMaxDbVersion(opCtx, dbName);
+ }();
+
+ auto remoteRefreshCallbackFn =
+ [ this, name = dbName.toString(), maxLoaderVersion, termScheduled, callbackFn ](
+ OperationContext * opCtx, StatusWith<DatabaseType> swDatabaseType) {
+
+ if (swDatabaseType == ErrorCodes::NamespaceNotFound) {
+
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask(
+ opCtx, name, dbTask{swDatabaseType, maxLoaderVersion, termScheduled});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, scheduleStatus);
+ return;
+ }
+
+ log() << "Cache loader remotely refreshed for database " << name
+ << " and found the database has been dropped.";
+
+ } else if (swDatabaseType.isOK()) {
+ auto& dbType = swDatabaseType.getValue();
+
+ if (!bool(maxLoaderVersion) || (dbType.getVersion() > maxLoaderVersion.get())) {
+
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask(
+ opCtx, name, dbTask{swDatabaseType, maxLoaderVersion, termScheduled});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, scheduleStatus);
+ return;
+ }
+ }
+
+ log() << "Cache loader remotely refreshed for database " << name << " and found "
+ << dbType.toBSON();
+ }
+
+ // Complete the callbackFn work.
+ callbackFn(opCtx, std::move(swDatabaseType));
+ };
+
+ _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn);
+}
+
StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -696,9 +832,9 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
const ChunkVersion& catalogCacheSinceVersion,
const long long term) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- auto taskListIt = _taskLists.find(nss);
+ auto taskListIt = _collAndChunkTaskLists.find(nss);
- if (taskListIt == _taskLists.end()) {
+ if (taskListIt == _collAndChunkTaskLists.end()) {
return std::make_pair(false, CollectionAndChangedChunks());
} else if (!taskListIt->second.hasTasksFromThisTerm(term)) {
// If task list does not have a term that matches, there's no valid task data to collect.
@@ -726,8 +862,8 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
return std::make_pair(true, collAndChunks);
}
-Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask(
- OperationContext* opCtx, const NamespaceString& nss, Task task) {
+Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) {
Status linearizableReadStatus = waitForLinearizableReadConcern(opCtx);
if (!linearizableReadStatus.isOK()) {
return linearizableReadStatus.withContext(
@@ -737,18 +873,49 @@ Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask(
stdx::lock_guard<stdx::mutex> lock(_mutex);
- const bool wasEmpty = _taskLists[nss].empty();
- _taskLists[nss].addTask(std::move(task));
+ const bool wasEmpty = _collAndChunkTaskLists[nss].empty();
+ _collAndChunkTaskLists[nss].addTask(std::move(task));
if (wasEmpty) {
- Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+ Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
if (!status.isOK()) {
log() << "Cache loader failed to schedule persisted metadata update"
<< " task for namespace '" << nss << "' due to '" << redact(status)
<< "'. Clearing task list so that scheduling"
<< " will be attempted by the next caller to refresh this namespace.";
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _taskLists.erase(nss);
+ _collAndChunkTaskLists.erase(nss);
+ }
+ return status;
+ }
+
+ return Status::OK();
+}
+
+Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(
+ OperationContext* opCtx, StringData dbName, dbTask task) {
+ Status linearizableReadStatus = waitForLinearizableReadConcern(opCtx);
+ if (!linearizableReadStatus.isOK()) {
+ return linearizableReadStatus.withContext(
+ "Unable to schedule routing table update because this is not the majority primary and "
+ "may not have the latest data.");
+ }
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ const bool wasEmpty = _dbTaskLists[dbName.toString()].empty();
+ _dbTaskLists[dbName.toString()].addTask(std::move(task));
+
+ if (wasEmpty) {
+ Status status = _threadPool.schedule(
+ [ this, name = dbName.toString() ]() { _runDbTasks(StringData(name)); });
+ if (!status.isOK()) {
+ log() << "Cache loader failed to schedule persisted metadata update"
+ << " task for db '" << dbName.toString() << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling"
+ << " will be attempted by the next caller to refresh this namespace.";
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _dbTaskLists.erase(dbName.toString());
}
return status;
}
@@ -756,12 +923,12 @@ Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask(
return Status::OK();
}
-void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
+void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
bool taskFinished = false;
try {
- _updatePersistedMetadata(context.opCtx(), nss);
+ _updatePersistedCollAndChunksMetadata(context.opCtx(), nss);
taskFinished = true;
} catch (const DBException& ex) {
Status exceptionStatus = ex.toStatus();
@@ -780,29 +947,72 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
// If task completed successfully, remove it from work queue
if (taskFinished) {
- _taskLists[nss].pop_front();
+ _collAndChunkTaskLists[nss].pop_front();
}
// Schedule more work if there is any
- if (!_taskLists[nss].empty()) {
- Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+ if (!_collAndChunkTaskLists[nss].empty()) {
+ Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
if (!status.isOK()) {
log() << "Cache loader failed to schedule a persisted metadata update"
<< " task for namespace '" << nss << "' due to '" << redact(status)
<< "'. Clearing task list so that scheduling will be attempted by the next"
<< " caller to refresh this namespace.";
- _taskLists.erase(nss);
+ _collAndChunkTaskLists.erase(nss);
}
} else {
- _taskLists.erase(nss);
+ _collAndChunkTaskLists.erase(nss);
}
}
-void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* opCtx,
- const NamespaceString& nss) {
+void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
+ auto context = _contexts.makeOperationContext(*Client::getCurrent());
+
+ bool taskFinished = false;
+ try {
+ _updatePersistedDbMetadata(context.opCtx(), dbName);
+ taskFinished = true;
+ } catch (const DBException& ex) {
+ Status exceptionStatus = ex.toStatus();
+
+ // This thread must stop if we are shutting down
+ if (ErrorCodes::isShutdownError(exceptionStatus.code())) {
+ log() << "Failed to persist metadata update for db '" << dbName.toString()
+ << "' due to shutdown.";
+ return;
+ }
+
+ log() << redact(exceptionStatus);
+ }
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // If task completed successfully, remove it from work queue
+ if (taskFinished) {
+ _dbTaskLists[dbName.toString()].pop_front();
+ }
+
+ // Schedule more work if there is any
+ if (!_dbTaskLists[dbName.toString()].empty()) {
+ Status status = _threadPool.schedule(
+ [ this, name = dbName.toString() ]() { _runDbTasks(StringData(name)); });
+ if (!status.isOK()) {
+ log() << "Cache loader failed to schedule a persisted metadata update"
+ << " task for namespace '" << dbName.toString() << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling will be attempted by the next"
+ << " caller to refresh this namespace.";
+ _dbTaskLists.erase(dbName.toString());
+ }
+ } else {
+ _dbTaskLists.erase(dbName.toString());
+ }
+}
+
+void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
+ OperationContext* opCtx, const NamespaceString& nss) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- const Task& task = _taskLists[nss].front();
+ const collAndChunkTask& task = _collAndChunkTaskLists[nss].front();
invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
// If this task is from an old term and no longer valid, do not execute and return true so that
@@ -837,6 +1047,38 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
<< task.minQueryVersion << "' to collection version '" << task.maxQueryVersion << "'.";
}
+void ShardServerCatalogCacheLoader::_updatePersistedDbMetadata(OperationContext* opCtx,
+ StringData dbName) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+ const dbTask& task = _dbTaskLists[dbName.toString()].front();
+
+ // If this task is from an old term and no longer valid, do not execute and return true so that
+ // the task gets removed from the task list
+ if (task.termCreated != _term) {
+ return;
+ }
+
+ lock.unlock();
+
+ // Check if this is a drop task
+ if (task.dropped) {
+ // The database was dropped. The persisted metadata for the database must be cleared.
+ uassertStatusOKWithContext(deleteDatabasesEntry(opCtx, dbName),
+ str::stream() << "Failed to clear persisted metadata for db '"
+ << dbName.toString()
+ << "'. Will be retried.");
+ return;
+ }
+
+ uassertStatusOKWithContext(persistDbVersion(opCtx, task.databaseType.get()),
+ str::stream() << "Failed to update the persisted metadata for db '"
+ << dbName.toString()
+ << "'. Will be retried.");
+
+ log() << "Successfully updated persisted metadata for db '" << dbName.toString();
+}
+
CollectionAndChangedChunks
ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVersion(
OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) {
@@ -875,7 +1117,7 @@ ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVer
}
}
-ShardServerCatalogCacheLoader::Task::Task(
+ShardServerCatalogCacheLoader::collAndChunkTask::collAndChunkTask(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
long long currentTerm)
@@ -893,10 +1135,28 @@ ShardServerCatalogCacheLoader::Task::Task(
}
}
-ShardServerCatalogCacheLoader::TaskList::TaskList()
+ShardServerCatalogCacheLoader::dbTask::dbTask(StatusWith<DatabaseType> swDatabaseType,
+ boost::optional<DatabaseVersion> minimumVersion,
+ long long currentTerm)
+ : taskNum(taskIdGenerator.fetchAndAdd(1)),
+ minVersion(minimumVersion),
+ termCreated(currentTerm) {
+ if (swDatabaseType.isOK()) {
+ databaseType = std::move(swDatabaseType.getValue());
+ maxVersion = databaseType->getVersion();
+ } else {
+ invariant(swDatabaseType == ErrorCodes::NamespaceNotFound);
+ dropped = true;
+ }
+}
+
+ShardServerCatalogCacheLoader::CollAndChunkTaskList::CollAndChunkTaskList()
+ : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
+
+ShardServerCatalogCacheLoader::DbTaskList::DbTaskList()
: _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
-void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(collAndChunkTask task) {
if (_tasks.empty()) {
_tasks.emplace_back(std::move(task));
return;
@@ -924,13 +1184,47 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
}
}
-void ShardServerCatalogCacheLoader::TaskList::pop_front() {
+void ShardServerCatalogCacheLoader::DbTaskList::addTask(dbTask task) {
+ if (_tasks.empty()) {
+ _tasks.emplace_back(std::move(task));
+
+ return;
+ }
+
+ if (task.dropped) {
+ invariant(_tasks.back().maxVersion == task.minVersion);
+
+ // As an optimization, on database drop, clear any pending tasks in order to prevent any
+ // throw-away work from executing. Because we have no way to differentiate whether the
+ // active task is currently being operated on by a thread or not, we must leave the front
+ // intact.
+ _tasks.erase(std::next(_tasks.begin()), _tasks.end());
+
+ // No need to schedule a drop if one is already currently active.
+ if (!_tasks.front().dropped) {
+ _tasks.emplace_back(std::move(task));
+ }
+ } else {
+ // Tasks must have contiguous versions, unless a complete reload occurs.
+ invariant(!bool(task.minVersion) ||
+ (_tasks.back().maxVersion.get() == task.minVersion.get()));
+ _tasks.emplace_back(std::move(task));
+ }
+}
+
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::pop_front() {
+ invariant(!_tasks.empty());
+ _tasks.pop_front();
+ _activeTaskCompletedCondVar->notify_all();
+}
+
+void ShardServerCatalogCacheLoader::DbTaskList::pop_front() {
invariant(!_tasks.empty());
_tasks.pop_front();
_activeTaskCompletedCondVar->notify_all();
}
-void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::waitForActiveTaskCompletion(
stdx::unique_lock<stdx::mutex>& lg) {
// Increase the use_count of the condition variable shared pointer, because the entire task list
// might get deleted during the unlocked interval
@@ -938,17 +1232,39 @@ void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
condVar->wait(lg);
}
-bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
+void ShardServerCatalogCacheLoader::DbTaskList::waitForActiveTaskCompletion(
+ stdx::unique_lock<stdx::mutex>& lg) {
+ // Increase the use_count of the condition variable shared pointer, because the entire task list
+ // might get deleted during the unlocked interval
+ auto condVar = _activeTaskCompletedCondVar;
+ condVar->wait(lg);
+}
+
+bool ShardServerCatalogCacheLoader::CollAndChunkTaskList::hasTasksFromThisTerm(
+ long long term) const {
invariant(!_tasks.empty());
return _tasks.back().termCreated == term;
}
-ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const {
+bool ShardServerCatalogCacheLoader::DbTaskList::hasTasksFromThisTerm(long long term) const {
+ invariant(!_tasks.empty());
+ return _tasks.back().termCreated == term;
+}
+
+ChunkVersion ShardServerCatalogCacheLoader::CollAndChunkTaskList::getHighestVersionEnqueued()
+ const {
invariant(!_tasks.empty());
return _tasks.back().maxQueryVersion;
}
-CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadataForTerm(
+boost::optional<DatabaseVersion>
+ShardServerCatalogCacheLoader::DbTaskList::getHighestVersionEnqueued() const {
+ invariant(!_tasks.empty());
+ return _tasks.back().maxVersion;
+}
+
+CollectionAndChangedChunks
+ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm(
const long long term) const {
CollectionAndChangedChunks collAndChunks;
for (const auto& task : _tasks) {
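As a standalone illustration (simplified types, not the patch's code) of the drop-collapse behavior implemented above in DbTaskList::addTask: a drop wipes every queued task except the possibly-running front task, and is not enqueued at all if the front task is already a drop.

#include <cassert>
#include <iterator>
#include <list>
#include <utility>

struct MiniTask {
    bool dropped;
};

void addTask(std::list<MiniTask>& tasks, MiniTask task) {
    if (tasks.empty()) {
        tasks.push_back(std::move(task));
        return;
    }
    if (task.dropped) {
        // Clear everything behind the (possibly active) front task.
        tasks.erase(std::next(tasks.begin()), tasks.end());
        // Only enqueue the drop if one is not already active.
        if (!tasks.front().dropped) {
            tasks.push_back(std::move(task));
        }
    } else {
        tasks.push_back(std::move(task));
    }
}

int main() {
    std::list<MiniTask> tasks;
    addTask(tasks, {false});  // active update task
    addTask(tasks, {false});  // queued update task
    addTask(tasks, {true});   // drop: clears the queued update, keeps the active one
    assert(tasks.size() == 2);
    assert(!tasks.front().dropped && tasks.back().dropped);
    return 0;
}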
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 4ce23b39dc0..87bd3dea31c 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -107,9 +107,9 @@ private:
* apply a set of updated chunks to the shard persisted metadata store or to drop the persisted
* metadata for a specific collection.
*/
- struct Task {
- MONGO_DISALLOW_COPYING(Task);
- Task(Task&&) = default;
+ struct collAndChunkTask {
+ MONGO_DISALLOW_COPYING(collAndChunkTask);
+ collAndChunkTask(collAndChunkTask&&) = default;
/**
* Initializes a task for either dropping or updating the persisted metadata for the
@@ -124,9 +124,10 @@ private:
* 'maxQueryVersion' is either set to the highest chunk version in
* 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED().
*/
- Task(StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
- ChunkVersion minimumQueryVersion,
- long long currentTerm);
+ collAndChunkTask(
+ StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
+ ChunkVersion minimumQueryVersion,
+ long long currentTerm);
// Always-incrementing task number to uniquely identify different tasks
uint64_t taskNum;
@@ -136,12 +137,13 @@ private:
// The highest version that the loader had before going to the config server's metadata
// store for updated chunks.
- // Used by the TaskList below to enforce consistent updates are applied.
+ // Used by the CollAndChunkTaskList below to enforce consistent updates are applied.
ChunkVersion minQueryVersion;
// Either the highest chunk version in 'collectionAndChangedChunks' or the same as
// 'minQueryVersion' if 'dropped' is true.
- // Used by the TaskList below to enforce consistent updates are applied.
+ // Used by the CollAndChunkTaskList below to enforce consistent updates are
+ // applied.
ChunkVersion maxQueryVersion;
// Indicates whether the collection metadata must be cleared.
@@ -152,6 +154,55 @@ private:
};
/**
+ * This represents an update task for the persisted database metadata. The task will either
+ * persist an update to the shard persisted metadata store or drop the persisted
+ * metadata for a specific database.
+ */
+ struct dbTask {
+ MONGO_DISALLOW_COPYING(dbTask);
+ dbTask(dbTask&&) = default;
+
+ /**
+ * Initializes a task for either dropping or updating the persisted metadata for the
+ * associated database. Which type of task is determined by the Status of 'swDatabaseType',
+ * whether it is NamespaceNotFound or OK.
+ *
+ * Note: swDatabaseType must always be NamespaceNotFound or OK, otherwise the constructor
+ * will invariant because there is no task to complete.
+ *
+ * 'databaseType' is only initialized if 'dropped' is false.
+ * 'minimumVersion' sets 'minVersion'.
+ * 'maxVersion' is either set to the db version in 'databaseType' or boost::none.
+ */
+ dbTask(StatusWith<DatabaseType> swDatabaseType,
+ boost::optional<DatabaseVersion> minimumVersion,
+ long long currentTerm);
+
+ // Always-incrementing task number to uniquely identify different tasks
+ uint64_t taskNum;
+
+ // Database metadata update to be applied to the shard persisted metadata store.
+ boost::optional<DatabaseType> databaseType;
+
+ // The highest version that the loader had before going to the config server's metadata
+ // store for the updated db version.
+ // Used by the DbTaskList below to enforce consistent updates are applied.
+ boost::optional<DatabaseVersion> minVersion;
+
+ // The database version in 'databaseType', or boost::none if 'dropped' is true.
+ // Used by the DbTaskList below to enforce consistent updates are applied.
+ boost::optional<DatabaseVersion> maxVersion;
+
+ // Indicates whether the database metadata must be cleared.
+ bool dropped{false};
+
+ // The term in which the loader scheduled this task.
+ long long termCreated;
+ };
+
+ /**
* A list (work queue) of updates to apply to the shard persisted metadata store for a specific
* collection. Enforces that tasks that are added to the list are either consistent:
*
@@ -161,9 +212,9 @@ private:
*
* minQueryVersion == ChunkVersion::UNSHARDED().
*/
- class TaskList {
+ class CollAndChunkTaskList {
public:
- TaskList();
+ CollAndChunkTaskList();
/**
* Adds 'task' to the back of the 'tasks' list.
@@ -172,7 +223,7 @@ private:
* don't waste time applying changes we will just delete. If the one remaining task in the
* list is already a drop task, the new one isn't added because it is redundant.
*/
- void addTask(Task task);
+ void addTask(collAndChunkTask task);
auto& front() {
invariant(!_tasks.empty());
@@ -229,14 +280,94 @@ private:
CollectionAndChangedChunks getEnqueuedMetadataForTerm(const long long term) const;
private:
- std::list<Task> _tasks{};
+ std::list<collAndChunkTask> _tasks{};
// Condition variable which will be signaled whenever the active task from the tasks list is
// completed. Must be used in conjunction with the loader's mutex.
std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar;
};
- typedef std::map<NamespaceString, TaskList> TaskLists;
+ /**
+ * A list (work queue) of updates to apply to the shard persisted metadata store for a specific
+ * database. Enforces that tasks that are added to the list are either consistent:
+ *
+ * tasks[i].minVersion == tasks[i-1].maxVersion.
+ *
+ * or applying a complete update from the minimum version, where
+ *
+ * minVersion == boost::none.
+ */
+ class DbTaskList {
+ public:
+ DbTaskList();
+
+ /**
+ * Adds 'task' to the back of the 'tasks' list.
+ *
+ * If 'task' is a drop task, clears 'tasks' except for the front active task, so that we
+ * don't waste time applying changes we will just delete. If the one remaining task in the
+ * list is already a drop task, the new one isn't added because it is redundant.
+ */
+ void addTask(dbTask task);
+
+ auto& front() {
+ invariant(!_tasks.empty());
+ return _tasks.front();
+ }
+
+ auto& back() {
+ invariant(!_tasks.empty());
+ return _tasks.back();
+ }
+
+ auto begin() {
+ invariant(!_tasks.empty());
+ return _tasks.begin();
+ }
+
+ auto end() {
+ invariant(!_tasks.empty());
+ return _tasks.end();
+ }
+
+ void pop_front();
+
+ bool empty() const {
+ return _tasks.empty();
+ }
+
+ /**
+ * Must only be called if there is an active task. Behaves like a condition variable and
+ * will be signaled when the active task has been completed.
+ *
+ * NOTE: Because this call unlocks and locks the provided mutex, it is not safe to use the
+ * same task object on which it was called because it might have been deleted during the
+ * unlocked period.
+ */
+ void waitForActiveTaskCompletion(stdx::unique_lock<stdx::mutex>& lg);
+
+ /**
+ * Checks whether 'term' matches the term of the latest task in the task list. This is
+ * useful to check whether the task list has outdated data that's no longer valid to use in
+ * the current/new term specified by 'term'.
+ */
+ bool hasTasksFromThisTerm(long long term) const;
+
+ /**
+ * Gets the last task's highest version -- this is the most up to date version.
+ */
+ boost::optional<DatabaseVersion> getHighestVersionEnqueued() const;
+
+ private:
+ std::list<dbTask> _tasks{};
+
+ // Condition variable which will be signaled whenever the active task from the tasks list is
+ // completed. Must be used in conjunction with the loader's mutex.
+ std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar;
+ };
+
+ typedef std::map<NamespaceString, CollAndChunkTaskList> CollAndChunkTaskLists;
+ typedef std::map<std::string, DbTaskList> DbTaskLists;
/**
* Forces the primary to refresh its metadata for 'nss' and waits until this node's metadata
@@ -327,16 +458,21 @@ private:
*
* Only run on the shard primary.
*/
- Status _ensureMajorityPrimaryAndScheduleTask(OperationContext* opCtx,
- const NamespaceString& nss,
- Task task);
+ Status _ensureMajorityPrimaryAndScheduleCollAndChunksTask(OperationContext* opCtx,
+ const NamespaceString& nss,
+ collAndChunkTask task);
+ Status _ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
+ StringData dbName,
+ dbTask task);
/**
* Schedules tasks in the 'nss' task list to execute until the task list is depleted.
*
* Only run on the shard primary.
*/
- void _runTasks(const NamespaceString& nss);
+ void _runCollAndChunksTasks(const NamespaceString& nss);
+
+ void _runDbTasks(StringData dbName);
/**
* Executes the task at the front of the task list for 'nss'. The task will either drop 'nss's
@@ -344,7 +480,9 @@ private:
*
* Only run on the shard primary.
*/
- void _updatePersistedMetadata(OperationContext* opCtx, const NamespaceString& nss);
+ void _updatePersistedCollAndChunksMetadata(OperationContext* opCtx, const NamespaceString& nss);
+
+ void _updatePersistedDbMetadata(OperationContext* opCtx, StringData dbName);
/**
* Attempts to read the collection and chunk metadata since 'version' from the shard persisted
@@ -370,7 +508,8 @@ private:
stdx::mutex _mutex;
// Map to track in progress persisted cache updates on the shard primary.
- TaskLists _taskLists;
+ CollAndChunkTaskLists _collAndChunkTaskLists;
+ DbTaskLists _dbTaskLists;
// This value is bumped every time the set of currently scheduled tasks should no longer be
// running. This includes, replica set state transitions and shutdown.
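The consistency rule documented for DbTaskList above (tasks[i].minVersion == tasks[i-1].maxVersion, or minVersion == boost::none for a complete reload) can be illustrated in isolation. This is a hedged sketch using an int stand-in for DatabaseVersion, not code from the patch.

#include <boost/optional.hpp>
#include <cassert>

using Version = int;  // stand-in for DatabaseVersion

// A task may be appended if it either continues exactly where the previous task's
// metadata ended, or carries no minimum version at all (a complete reload).
bool canAppend(const boost::optional<Version>& prevMax, const boost::optional<Version>& nextMin) {
    if (!nextMin)
        return true;                          // complete reload from scratch
    return prevMax && *prevMax == *nextMin;   // otherwise versions must be contiguous
}

int main() {
    assert(canAppend(Version{3}, Version{3}));   // contiguous: accepted
    assert(canAppend(Version{3}, boost::none));  // full reload: accepted
    assert(!canAppend(Version{3}, Version{5}));  // gap: would trip the invariant
    return 0;
}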
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 57e488774b3..b815dc7b38e 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -116,6 +116,7 @@ env.Library(
'catalog/type_mongos.cpp',
'catalog/type_shard.cpp',
'catalog/type_shard_collection.cpp',
+ 'catalog/type_shard_database.cpp',
'catalog/type_tags.cpp',
'chunk_version.cpp',
'request_types/add_shard_request_type.cpp',
diff --git a/src/mongo/s/catalog/type_shard_database.cpp b/src/mongo/s/catalog/type_shard_database.cpp
new file mode 100644
index 00000000000..5be668cceae
--- /dev/null
+++ b/src/mongo/s/catalog/type_shard_database.cpp
@@ -0,0 +1,132 @@
+/**
+ * Copyright (C) 2018 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/catalog/type_shard_database.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+using std::string;
+
+const NamespaceString ShardDatabaseType::ConfigNS(
+ NamespaceString::kShardConfigDatabasesCollectionName);
+
+const BSONField<std::string> ShardDatabaseType::name("_id");
+const BSONField<DatabaseVersion> ShardDatabaseType::version("version");
+const BSONField<std::string> ShardDatabaseType::primary("primary");
+const BSONField<bool> ShardDatabaseType::partitioned("partitioned");
+const BSONField<int> ShardDatabaseType::enterCriticalSectionCounter("enterCriticalSectionCounter");
+
+ShardDatabaseType::ShardDatabaseType(const std::string dbName,
+ boost::optional<DatabaseVersion> version,
+ const ShardId primary,
+ bool partitioned)
+ : _name(dbName), _version(version), _primary(primary), _partitioned(partitioned) {}
+
+StatusWith<ShardDatabaseType> ShardDatabaseType::fromBSON(const BSONObj& source) {
+ std::string dbName;
+ {
+ Status status = bsonExtractStringField(source, name.name(), &dbName);
+ if (!status.isOK())
+ return status;
+ }
+
+ boost::optional<DatabaseVersion> dbVersion = boost::none;
+ {
+ BSONObj versionField = source.getObjectField("version");
+ // TODO: Parse this unconditionally once featureCompatibilityVersion 3.6 is no longer
+ // supported.
+ if (!versionField.isEmpty()) {
+ dbVersion = DatabaseVersion::parse(IDLParserErrorContext("DatabaseType"), versionField);
+ }
+ }
+
+ std::string dbPrimary;
+ {
+ Status status = bsonExtractStringField(source, primary.name(), &dbPrimary);
+ if (!status.isOK())
+ return status;
+ }
+
+ bool dbPartitioned;
+ {
+ Status status =
+ bsonExtractBooleanFieldWithDefault(source, partitioned.name(), false, &dbPartitioned);
+ if (!status.isOK())
+ return status;
+ }
+
+ ShardDatabaseType shardDatabaseType(dbName, dbVersion, dbPrimary, dbPartitioned);
+
+ return shardDatabaseType;
+}
+
+BSONObj ShardDatabaseType::toBSON() const {
+ BSONObjBuilder builder;
+
+ builder.append(name.name(), _name);
+ if (_version) {
+ builder.append(version.name(), _version->toBSON());
+ }
+ builder.append(primary.name(), _primary.toString());
+ builder.append(partitioned.name(), _partitioned);
+
+ return builder.obj();
+}
+
+std::string ShardDatabaseType::toString() const {
+ return toBSON().toString();
+}
+
+void ShardDatabaseType::setDbVersion(boost::optional<DatabaseVersion> version) {
+ _version = version;
+}
+
+void ShardDatabaseType::setDbName(const std::string& dbName) {
+ invariant(!dbName.empty());
+ _name = dbName;
+}
+
+void ShardDatabaseType::setPrimary(const ShardId& primary) {
+ invariant(primary.isValid());
+ _primary = primary;
+}
+
+void ShardDatabaseType::setPartitioned(bool partitioned) {
+ _partitioned = partitioned;
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/type_shard_database.h b/src/mongo/s/catalog/type_shard_database.h
new file mode 100644
index 00000000000..f8b0209636d
--- /dev/null
+++ b/src/mongo/s/catalog/type_shard_database.h
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2018 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <string>
+
+#include "mongo/db/jsobj.h"
+#include "mongo/s/database_version_gen.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+class Status;
+template <typename T>
+class StatusWith;
+
+/**
+ * This class represents the layout and contents of documents contained in the shard server's
+ * config.cache.databases collection. All manipulation of documents coming from that collection
+ * should be done with this class.
+ *
+ * Expected shard server config.cache.databases collection format:
+ * {
+ * "_id" : "foo",
+ * "version" : {
+ * "uuid" : UUID
+ * "lastMod" : 1
+ * },
+ * "primary": "shard0000",
+ * "partitioned": true,
+ * "enterCriticalSectionCounter" : 4 // optional
+ * }
+ *
+ * enterCriticalSectionCounter is currently just an OpObserver signal, thus otherwise ignored here.
+ */
+class ShardDatabaseType {
+public:
+ // Name of the database collection on the shard server.
+ static const NamespaceString ConfigNS;
+
+ static const BSONField<std::string> name; // "_id"
+ static const BSONField<DatabaseVersion> version;
+ static const BSONField<std::string> primary;
+ static const BSONField<bool> partitioned;
+ static const BSONField<int> enterCriticalSectionCounter;
+
+ ShardDatabaseType(const std::string dbName,
+ boost::optional<DatabaseVersion> version,
+ const ShardId primary,
+ bool partitioned);
+
+ /**
+ * Constructs a new ShardDatabaseType object from BSON. Also does validation of the contents.
+ */
+ static StatusWith<ShardDatabaseType> fromBSON(const BSONObj& source);
+
+ /**
+ * Returns the BSON representation of this shard database type object.
+ */
+ BSONObj toBSON() const;
+
+ /**
+ * Returns a std::string representation of the current internal state.
+ */
+ std::string toString() const;
+
+ const std::string& getDbName() const {
+ return _name;
+ }
+ void setDbName(const std::string& dbName);
+
+ const boost::optional<DatabaseVersion> getDbVersion() const {
+ return _version;
+ }
+ void setDbVersion(boost::optional<DatabaseVersion> version);
+
+ const ShardId& getPrimary() const {
+ return _primary;
+ }
+ void setPrimary(const ShardId& primary);
+
+ bool getPartitioned() const {
+ return _partitioned;
+ }
+ void setPartitioned(bool partitioned);
+
+private:
+ std::string _name;
+ boost::optional<DatabaseVersion> _version;
+ ShardId _primary;
+ bool _partitioned;
+};
+
+} // namespace mongo
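A hedged usage sketch (not part of this patch) of the parse/serialize round trip for ShardDatabaseType; the 'version' field is deliberately omitted, which fromBSON tolerates per the featureCompatibilityVersion 3.6 TODO in the .cpp above. The relevant mongo headers are assumed to be included.

BSONObj doc = BSON("_id"
                   << "foo"
                   << "primary"
                   << "shard0000"
                   << "partitioned" << false);

auto swEntry = ShardDatabaseType::fromBSON(doc);
invariant(swEntry.isOK());

ShardDatabaseType entry = swEntry.getValue();
invariant(entry.getDbName() == "foo");
invariant(!entry.getDbVersion());               // no version persisted for this entry
invariant(entry.toBSON().hasField("primary"));  // round-trips the primary shard id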
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 68a6817ec83..eaa2669b072 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -200,13 +200,21 @@ void ConfigServerCatalogCacheLoader::getDatabase(
stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
if (MONGO_FAIL_POINT(callShardServerCallbackFn)) {
- uassertStatusOK(_threadPool.schedule([ dbName, callbackFn ]() noexcept {
+ uassertStatusOK(_threadPool.schedule([ name = dbName.toString(), callbackFn ]() noexcept {
auto opCtx = Client::getCurrent()->makeOperationContext();
- const auto dbVersion = Versioning::newDatabaseVersion();
- DatabaseType dbt(dbName.toString(), ShardId("PrimaryShard"), false, dbVersion);
+ auto swDbt = [&]() -> StatusWith<DatabaseType> {
+ try {
- callbackFn(opCtx.get(), dbt);
+ const auto dbVersion = Versioning::newDatabaseVersion();
+ DatabaseType dbt(std::move(name), ShardId("PrimaryShard"), false, dbVersion);
+ return dbt;
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ }();
+
+ callbackFn(opCtx.get(), swDbt);
}));
}
}
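The pattern used in this last hunk -- running throwing code inside an immediately-invoked lambda so the callback always receives a StatusWith rather than an exception -- in isolation, as a hedged sketch; computeDatabaseType() is a hypothetical helper standing in for the DatabaseType construction above.

auto swDbt = [&]() -> StatusWith<DatabaseType> {
    try {
        // Hypothetical helper that builds a DatabaseType (and may throw DBException).
        return computeDatabaseType();
    } catch (const DBException& ex) {
        return ex.toStatus();  // convert the exception into an error Status
    }
}();

callbackFn(opCtx.get(), swDbt);  // the callback never sees an exception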