Diffstat (limited to 'src/mongo/db/s/shard_server_catalog_cache_loader.cpp')
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp  428
1 file changed, 372 insertions(+), 56 deletions(-)
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 23b9e85318d..d469bf78829 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/catalog/type_shard_database.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
@@ -120,6 +121,24 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
}
/**
+ * Takes a DatabaseType object and persists the changes to the shard's metadata
+ * collections.
+ */
+Status persistDbVersion(OperationContext* opCtx, const DatabaseType& dbt) {
+ // Update the databases collection entry for the given database in case there are any new updates.
+ Status status = updateShardDatabasesEntry(opCtx,
+ BSON(ShardDatabaseType::name() << dbt.getName()),
+ dbt.toBSON(),
+ BSONObj(),
+ true /*upsert*/);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status::OK();
+}
+
+/**
* This function will throw on error!
*
* Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there
@@ -130,7 +149,7 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
* could be dropped and recreated between reading the collection epoch and retrieving the chunk,
* which would make the returned ChunkVersion corrupt.
*/
-ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) {
+ChunkVersion getPersistedMaxChunkVersion(OperationContext* opCtx, const NamespaceString& nss) {
// Must read the collections entry to get the epoch to pass into ChunkType for shard's chunk
// collection.
auto statusWithCollection = readShardCollectionsEntry(opCtx, nss);
@@ -168,6 +187,30 @@ ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceStri
/**
* This function will throw on error!
*
+ * Retrieves the persisted max db version for 'dbName', if there is a persisted database entry. If
+ * there is none -- meaning there's no persisted metadata for 'dbName' -- returns boost::none.
+ */
+boost::optional<DatabaseVersion> getPersistedMaxDbVersion(OperationContext* opCtx,
+ StringData dbName) {
+
+ auto statusWithDatabaseEntry = readShardDatabasesEntry(opCtx, dbName);
+ if (statusWithDatabaseEntry == ErrorCodes::NamespaceNotFound) {
+ // There is no persisted metadata.
+ return boost::none;
+ }
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to read persisted database entry for db '" << dbName.toString()
+ << "' due to '"
+ << statusWithDatabaseEntry.getStatus().toString()
+ << "'.",
+ statusWithDatabaseEntry.isOK());
+
+ return statusWithDatabaseEntry.getValue().getDbVersion();
+}
+
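The lookup above maps "no persisted entry" to an empty optional and turns any other read failure into an exception. A minimal self-contained model of that contract, using plain standard-library types in place of the real StatusWith/boost::optional machinery:

    #include <optional>
    #include <stdexcept>
    #include <string>

    enum class LookupStatus { Ok, NotFound, Failed };

    // Model: "not found" means there is simply no persisted metadata yet, so the
    // caller gets an empty optional; any other failure is a hard error and throws.
    std::optional<int> persistedMaxDbVersion(LookupStatus status,
                                             int persistedVersion,
                                             const std::string& dbName) {
        if (status == LookupStatus::NotFound)
            return std::nullopt;
        if (status != LookupStatus::Ok)
            throw std::runtime_error("Failed to read persisted database entry for db '" + dbName + "'");
        return persistedVersion;
    }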
+/**
+ * This function will throw on error!
+ *
* Tries to find persisted chunk metadata with chunk versions GTE to 'version'.
*
* If 'version's epoch matches persisted metadata, returns persisted metadata GTE 'version'.
@@ -397,8 +440,8 @@ void ShardServerCatalogCacheLoader::getDatabase(
isPrimary = (_role == ReplicaSetRole::Primary);
}
- uassertStatusOK(
- _threadPool.schedule([ this, dbName, callbackFn, isPrimary, currentTerm ]() noexcept {
+ uassertStatusOK(_threadPool.schedule(
+ [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
{
@@ -418,7 +461,8 @@ void ShardServerCatalogCacheLoader::getDatabase(
try {
if (isPrimary) {
- _schedulePrimaryGetDatabase(context.opCtx(), dbName, currentTerm, callbackFn);
+ _schedulePrimaryGetDatabase(
+ context.opCtx(), StringData(name), currentTerm, callbackFn);
}
} catch (const DBException& ex) {
callbackFn(context.opCtx(), ex.toStatus());
@@ -426,18 +470,6 @@ void ShardServerCatalogCacheLoader::getDatabase(
}));
}
-void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
- OperationContext* opCtx,
- StringData dbName,
- long long termScheduled,
- stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
-
- auto remoteRefreshCallbackFn = [](OperationContext* opCtx,
- StatusWith<DatabaseType> swDatabaseType) {};
-
- _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn);
-}
-
void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
const NamespaceString& nss) {
stdx::unique_lock<stdx::mutex> lg(_mutex);
@@ -451,10 +483,10 @@ void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC
<< " because the node's replication role changed.",
_role == ReplicaSetRole::Primary && _term == initialTerm);
- auto it = _taskLists.find(nss);
+ auto it = _collAndChunkTaskLists.find(nss);
// If there are no tasks for the specified namespace, everything must have been completed
- if (it == _taskLists.end())
+ if (it == _collAndChunkTaskLists.end())
return;
auto& taskList = it->second;
@@ -490,15 +522,54 @@ void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opC
void ShardServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx,
StringData dbName) {
+
stdx::unique_lock<stdx::mutex> lg(_mutex);
const auto initialTerm = _term;
- uassert(ErrorCodes::NotMaster,
- str::stream() << "Unable to wait for database metadata flush for " << dbName.toString()
- << " because the node's replication role changed.",
- _role == ReplicaSetRole::Primary && _term == initialTerm);
+ boost::optional<uint64_t> taskNumToWait;
+
+ while (true) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Unable to wait for database metadata flush for "
+ << dbName.toString()
+ << " because the node's replication role changed.",
+ _role == ReplicaSetRole::Primary && _term == initialTerm);
+
+ auto it = _dbTaskLists.find(dbName.toString());
+
+ // If there are no tasks for the specified database, everything must have been completed
+ if (it == _dbTaskLists.end())
+ return;
+
+ auto& taskList = it->second;
+
+ if (!taskNumToWait) {
+ const auto& lastTask = taskList.back();
+ taskNumToWait = lastTask.taskNum;
+ } else {
+ const auto& activeTask = taskList.front();
- return;
+ if (activeTask.taskNum > *taskNumToWait) {
+ auto secondTaskIt = std::next(taskList.begin());
+
+ // Because of an optimization where a database drop clears all tasks except the
+ // active one, it is possible that the task number we are waiting on will never
+ // actually be written. In that case we move the awaited task number to the drop
+ // task, which can only be the active task or the one immediately after it.
+ if (activeTask.dropped) {
+ taskNumToWait = activeTask.taskNum;
+ } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) {
+ taskNumToWait = secondTaskIt->taskNum;
+ } else {
+ return;
+ }
+ }
+ }
+
+ // It is not safe to use taskList after this call, because it will unlock and lock the tasks
+ // mutex, so we just loop around.
+ taskList.waitForActiveTaskCompletion(lg);
+ }
}
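The re-targeting of taskNumToWait above is the subtle part: because a drop clears every queued task except the possibly-running front one, the task number originally captured may never execute, so the wait is redirected to the drop task itself. A minimal self-contained model of that decision, with illustrative types standing in for the real task list:

    #include <cstdint>
    #include <iterator>
    #include <list>
    #include <optional>

    struct FakeTask {
        uint64_t taskNum;
        bool dropped;
    };

    // Returns true when the caller may stop waiting; otherwise the caller waits for
    // the active task to complete and calls this again (mirroring the loop above).
    bool doneWaiting(const std::list<FakeTask>& tasks, std::optional<uint64_t>& taskNumToWait) {
        if (tasks.empty())
            return true;  // every enqueued task has already been flushed

        if (!taskNumToWait) {
            taskNumToWait = tasks.back().taskNum;  // first pass: wait for the last enqueued task
            return false;
        }

        const FakeTask& active = tasks.front();
        if (active.taskNum <= *taskNumToWait)
            return false;  // the awaited task has not run yet; keep waiting

        // The awaited task number was skipped, which only happens when a drop task
        // cleared the queue. Re-target the wait to that drop task.
        auto second = std::next(tasks.begin());
        if (active.dropped) {
            taskNumToWait = active.taskNum;
            return false;
        }
        if (second != tasks.end() && second->dropped) {
            taskNumToWait = second->taskNum;
            return false;
        }
        return true;  // nothing relevant is left to wait for
    }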
void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
@@ -526,9 +597,9 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
const ChunkVersion maxLoaderVersion = [&] {
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- auto taskListIt = _taskLists.find(nss);
+ auto taskListIt = _collAndChunkTaskLists.find(nss);
- if (taskListIt != _taskLists.end() &&
+ if (taskListIt != _collAndChunkTaskLists.end() &&
taskListIt->second.hasTasksFromThisTerm(termScheduled)) {
// Enqueued tasks have the latest metadata
return taskListIt->second.getHighestVersionEnqueued();
@@ -536,7 +607,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
}
// If there are no enqueued tasks, get the max persisted
- return getPersistedMaxVersion(opCtx, nss);
+ return getPersistedMaxChunkVersion(opCtx, nss);
}();
auto remoteRefreshCallbackFn = [this,
@@ -550,8 +621,10 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleTask(
- opCtx, nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ opCtx,
+ nss,
+ collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
if (!scheduleStatus.isOK()) {
callbackFn(opCtx, scheduleStatus);
notify->set();
@@ -577,10 +650,11 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
} else {
if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
(collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleTask(
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
opCtx,
nss,
- Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ collAndChunkTask{
+ swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
if (!scheduleStatus.isOK()) {
callbackFn(opCtx, scheduleStatus);
notify->set();
@@ -616,6 +690,68 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
_configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn);
}
+void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
+ OperationContext* opCtx,
+ StringData dbName,
+ long long termScheduled,
+ stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
+
+ // Get the max version the loader has.
+ boost::optional<DatabaseVersion> maxLoaderVersion = [&] {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto taskListIt = _dbTaskLists.find(dbName.toString());
+
+ if (taskListIt != _dbTaskLists.end() &&
+ taskListIt->second.hasTasksFromThisTerm(termScheduled)) {
+ // Enqueued tasks have the latest metadata
+ return taskListIt->second.getHighestVersionEnqueued();
+ }
+ }
+
+ return getPersistedMaxDbVersion(opCtx, dbName);
+ }();
+
+ auto remoteRefreshCallbackFn =
+ [ this, name = dbName.toString(), maxLoaderVersion, termScheduled, callbackFn ](
+ OperationContext * opCtx, StatusWith<DatabaseType> swDatabaseType) {
+
+ if (swDatabaseType == ErrorCodes::NamespaceNotFound) {
+
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask(
+ opCtx, name, dbTask{swDatabaseType, maxLoaderVersion, termScheduled});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, scheduleStatus);
+ return;
+ }
+
+ log() << "Cache loader remotely refreshed for database " << name
+ << " and found the database has been dropped.";
+
+ } else if (swDatabaseType.isOK()) {
+ auto& dbType = swDatabaseType.getValue();
+
+ if (!bool(maxLoaderVersion) || (dbType.getVersion() > maxLoaderVersion.get())) {
+
+ Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask(
+ opCtx, name, dbTask{swDatabaseType, maxLoaderVersion, termScheduled});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, scheduleStatus);
+ return;
+ }
+ }
+
+ log() << "Cache loader remotely refreshed for database " << name << " and found "
+ << dbType.toBSON();
+ }
+
+ // Complete the callbackFn work.
+ callbackFn(opCtx, std::move(swDatabaseType));
+ };
+
+ _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn);
+}
+
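The callback above only enqueues a persistence task when there is actually something to write: either the database was dropped on the config server, or the remote version is ahead of anything the loader has enqueued or persisted. A simplified self-contained sketch of that decision, with versions modelled as plain integers:

    #include <optional>

    enum class RemoteLookup { Found, NotFound, Error };

    // Model of the scheduling decision: persist on a remote drop or on a strictly
    // newer remote version; otherwise there is nothing new to write locally.
    bool shouldSchedulePersistTask(RemoteLookup result,
                                   std::optional<int> localVersion,
                                   std::optional<int> remoteVersion) {
        if (result == RemoteLookup::NotFound)
            return true;  // dropped remotely: enqueue a task that clears persisted metadata
        if (result != RemoteLookup::Found)
            return false;  // transient error: nothing to persist, the error goes to the caller
        return !localVersion || (remoteVersion && *remoteVersion > *localVersion);
    }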
StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -696,9 +832,9 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
const ChunkVersion& catalogCacheSinceVersion,
const long long term) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- auto taskListIt = _taskLists.find(nss);
+ auto taskListIt = _collAndChunkTaskLists.find(nss);
- if (taskListIt == _taskLists.end()) {
+ if (taskListIt == _collAndChunkTaskLists.end()) {
return std::make_pair(false, CollectionAndChangedChunks());
} else if (!taskListIt->second.hasTasksFromThisTerm(term)) {
// If task list does not have a term that matches, there's no valid task data to collect.
@@ -726,8 +862,8 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
return std::make_pair(true, collAndChunks);
}
-Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask(
- OperationContext* opCtx, const NamespaceString& nss, Task task) {
+Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) {
Status linearizableReadStatus = waitForLinearizableReadConcern(opCtx);
if (!linearizableReadStatus.isOK()) {
return linearizableReadStatus.withContext(
@@ -737,18 +873,49 @@ Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask(
stdx::lock_guard<stdx::mutex> lock(_mutex);
- const bool wasEmpty = _taskLists[nss].empty();
- _taskLists[nss].addTask(std::move(task));
+ const bool wasEmpty = _collAndChunkTaskLists[nss].empty();
+ _collAndChunkTaskLists[nss].addTask(std::move(task));
if (wasEmpty) {
- Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+ Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
if (!status.isOK()) {
log() << "Cache loader failed to schedule persisted metadata update"
<< " task for namespace '" << nss << "' due to '" << redact(status)
<< "'. Clearing task list so that scheduling"
<< " will be attempted by the next caller to refresh this namespace.";
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _taskLists.erase(nss);
+ _collAndChunkTaskLists.erase(nss);
+ }
+ return status;
+ }
+
+ return Status::OK();
+}
+
+Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(
+ OperationContext* opCtx, StringData dbName, dbTask task) {
+ Status linearizableReadStatus = waitForLinearizableReadConcern(opCtx);
+ if (!linearizableReadStatus.isOK()) {
+ return linearizableReadStatus.withContext(
+ "Unable to schedule routing table update because this is not the majority primary and "
+ "may not have the latest data.");
+ }
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ const bool wasEmpty = _dbTaskLists[dbName.toString()].empty();
+ _dbTaskLists[dbName.toString()].addTask(std::move(task));
+
+ if (wasEmpty) {
+ Status status = _threadPool.schedule(
+ [ this, name = dbName.toString() ]() { _runDbTasks(StringData(name)); });
+ if (!status.isOK()) {
+ log() << "Cache loader failed to schedule persisted metadata update"
+ << " task for db '" << dbName.toString() << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling"
+ << " will be attempted by the next caller to refresh this namespace.";
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _dbTaskLists.erase(dbName.toString());
}
return status;
}
@@ -756,12 +923,12 @@ Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleTask(
return Status::OK();
}
-void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
+void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
bool taskFinished = false;
try {
- _updatePersistedMetadata(context.opCtx(), nss);
+ _updatePersistedCollAndChunksMetadata(context.opCtx(), nss);
taskFinished = true;
} catch (const DBException& ex) {
Status exceptionStatus = ex.toStatus();
@@ -780,29 +947,72 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
// If task completed successfully, remove it from work queue
if (taskFinished) {
- _taskLists[nss].pop_front();
+ _collAndChunkTaskLists[nss].pop_front();
}
// Schedule more work if there is any
- if (!_taskLists[nss].empty()) {
- Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+ if (!_collAndChunkTaskLists[nss].empty()) {
+ Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
if (!status.isOK()) {
log() << "Cache loader failed to schedule a persisted metadata update"
<< " task for namespace '" << nss << "' due to '" << redact(status)
<< "'. Clearing task list so that scheduling will be attempted by the next"
<< " caller to refresh this namespace.";
- _taskLists.erase(nss);
+ _collAndChunkTaskLists.erase(nss);
}
} else {
- _taskLists.erase(nss);
+ _collAndChunkTaskLists.erase(nss);
}
}
-void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* opCtx,
- const NamespaceString& nss) {
+void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
+ auto context = _contexts.makeOperationContext(*Client::getCurrent());
+
+ bool taskFinished = false;
+ try {
+ _updatePersistedDbMetadata(context.opCtx(), dbName);
+ taskFinished = true;
+ } catch (const DBException& ex) {
+ Status exceptionStatus = ex.toStatus();
+
+ // This thread must stop if we are shutting down
+ if (ErrorCodes::isShutdownError(exceptionStatus.code())) {
+ log() << "Failed to persist metadata update for db '" << dbName.toString()
+ << "' due to shutdown.";
+ return;
+ }
+
+ log() << redact(exceptionStatus);
+ }
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // If task completed successfully, remove it from work queue
+ if (taskFinished) {
+ _dbTaskLists[dbName.toString()].pop_front();
+ }
+
+ // Schedule more work if there is any
+ if (!_dbTaskLists[dbName.toString()].empty()) {
+ Status status = _threadPool.schedule(
+ [ this, name = dbName.toString() ]() { _runDbTasks(StringData(name)); });
+ if (!status.isOK()) {
+ log() << "Cache loader failed to schedule a persisted metadata update"
+ << " task for namespace '" << dbName.toString() << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling will be attempted by the next"
+ << " caller to refresh this namespace.";
+ _dbTaskLists.erase(dbName.toString());
+ }
+ } else {
+ _dbTaskLists.erase(dbName.toString());
+ }
+}
+
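_ensureMajorityPrimaryAndScheduleDbTask and _runDbTasks cooperate so that at most one worker per database is ever in flight: only the enqueue that finds an empty task list schedules a worker, and the worker re-schedules itself after each completed task until the list drains. A minimal self-contained model of that hand-off, with illustrative stand-ins for the thread pool and the task list:

    #include <functional>
    #include <list>
    #include <string>
    #include <utility>

    struct FakeScheduler {
        // Stand-in for the thread pool: remembers the next unit of work to run.
        std::function<void()> pending;
        void schedule(std::function<void()> fn) { pending = std::move(fn); }
    };

    struct FakeDbTaskQueue {
        std::list<std::string> tasks;  // a task is just a label in this model
        FakeScheduler* scheduler;

        void enqueue(std::string task) {
            const bool wasEmpty = tasks.empty();
            tasks.push_back(std::move(task));
            if (wasEmpty) {
                // Only the first enqueue starts a worker; later tasks ride on it.
                scheduler->schedule([this] { runTasks(); });
            }
        }

        void runTasks() {
            // ... persist tasks.front() here ...
            tasks.pop_front();
            if (!tasks.empty()) {
                // Keep draining: the worker re-schedules itself until the list is empty.
                scheduler->schedule([this] { runTasks(); });
            }
        }
    };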
+void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
+ OperationContext* opCtx, const NamespaceString& nss) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- const Task& task = _taskLists[nss].front();
+ const collAndChunkTask& task = _collAndChunkTaskLists[nss].front();
invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
// If this task is from an old term and no longer valid, do not execute and return true so that
@@ -837,6 +1047,38 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
<< task.minQueryVersion << "' to collection version '" << task.maxQueryVersion << "'.";
}
+void ShardServerCatalogCacheLoader::_updatePersistedDbMetadata(OperationContext* opCtx,
+ StringData dbName) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+ const dbTask& task = _dbTaskLists[dbName.toString()].front();
+
+ // If this task is from an old term and no longer valid, do not execute it and simply return so
+ // that the task still gets removed from the task list
+ if (task.termCreated != _term) {
+ return;
+ }
+
+ lock.unlock();
+
+ // Check if this is a drop task
+ if (task.dropped) {
+ // The database was dropped. The persisted metadata for it must be cleared.
+ uassertStatusOKWithContext(deleteDatabasesEntry(opCtx, dbName),
+ str::stream() << "Failed to clear persisted metadata for db '"
+ << dbName.toString()
+ << "'. Will be retried.");
+ return;
+ }
+
+ uassertStatusOKWithContext(persistDbVersion(opCtx, task.databaseType.get()),
+ str::stream() << "Failed to update the persisted metadata for db '"
+ << dbName.toString()
+ << "'. Will be retried.");
+
+ log() << "Successfully updated persisted metadata for db '" << dbName.toString();
+}
+
CollectionAndChangedChunks
ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVersion(
OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) {
@@ -875,7 +1117,7 @@ ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVer
}
}
-ShardServerCatalogCacheLoader::Task::Task(
+ShardServerCatalogCacheLoader::collAndChunkTask::collAndChunkTask(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
long long currentTerm)
@@ -893,10 +1135,28 @@ ShardServerCatalogCacheLoader::Task::Task(
}
}
-ShardServerCatalogCacheLoader::TaskList::TaskList()
+ShardServerCatalogCacheLoader::dbTask::dbTask(StatusWith<DatabaseType> swDatabaseType,
+ boost::optional<DatabaseVersion> minimumVersion,
+ long long currentTerm)
+ : taskNum(taskIdGenerator.fetchAndAdd(1)),
+ minVersion(minimumVersion),
+ termCreated(currentTerm) {
+ if (swDatabaseType.isOK()) {
+ databaseType = std::move(swDatabaseType.getValue());
+ maxVersion = databaseType->getVersion();
+ } else {
+ invariant(swDatabaseType == ErrorCodes::NamespaceNotFound);
+ dropped = true;
+ }
+}
+
+ShardServerCatalogCacheLoader::CollAndChunkTaskList::CollAndChunkTaskList()
+ : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
+
+ShardServerCatalogCacheLoader::DbTaskList::DbTaskList()
: _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
-void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(collAndChunkTask task) {
if (_tasks.empty()) {
_tasks.emplace_back(std::move(task));
return;
@@ -924,13 +1184,47 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
}
}
-void ShardServerCatalogCacheLoader::TaskList::pop_front() {
+void ShardServerCatalogCacheLoader::DbTaskList::addTask(dbTask task) {
+ if (_tasks.empty()) {
+ _tasks.emplace_back(std::move(task));
+
+ return;
+ }
+
+ if (task.dropped) {
+ invariant(_tasks.back().maxVersion == task.minVersion);
+
+ // As an optimization, on database drop, clear any pending tasks in order to prevent any
+ // throw-away work from executing. Because we have no way to differentiate whether the
+ // active task is currently being operated on by a thread or not, we must leave the front
+ // intact.
+ _tasks.erase(std::next(_tasks.begin()), _tasks.end());
+
+ // No need to schedule a drop if one is already currently active.
+ if (!_tasks.front().dropped) {
+ _tasks.emplace_back(std::move(task));
+ }
+ } else {
+ // Tasks must have contiguous versions, unless a complete reload occurs.
+ invariant(!bool(task.minVersion) ||
+ (_tasks.back().maxVersion.get() == task.minVersion.get()));
+ _tasks.emplace_back(std::move(task));
+ }
+}
+
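The queue above maintains two properties: non-drop tasks must pick up exactly where the previous task's version left off (unless they are a complete reload), and a drop task discards every queued task except the front one, which may already be executing. A minimal self-contained model of those rules, with integer versions standing in for DatabaseVersion:

    #include <cassert>
    #include <iterator>
    #include <list>
    #include <optional>
    #include <utility>

    struct FakeDbTask {
        std::optional<int> minVersion;  // version the queue must already have reached
        std::optional<int> maxVersion;  // version after this task is applied
        bool dropped = false;
    };

    void addTask(std::list<FakeDbTask>& tasks, FakeDbTask task) {
        if (tasks.empty()) {
            tasks.push_back(std::move(task));
            return;
        }

        if (task.dropped) {
            assert(tasks.back().maxVersion == task.minVersion);
            // Discard queued work the drop makes pointless, but keep the front task
            // because a worker thread may be executing it right now.
            tasks.erase(std::next(tasks.begin()), tasks.end());
            if (!tasks.front().dropped) {
                tasks.push_back(std::move(task));
            }
        } else {
            // A non-drop task must continue exactly where the queue left off,
            // unless it is a complete reload (no minVersion).
            assert(!task.minVersion || tasks.back().maxVersion == task.minVersion);
            tasks.push_back(std::move(task));
        }
    }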
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::pop_front() {
+ invariant(!_tasks.empty());
+ _tasks.pop_front();
+ _activeTaskCompletedCondVar->notify_all();
+}
+
+void ShardServerCatalogCacheLoader::DbTaskList::pop_front() {
invariant(!_tasks.empty());
_tasks.pop_front();
_activeTaskCompletedCondVar->notify_all();
}
-void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
+void ShardServerCatalogCacheLoader::CollAndChunkTaskList::waitForActiveTaskCompletion(
stdx::unique_lock<stdx::mutex>& lg) {
// Increase the use_count of the condition variable shared pointer, because the entire task list
// might get deleted during the unlocked interval
@@ -938,17 +1232,39 @@ void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
condVar->wait(lg);
}
-bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
+void ShardServerCatalogCacheLoader::DbTaskList::waitForActiveTaskCompletion(
+ stdx::unique_lock<stdx::mutex>& lg) {
+ // Increase the use_count of the condition variable shared pointer, because the entire task list
+ // might get deleted during the unlocked interval
+ auto condVar = _activeTaskCompletedCondVar;
+ condVar->wait(lg);
+}
+
+bool ShardServerCatalogCacheLoader::CollAndChunkTaskList::hasTasksFromThisTerm(
+ long long term) const {
invariant(!_tasks.empty());
return _tasks.back().termCreated == term;
}
-ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const {
+bool ShardServerCatalogCacheLoader::DbTaskList::hasTasksFromThisTerm(long long term) const {
+ invariant(!_tasks.empty());
+ return _tasks.back().termCreated == term;
+}
+
+ChunkVersion ShardServerCatalogCacheLoader::CollAndChunkTaskList::getHighestVersionEnqueued()
+ const {
invariant(!_tasks.empty());
return _tasks.back().maxQueryVersion;
}
-CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadataForTerm(
+boost::optional<DatabaseVersion>
+ShardServerCatalogCacheLoader::DbTaskList::getHighestVersionEnqueued() const {
+ invariant(!_tasks.empty());
+ return _tasks.back().maxVersion;
+}
+
+CollectionAndChangedChunks
+ShardServerCatalogCacheLoader::CollAndChunkTaskList::getEnqueuedMetadataForTerm(
const long long term) const {
CollectionAndChangedChunks collAndChunks;
for (const auto& task : _tasks) {