Diffstat (limited to 'src/mongo/db/s/shard_server_catalog_cache_loader.cpp')
-rw-r--r--   src/mongo/db/s/shard_server_catalog_cache_loader.cpp   181
1 file changed, 78 insertions(+), 103 deletions(-)
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 83596a1da3c..e935418432a 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -35,6 +35,7 @@
 #include "mongo/db/client.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/operation_context_group.h"
+#include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/s/shard_metadata_util.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/s/catalog/type_shard_collection.h"
@@ -51,6 +52,8 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
 
 namespace {
 
+AtomicUInt64 taskIdGenerator{0};
+
 /**
  * Constructs the options for the loader thread pool.
  */
@@ -232,13 +235,11 @@ StatusWith<CollectionAndChangedChunks> getIncompletePersistedMetadataSinceVersio
 }
 
 /**
- * Sends forceRoutingTableRefresh to the primary, to force the primary to refresh its routing table
- * entry for 'nss' and to obtain the primary's collectionVersion for 'nss' after the refresh.
- *
- * Returns the primary's returned collectionVersion for 'nss', or throws on error.
+ * Sends forceRoutingTableRefresh to the primary to force it to refresh its routing table for
+ * collection 'nss' and then waits for the refresh to replicate to this node.
  */
-ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceString& nss) {
-    auto shardingState = ShardingState::get(opCtx);
+void forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx, const NamespaceString& nss) {
+    auto const shardingState = ShardingState::get(opCtx);
     invariant(shardingState->enabled());
 
     auto selfShard = uassertStatusOK(
@@ -251,10 +252,11 @@ ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceStrin
         BSON("forceRoutingTableRefresh" << nss.ns()),
         Seconds{30},
         Shard::RetryPolicy::kIdempotent));
+
     uassertStatusOK(cmdResponse.commandStatus);
 
-    return uassertStatusOK(
-        ChunkVersion::parseFromBSONWithFieldForCommands(cmdResponse.response, "collectionVersion"));
+    uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead(
+        opCtx, {LogicalTime::fromOperationTime(cmdResponse.response), boost::none}));
 }
 
 /**
@@ -285,36 +287,10 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
     invariant(_contexts.isEmpty());
 }
 
-void ShardServerCatalogCacheLoader::setForTesting() {
-    _testing = true;
-}
-
 void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
     _namespaceNotifications.notifyChange(nss);
 }
 
-Status ShardServerCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx,
-                                                               const NamespaceString& nss,
-                                                               const ChunkVersion& version) {
-    invariant(!opCtx->lockState()->isLocked());
-    while (true) {
-        auto scopedNotification = _namespaceNotifications.createNotification(nss);
-
-        auto swRefreshState = getPersistedRefreshFlags(opCtx, nss);
-        if (!swRefreshState.isOK()) {
-            return swRefreshState.getStatus();
-        }
-        RefreshState refreshState = swRefreshState.getValue();
-
-        if (refreshState.lastRefreshedCollectionVersion.epoch() != version.epoch() ||
-            refreshState.lastRefreshedCollectionVersion >= version) {
-            return Status::OK();
-        }
-
-        scopedNotification.get(opCtx);
-    }
-}
-
 void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
     stdx::lock_guard<stdx::mutex> lock(_mutex);
     invariant(_role == ReplicaSetRole::None);
@@ -379,12 +355,62 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
     return notify;
 }
 
+void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
+                                                           const NamespaceString& nss) {
+    stdx::unique_lock<stdx::mutex> lg(_mutex);
+    const auto initialTerm = _term;
+
+    boost::optional<uint64_t> taskNumToWait;
+
+    while (true) {
+        uassert(ErrorCodes::NotMaster,
+                str::stream() << "Unable to wait for collection metadata flush for " << nss.ns()
+                              << " because the node's replication role changed.",
+                _role == ReplicaSetRole::Primary && _term == initialTerm);
+
+        auto it = _taskLists.find(nss);
+
+        // If there are no tasks for the specified namespace, everything must have been completed
+        if (it == _taskLists.end())
+            return;
+
+        auto& taskList = it->second;
+
+        if (!taskNumToWait) {
+            const auto& lastTask = taskList.back();
+            taskNumToWait = lastTask.taskNum;
+        } else {
+            const auto& activeTask = taskList.front();
+
+            if (activeTask.taskNum > *taskNumToWait) {
+                auto secondTaskIt = std::next(taskList.begin());
+
+                // Because of an optimization where a namespace drop clears all tasks except the
+                // active it is possible that the task number we are waiting on will never actually
+                // be written. Because of this we move the task number to the drop which can only be
+                // in the active task or in the one after the active.
+                if (activeTask.dropped) {
+                    taskNumToWait = activeTask.taskNum;
+                } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) {
+                    taskNumToWait = secondTaskIt->taskNum;
+                } else {
+                    return;
+                }
+            }
+        }
+
+        // It is not safe to use taskList after this call, because it will unlock and lock the tasks
+        // mutex, so we just loop around.
+        taskList.waitForActiveTaskCompletion(lg);
+    }
+}
+
 void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
     OperationContext* opCtx,
     const NamespaceString& nss,
     const ChunkVersion& catalogCacheSinceVersion,
     stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
-    _forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
+    forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
 
     // Read the local metadata.
    auto swCollAndChunks =
@@ -392,61 +418,6 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
     callbackFn(opCtx, std::move(swCollAndChunks));
 }
 
-/**
- * "Waiting for replication" by waiting to see a local version equal or greater to the primary's
- * collectionVersion is not so straightforward. A few key insights:
- *
- * 1) ChunkVersions are ordered, so within an epoch, we can wait for a particular ChunkVersion.
- *
- * 2) Epochs are not ordered. If we are waiting for epochB and see epochA locally, we can't know if
- * the update for epochB already replicated or has yet to replicate.
- *
- * To deal with this, on seeing epochA, we wait for one update. If we are now in epochB (e.g., if
- * epochA was UNSHARDED) we continue waiting for updates until our version meets or exceeds the
- * primary's. Otherwise, we throw an error. A caller can retry, which will cause us to ask the
- * primary for a new collectionVersion to wait for. If we were behind, we continue waiting; if we
- * were ahead, we now have a new target.
- *
- * This only occurs if collections are being created, sharded, and dropped quickly.
- *
- * 3) Unsharded collections do not have epochs at all. A unique identifier for all collections,
- * including unsharded, will be introduced in 3.6. Until then, we cannot differentiate between
- * different incarnations of unsharded collections of the same name.
- *
- * We do not deal with this at all. We report that we are "up to date" even if we are at an
- * earlier incarnation of the unsharded collection.
- */
-void ShardServerCatalogCacheLoader::_forcePrimaryRefreshAndWaitForReplication(
-    OperationContext* opCtx, const NamespaceString& nss) {
-    // Start listening for metadata updates before obtaining the primary's version, in case we
-    // replicate an epoch change past the primary's version before reading locally.
-    boost::optional<NamespaceMetadataChangeNotifications::ScopedNotification> notif(
-        _namespaceNotifications.createNotification(nss));
-
-    auto primaryVersion = forcePrimaryToRefresh(opCtx, nss);
-
-    bool waitedForUpdate = false;
-    while (true) {
-        auto secondaryVersion = getLocalVersion(opCtx, nss);
-
-        if (secondaryVersion.hasEqualEpoch(primaryVersion) && secondaryVersion >= primaryVersion) {
-            return;
-        }
-
-        if (waitedForUpdate) {
-            // If we still aren't in the primary's epoch, throw.
-            uassert(ErrorCodes::ConflictingOperationInProgress,
-                    "The collection has recently been dropped and recreated",
-                    secondaryVersion.epoch() == primaryVersion.epoch());
-        }
-
-        // Wait for a chunk metadata update (either ChunkVersion increment or epoch change).
-        notif->get(opCtx);
-        notif.emplace(_namespaceNotifications.createNotification(nss));
-        waitedForUpdate = true;
-    }
-}
-
 void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
     OperationContext* opCtx,
     const NamespaceString& nss,
@@ -694,11 +665,7 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
 
     // If task completed successfully, remove it from work queue
     if (taskFinished) {
-        _taskLists[nss].removeActiveTask();
-    }
-
-    if (_testing) {
-        notifyOfCollectionVersionUpdate(nss);
+        _taskLists[nss].pop_front();
     }
 
     // Schedule more work if there is any
@@ -720,7 +687,7 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
                                                              const NamespaceString& nss) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
 
-    const Task& task = _taskLists[nss].getActiveTask();
+    const Task& task = _taskLists[nss].front();
     invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
 
     // If this task is from an old term and no longer valid, do not execute and return true so that
@@ -830,7 +797,9 @@ ShardServerCatalogCacheLoader::Task::Task(
     StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
     ChunkVersion minimumQueryVersion,
     long long currentTerm)
-    : minQueryVersion(minimumQueryVersion), termCreated(currentTerm) {
+    : taskNum(taskIdGenerator.fetchAndAdd(1)),
+      minQueryVersion(minimumQueryVersion),
+      termCreated(currentTerm) {
     if (statusWithCollectionAndChangedChunks.isOK()) {
         collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
         invariant(!collectionAndChangedChunks->changedChunks.empty());
@@ -842,6 +811,9 @@ ShardServerCatalogCacheLoader::Task::Task(
     }
 }
 
+ShardServerCatalogCacheLoader::TaskList::TaskList()
+    : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
+
 void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
     if (_tasks.empty()) {
         _tasks.emplace_back(std::move(task));
@@ -870,15 +842,18 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
     }
 }
 
-const ShardServerCatalogCacheLoader::Task& ShardServerCatalogCacheLoader::TaskList::getActiveTask()
-    const {
+void ShardServerCatalogCacheLoader::TaskList::pop_front() {
     invariant(!_tasks.empty());
-    return _tasks.front();
+    _tasks.pop_front();
+    _activeTaskCompletedCondVar->notify_all();
 }
 
-void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() {
-    invariant(!_tasks.empty());
-    _tasks.pop_front();
+void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
+    stdx::unique_lock<stdx::mutex>& lg) {
+    // Increase the use_count of the condition variable shared pointer, because the entire task list
+    // might get deleted during the unlocked interval
+    auto condVar = _activeTaskCompletedCondVar;
+    condVar->wait(lg);
 }
 
 bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
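
The subtlest part of the change is the re-targeting step inside waitForCollectionFlush: when the active task's number jumps past the number being waited on, the waiter must decide whether a namespace drop coalesced the queue or the target task simply completed. Below is a minimal, self-contained sketch of just that decision; the Task struct and the retarget name are hypothetical simplifications, whereas the real code walks _taskLists under _mutex and loops on the result.

#include <cstdint>
#include <deque>
#include <optional>

// Hypothetical, pared-down model of one iteration of waitForCollectionFlush.
struct Task {
    uint64_t taskNum;  // monotonically increasing, as from taskIdGenerator
    bool dropped;      // true if the task persists a collection drop
};

// Given the non-empty task queue and the task number currently being waited
// on, returns the number to wait on next, or nullopt once waiting can stop.
std::optional<uint64_t> retarget(const std::deque<Task>& tasks, uint64_t waitingOn) {
    const Task& active = tasks.front();
    if (active.taskNum <= waitingOn)
        return waitingOn;  // the target task has not run yet; keep waiting

    // The active task skipped past the target, so either a drop coalesced the
    // queue or the target already finished. The drop can only be the active
    // task or the one immediately after it; re-target whichever it is.
    if (active.dropped)
        return active.taskNum;
    if (tasks.size() > 1 && tasks[1].dropped)
        return tasks[1].taskNum;
    return std::nullopt;  // target completed and was popped; stop waiting
}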
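
The TaskList change also leans on a lifetime trick: a waiter copies the shared_ptr to the condition variable before blocking, so the condition variable survives even if the entire TaskList is erased while the mutex is released inside wait(). Here is a minimal sketch of that pattern using plain std:: primitives rather than MongoDB's stdx wrappers; TaskQueue is an illustrative stand-in, not the real class.

#include <condition_variable>
#include <memory>
#include <mutex>

// Illustrative stand-in for TaskList. The queue owns its condition variable
// through a shared_ptr so that a blocked waiter can outlive the queue.
class TaskQueue {
public:
    TaskQueue() : _cv(std::make_shared<std::condition_variable>()) {}

    // Must be called with 'lock' held on the mutex guarding the queue map.
    // Copying the shared_ptr bumps its use_count, so if the queue is erased
    // from the map while wait() has the mutex released, the condition
    // variable itself stays alive until this waiter wakes up. The caller must
    // not touch this object afterwards; it re-looks the queue up instead.
    void waitForCompletion(std::unique_lock<std::mutex>& lock) {
        auto cv = _cv;
        cv->wait(lock);
    }

    // Wakes all waiters after the active task is removed; they re-check the
    // queue state under the re-acquired mutex.
    void signalCompletion() {
        _cv->notify_all();
    }

private:
    std::shared_ptr<std::condition_variable> _cv;
};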