Diffstat (limited to 'src/mongo/db/s/shard_server_catalog_cache_loader.cpp')
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 181
1 file changed, 78 insertions(+), 103 deletions(-)
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 83596a1da3c..e935418432a 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/operation_context_group.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_shard_collection.h"
@@ -51,6 +52,8 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
namespace {
+AtomicUInt64 taskIdGenerator{0};
+
/**
* Constructs the options for the loader thread pool.
*/
@@ -232,13 +235,11 @@ StatusWith<CollectionAndChangedChunks> getIncompletePersistedMetadataSinceVersio
}
/**
- * Sends forceRoutingTableRefresh to the primary, to force the primary to refresh its routing table
- * entry for 'nss' and to obtain the primary's collectionVersion for 'nss' after the refresh.
- *
- * Returns the primary's returned collectionVersion for 'nss', or throws on error.
+ * Sends forceRoutingTableRefresh to the primary to force it to refresh its routing table for
+ * collection 'nss' and then waits for the refresh to replicate to this node.
*/
-ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceString& nss) {
- auto shardingState = ShardingState::get(opCtx);
+void forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx, const NamespaceString& nss) {
+ auto const shardingState = ShardingState::get(opCtx);
invariant(shardingState->enabled());
auto selfShard = uassertStatusOK(
@@ -251,10 +252,11 @@ ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceStrin
BSON("forceRoutingTableRefresh" << nss.ns()),
Seconds{30},
Shard::RetryPolicy::kIdempotent));
+
uassertStatusOK(cmdResponse.commandStatus);
- return uassertStatusOK(
- ChunkVersion::parseFromBSONWithFieldForCommands(cmdResponse.response, "collectionVersion"));
+ uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead(
+ opCtx, {LogicalTime::fromOperationTime(cmdResponse.response), boost::none}));
}
/**
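The hunk above replaces the old collectionVersion handshake: rather than parsing a collectionVersion out of the primary's response, the secondary now takes the response's operationTime and blocks until that cluster time is readable locally. Below is a hedged sketch of just the wait step, reusing only the calls visible in the hunk; the function name is hypothetical, and it assumes the brace-initializer passed to waitUntilOpTimeForRead builds read-concern arguments carrying afterClusterTime with no explicit read-concern level, as its use above suggests.

// Sketch only, not the committed code: block until this node has replicated at least
// up to the cluster time carried in a command response, using the same two calls as
// the hunk above. Assumes the headers already included by this file.
void waitForResponseClusterTime(OperationContext* opCtx, const BSONObj& response) {
    // Extract the cluster time from the response, as done above with cmdResponse.response.
    const auto clusterTime = LogicalTime::fromOperationTime(response);

    // {clusterTime, boost::none} is assumed to build read-concern arguments with
    // afterClusterTime set and no explicit read-concern level, so the call returns once
    // this node's replicated optime covers the primary's refresh writes.
    uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead(
        opCtx, {clusterTime, boost::none}));
}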
@@ -285,36 +287,10 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
invariant(_contexts.isEmpty());
}
-void ShardServerCatalogCacheLoader::setForTesting() {
- _testing = true;
-}
-
void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
_namespaceNotifications.notifyChange(nss);
}
-Status ShardServerCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) {
- invariant(!opCtx->lockState()->isLocked());
- while (true) {
- auto scopedNotification = _namespaceNotifications.createNotification(nss);
-
- auto swRefreshState = getPersistedRefreshFlags(opCtx, nss);
- if (!swRefreshState.isOK()) {
- return swRefreshState.getStatus();
- }
- RefreshState refreshState = swRefreshState.getValue();
-
- if (refreshState.lastRefreshedCollectionVersion.epoch() != version.epoch() ||
- refreshState.lastRefreshedCollectionVersion >= version) {
- return Status::OK();
- }
-
- scopedNotification.get(opCtx);
- }
-}
-
void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(_role == ReplicaSetRole::None);
@@ -379,12 +355,62 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
return notify;
}
+void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ stdx::unique_lock<stdx::mutex> lg(_mutex);
+ const auto initialTerm = _term;
+
+ boost::optional<uint64_t> taskNumToWait;
+
+ while (true) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Unable to wait for collection metadata flush for " << nss.ns()
+ << " because the node's replication role changed.",
+ _role == ReplicaSetRole::Primary && _term == initialTerm);
+
+ auto it = _taskLists.find(nss);
+
+ // If there are no tasks for the specified namespace, everything must have been completed
+ if (it == _taskLists.end())
+ return;
+
+ auto& taskList = it->second;
+
+ if (!taskNumToWait) {
+ const auto& lastTask = taskList.back();
+ taskNumToWait = lastTask.taskNum;
+ } else {
+ const auto& activeTask = taskList.front();
+
+ if (activeTask.taskNum > *taskNumToWait) {
+ auto secondTaskIt = std::next(taskList.begin());
+
+ // Because of an optimization where a namespace drop clears all tasks except the
+ // active one, it is possible that the task number we are waiting on will never
+ // actually be written. In that case we retarget the wait to the drop task, which
+ // can only be either the active task or the one immediately after it.
+ if (activeTask.dropped) {
+ taskNumToWait = activeTask.taskNum;
+ } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) {
+ taskNumToWait = secondTaskIt->taskNum;
+ } else {
+ return;
+ }
+ }
+ }
+
+ // It is not safe to use taskList after this call, because the wait releases and
+ // reacquires the mutex, so we just loop around and look the namespace up again.
+ taskList.waitForActiveTaskCompletion(lg);
+ }
+}
+
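waitForCollectionFlush above is a drain wait: it snapshots the task number of the last task queued for the namespace, then repeatedly waits for the active task to complete, re-checking on each wakeup that the node is still primary in the same term and retargeting the wait to a drop task if one has discarded the original target. Below is a minimal, self-contained sketch of the same drain pattern using hypothetical names (FlushQueue, waitForDrain) and standard library types; the term check and the drop-task special case are left out.

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical, simplified illustration of the drain pattern: remember the id of the
// last task currently queued, then block until the worker has popped past it. Task
// ids are monotonically increasing, mirroring taskIdGenerator.fetchAndAdd(1) above.
class FlushQueue {
public:
    // Called by producers; task numbers must be strictly increasing.
    void push(uint64_t taskNum) {
        std::lock_guard<std::mutex> lk(_mutex);
        _tasks.push_back(taskNum);
    }

    // Called by the worker thread after it finishes the front (active) task.
    void popFront() {
        std::lock_guard<std::mutex> lk(_mutex);
        _tasks.pop_front();
        _taskCompleted.notify_all();
    }

    // Blocks until every task queued at the time of the call has completed.
    void waitForDrain() {
        std::unique_lock<std::mutex> lk(_mutex);
        if (_tasks.empty())
            return;
        const uint64_t lastQueued = _tasks.back();
        _taskCompleted.wait(lk, [&] {
            // Drained once the queue is empty or the active task is newer than the target.
            return _tasks.empty() || _tasks.front() > lastQueued;
        });
    }

private:
    std::mutex _mutex;
    std::deque<uint64_t> _tasks;
    std::condition_variable _taskCompleted;
};

In the committed code, TaskList::pop_front() plays the role of popFront(), notifying _activeTaskCompletedCondVar after removing the finished task.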
void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
- _forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
+ forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
// Read the local metadata.
auto swCollAndChunks =
@@ -392,61 +418,6 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
callbackFn(opCtx, std::move(swCollAndChunks));
}
-/**
- * "Waiting for replication" by waiting to see a local version equal or greater to the primary's
- * collectionVersion is not so straightforward. A few key insights:
- *
- * 1) ChunkVersions are ordered, so within an epoch, we can wait for a particular ChunkVersion.
- *
- * 2) Epochs are not ordered. If we are waiting for epochB and see epochA locally, we can't know if
- * the update for epochB already replicated or has yet to replicate.
- *
- * To deal with this, on seeing epochA, we wait for one update. If we are now in epochB (e.g., if
- * epochA was UNSHARDED) we continue waiting for updates until our version meets or exceeds the
- * primary's. Otherwise, we throw an error. A caller can retry, which will cause us to ask the
- * primary for a new collectionVersion to wait for. If we were behind, we continue waiting; if we
- * were ahead, we now have a new target.
- *
- * This only occurs if collections are being created, sharded, and dropped quickly.
- *
- * 3) Unsharded collections do not have epochs at all. A unique identifier for all collections,
- * including unsharded, will be introduced in 3.6. Until then, we cannot differentiate between
- * different incarnations of unsharded collections of the same name.
- *
- * We do not deal with this at all. We report that we are "up to date" even if we are at an
- * earlier incarnation of the unsharded collection.
- */
-void ShardServerCatalogCacheLoader::_forcePrimaryRefreshAndWaitForReplication(
- OperationContext* opCtx, const NamespaceString& nss) {
- // Start listening for metadata updates before obtaining the primary's version, in case we
- // replicate an epoch change past the primary's version before reading locally.
- boost::optional<NamespaceMetadataChangeNotifications::ScopedNotification> notif(
- _namespaceNotifications.createNotification(nss));
-
- auto primaryVersion = forcePrimaryToRefresh(opCtx, nss);
-
- bool waitedForUpdate = false;
- while (true) {
- auto secondaryVersion = getLocalVersion(opCtx, nss);
-
- if (secondaryVersion.hasEqualEpoch(primaryVersion) && secondaryVersion >= primaryVersion) {
- return;
- }
-
- if (waitedForUpdate) {
- // If we still aren't in the primary's epoch, throw.
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "The collection has recently been dropped and recreated",
- secondaryVersion.epoch() == primaryVersion.epoch());
- }
-
- // Wait for a chunk metadata update (either ChunkVersion increment or epoch change).
- notif->get(opCtx);
- notif.emplace(_namespaceNotifications.createNotification(nss));
- waitedForUpdate = true;
- }
-}
-
void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -694,11 +665,7 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
// If task completed successfully, remove it from work queue
if (taskFinished) {
- _taskLists[nss].removeActiveTask();
- }
-
- if (_testing) {
- notifyOfCollectionVersionUpdate(nss);
+ _taskLists[nss].pop_front();
}
// Schedule more work if there is any
@@ -720,7 +687,7 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
const NamespaceString& nss) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- const Task& task = _taskLists[nss].getActiveTask();
+ const Task& task = _taskLists[nss].front();
invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
// If this task is from an old term and no longer valid, do not execute and return true so that
@@ -830,7 +797,9 @@ ShardServerCatalogCacheLoader::Task::Task(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
long long currentTerm)
- : minQueryVersion(minimumQueryVersion), termCreated(currentTerm) {
+ : taskNum(taskIdGenerator.fetchAndAdd(1)),
+ minQueryVersion(minimumQueryVersion),
+ termCreated(currentTerm) {
if (statusWithCollectionAndChangedChunks.isOK()) {
collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
invariant(!collectionAndChangedChunks->changedChunks.empty());
@@ -842,6 +811,9 @@ ShardServerCatalogCacheLoader::Task::Task(
}
}
+ShardServerCatalogCacheLoader::TaskList::TaskList()
+ : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
+
void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
if (_tasks.empty()) {
_tasks.emplace_back(std::move(task));
@@ -870,15 +842,18 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
}
}
-const ShardServerCatalogCacheLoader::Task& ShardServerCatalogCacheLoader::TaskList::getActiveTask()
- const {
+void ShardServerCatalogCacheLoader::TaskList::pop_front() {
invariant(!_tasks.empty());
- return _tasks.front();
+ _tasks.pop_front();
+ _activeTaskCompletedCondVar->notify_all();
}
-void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() {
- invariant(!_tasks.empty());
- _tasks.pop_front();
+void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
+ stdx::unique_lock<stdx::mutex>& lg) {
+ // Hold an extra reference to the condition variable's shared pointer, because the
+ // entire task list might get deleted while the mutex is unlocked.
+ auto condVar = _activeTaskCompletedCondVar;
+ condVar->wait(lg);
}
bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
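The pop_front/waitForActiveTaskCompletion pair relies on the condition variable being held through a shared_ptr: the waiter copies the pointer before blocking, so even if the whole TaskList is erased while the mutex is released, the condition variable outlives it and the wakeup never touches freed memory. Below is a standalone sketch of that lifetime idiom, with hypothetical names and std types in place of stdx.

#include <condition_variable>
#include <memory>
#include <mutex>

// Hypothetical illustration of the shared-condvar idiom used by
// TaskList::waitForActiveTaskCompletion: the waiter keeps its own reference to the
// condition variable, so destroying the owning object while the mutex is unlocked
// cannot leave the waiter blocked on freed memory. The mutex is owned by the caller,
// just as _mutex is owned by the loader rather than by the TaskList.
struct Worklist {
    Worklist() : completedCondVar(std::make_shared<std::condition_variable>()) {}

    // 'lk' must hold the caller's mutex; it is released while waiting.
    void waitForCompletion(std::unique_lock<std::mutex>& lk) {
        // Copy the shared_ptr first: this Worklist may be deleted while 'lk' is unlocked,
        // but the local reference keeps the condition variable alive until wait() returns.
        auto condVar = completedCondVar;
        condVar->wait(lk);
    }

    // Called by whoever completes the active work item.
    void markCompleted() {
        completedCondVar->notify_all();
    }

    std::shared_ptr<std::condition_variable> completedCondVar;
};

This is why the new TaskList constructor above eagerly creates the shared condition variable rather than holding one by value.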