author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-08-01 18:55:52 -0400
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-08-10 12:49:33 -0400
commit | 7ce57a44876cfedcedaf2dd9896817a2b021df66 (patch)
tree | 04f8e498f16db52da80a74435049a0f0995de970 /src/mongo/db/s
parent | a14097396cba4366d159dde303a6d4af130e781f (diff)
download | mongo-7ce57a44876cfedcedaf2dd9896817a2b021df66.tar.gz
SERVER-30147 Add ability to wait on collection metadata flush
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.cpp | 5
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.h | 4
-rw-r--r-- | src/mongo/db/s/force_routing_table_refresh_command.cpp | 48
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 21
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.cpp | 25
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.h | 16
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 181
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 95
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp | 4
9 files changed, 153 insertions, 246 deletions
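
The heart of the change: instead of the secondary comparing ChunkVersions against the primary's (which, as the comment block removed below explains, is ambiguous across epoch changes), the primary now treats the forced refresh like any other write, and the secondary simply waits until it has replicated past the returned operation time, since opTimes are totally ordered. Below is a minimal standalone sketch of that handshake using plain std primitives; this is not MongoDB code, and all names (`forceRefreshOnPrimary`, `waitUntilOpTimeForRead`, the integer opTime clock) are illustrative stand-ins for the real calls in the diff.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

// A toy replication clock shared by a "primary" and a "secondary".
std::mutex mu;
std::condition_variable cv;
uint64_t primaryOpTime = 0;  // Advanced by every metadata write on the primary
uint64_t appliedOpTime = 0;  // How far the secondary has replicated

// Models forceRoutingTableRefresh on the primary: perform the metadata write
// and hand back the opTime of that write (the commit returns it to the caller
// as the command's operation time).
uint64_t forceRefreshOnPrimary() {
    std::lock_guard<std::mutex> lk(mu);
    return ++primaryOpTime;
}

// Models waitUntilOpTimeForRead on the secondary: block until the local
// applied opTime reaches the one the primary returned.
void waitUntilOpTimeForRead(uint64_t opTime) {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [&] { return appliedOpTime >= opTime; });
}

int main() {
    // Background "oplog application": slowly catches the secondary up.
    std::thread replication([] {
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
            std::lock_guard<std::mutex> lk(mu);
            if (appliedOpTime < primaryOpTime) {
                ++appliedOpTime;
                cv.notify_all();
            } else if (primaryOpTime > 0) {
                break;  // Caught up after at least one write
            }
        }
    });

    const uint64_t t = forceRefreshOnPrimary();
    waitUntilOpTimeForRead(t);  // Local metadata now reflects the forced refresh
    std::cout << "refresh replicated at opTime " << t << "\n";
    replication.join();
}
```

This sidesteps the epoch problem entirely because the wait no longer inspects collection metadata at all; it only consults the replication clock.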
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index d5dbe15c50e..4dc0f775fab 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -85,9 +85,8 @@ void CatalogCacheLoaderMock::notifyOfCollectionVersionUpdate(const NamespaceStri
     MONGO_UNREACHABLE;
 }
 
-Status CatalogCacheLoaderMock::waitForCollectionVersion(OperationContext* opCtx,
-                                                        const NamespaceString& nss,
-                                                        const ChunkVersion& version) {
+void CatalogCacheLoaderMock::waitForCollectionFlush(OperationContext* opCtx,
+                                                    const NamespaceString& nss) {
     MONGO_UNREACHABLE;
 }
 
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
index 0fb16efde8f..18da4671f2b 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.h
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -51,9 +51,7 @@ public:
     void onStepDown() override;
     void onStepUp() override;
     void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
-    Status waitForCollectionVersion(OperationContext* opCtx,
-                                    const NamespaceString& nss,
-                                    const ChunkVersion& version) override;
+    void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
 
     std::shared_ptr<Notification<void>> getChunksSince(
         const NamespaceString& nss,
diff --git a/src/mongo/db/s/force_routing_table_refresh_command.cpp b/src/mongo/db/s/force_routing_table_refresh_command.cpp
index 6d144041702..b58cdeb8e0c 100644
--- a/src/mongo/db/s/force_routing_table_refresh_command.cpp
+++ b/src/mongo/db/s/force_routing_table_refresh_command.cpp
@@ -37,37 +37,25 @@
 #include "mongo/db/auth/privilege.h"
 #include "mongo/db/client.h"
 #include "mongo/db/commands.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/lasterror.h"
 #include "mongo/db/operation_context.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/s/collection_metadata.h"
-#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/s/sharding_state.h"
-#include "mongo/db/wire_version.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_version.h"
-#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/catalog_cache_loader.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/log.h"
-#include "mongo/util/stringutils.h"
 
 namespace mongo {
-
 namespace {
 
-/**
- * Takes a single argument, a namespace string, and forces this node to refresh its routing table
- * cache entry for that namespace.
- */
 class ForceRoutingTableRefresh : public BasicCommand {
 public:
     ForceRoutingTableRefresh() : BasicCommand("forceRoutingTableRefresh") {}
 
     void help(std::stringstream& help) const override {
-        help << "internal command to force a node to refresh its routing table entry for a "
-                "namespace";
+        help << "Internal command which forces a sharded node to refresh its metadata from the "
+                "config server and persist it locally only. Behaves like any other write command "
+                "in that it returns the cluster time of the last metadata write so it can be "
+                "waited on.";
     }
 
     bool adminOnly() const override {
@@ -78,7 +66,7 @@ public:
         return true;
     }
 
-    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+    bool supportsWriteConcern(const BSONObj& cmd) const override {
         return false;
     }
 
@@ -107,27 +95,25 @@ public:
     bool run(OperationContext* opCtx,
              const std::string& dbname,
              const BSONObj& cmdObj,
-             BSONObjBuilder& result) {
-        auto shardingState = ShardingState::get(opCtx);
+             BSONObjBuilder& result) override {
+        auto const shardingState = ShardingState::get(opCtx);
         uassertStatusOK(shardingState->canAcceptShardedCommands());
 
         uassert(ErrorCodes::IllegalOperation,
-                "can't issue forceRoutingTableRefresh from 'eval'",
+                "Can't issue forceRoutingTableRefresh from 'eval'",
                 !opCtx->getClient()->isInDirectClient());
 
-        NamespaceString nss(parseNs(dbname, cmdObj));
+        const NamespaceString nss(parseNs(dbname, cmdObj));
+
+        LOG(1) << "Forcing routing table refresh for " << nss;
 
-        log() << "forcing routing table refresh for " << nss;
         ChunkVersion unusedShardVersion;
-        uassertStatusOK(
-            ShardingState::get(opCtx)->refreshMetadataNow(opCtx, nss, &unusedShardVersion));
+        uassertStatusOK(shardingState->refreshMetadataNow(opCtx, nss, &unusedShardVersion));
+
+        CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, nss);
 
-        auto routingInfo =
-            uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+        repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
 
-        const auto collectionVersion =
-            routingInfo.cm() ? routingInfo.cm()->getVersion() : ChunkVersion::UNSHARDED();
-        collectionVersion.appendWithFieldForCommands(&result, "collectionVersion");
         return true;
     }
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 7dce14247af..1cf01f2702b 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -242,25 +242,10 @@ private:
         auto nss = moveChunkRequest.getNss();
         const auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey());
 
-        // Wait for the metadata update to be persisted before scheduling the range deletion.
-        //
-        // This is necessary to prevent a race on the secondary because both metadata persistence
-        // and range deletion is done asynchronously and we must prevent the data deletion from
-        // being propagated before the metadata update.
-        ChunkVersion collectionVersion = [&]() {
-            AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-            auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
-            uassert(ErrorCodes::NamespaceNotSharded,
-                    str::stream() << "Chunk move failed because collection '" << nss.ns()
-                                  << "' is no longer sharded.",
-                    metadata);
-            return metadata->getCollVersion();
-        }();
-
-        // Now schedule the range deletion clean up.
+        // Wait for the metadata update to be persisted in order to avoid orphaned documents from
+        // starting to get deleted before the metadata changes have propagated to the secondaries.
         auto notification = [&] {
-            uassertStatusOK(CatalogCacheLoader::get(opCtx).waitForCollectionVersion(
-                opCtx, nss, collectionVersion));
+            CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, nss);
 
             auto const whenToClean = moveChunkRequest.getWaitForDelete() ?
                 CollectionShardingState::kNow
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index 46cd792ddb7..1b8175cba41 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -30,32 +30,13 @@
 
 #include "mongo/db/s/read_only_catalog_cache_loader.h"
 
-#include "mongo/db/operation_context.h"
-
 namespace mongo {
 
 using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
 
-void ReadOnlyCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
-    return;
-}
-
-void ReadOnlyCatalogCacheLoader::onStepDown() {
-    return;
-}
-
-void ReadOnlyCatalogCacheLoader::onStepUp() {
-    return;
-}
-
-void ReadOnlyCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
-    return;
-}
-
-Status ReadOnlyCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx,
-                                                            const NamespaceString& nss,
-                                                            const ChunkVersion& version) {
-    return Status::OK();
+void ReadOnlyCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
+                                                        const NamespaceString& nss) {
+    MONGO_UNREACHABLE;
 }
 
 std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince(
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h
index 6485538c127..eca54264caf 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.h
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.h
@@ -28,7 +28,6 @@
 
 #pragma once
 
-#include "mongo/s/catalog_cache_loader.h"
 #include "mongo/s/config_server_catalog_cache_loader.h"
 
 namespace mongo {
@@ -40,16 +39,11 @@ namespace mongo {
  */
 class ReadOnlyCatalogCacheLoader final : public CatalogCacheLoader {
 public:
-    /**
-     * These functions do nothing and simply return.
-     */
-    void initializeReplicaSetRole(bool isPrimary) override;
-    void onStepDown() override;
-    void onStepUp() override;
-    void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
-    Status waitForCollectionVersion(OperationContext* opCtx,
-                                    const NamespaceString& nss,
-                                    const ChunkVersion& version) override;
+    void initializeReplicaSetRole(bool isPrimary) override {}
+    void onStepDown() override {}
+    void onStepUp() override {}
+    void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override {}
+    void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
 
     std::shared_ptr<Notification<void>> getChunksSince(
         const NamespaceString& nss,
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 83596a1da3c..e935418432a 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -35,6 +35,7 @@
 #include "mongo/db/client.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/operation_context_group.h"
+#include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/s/shard_metadata_util.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/s/catalog/type_shard_collection.h"
@@ -51,6 +52,8 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
 
 namespace {
 
+AtomicUInt64 taskIdGenerator{0};
+
 /**
  * Constructs the options for the loader thread pool.
 */
@@ -232,13 +235,11 @@ StatusWith<CollectionAndChangedChunks> getIncompletePersistedMetadataSinceVersio
 }
 
 /**
- * Sends forceRoutingTableRefresh to the primary, to force the primary to refresh its routing table
- * entry for 'nss' and to obtain the primary's collectionVersion for 'nss' after the refresh.
- *
- * Returns the primary's returned collectionVersion for 'nss', or throws on error.
+ * Sends forceRoutingTableRefresh to the primary to force it to refresh its routing table for
+ * collection 'nss' and then waits for the refresh to replicate to this node.
 */
-ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceString& nss) {
-    auto shardingState = ShardingState::get(opCtx);
+void forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx, const NamespaceString& nss) {
+    auto const shardingState = ShardingState::get(opCtx);
     invariant(shardingState->enabled());
 
     auto selfShard = uassertStatusOK(
@@ -251,10 +252,11 @@ ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceStrin
             BSON("forceRoutingTableRefresh" << nss.ns()),
             Seconds{30},
             Shard::RetryPolicy::kIdempotent));
+
     uassertStatusOK(cmdResponse.commandStatus);
 
-    return uassertStatusOK(
-        ChunkVersion::parseFromBSONWithFieldForCommands(cmdResponse.response, "collectionVersion"));
+    uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead(
+        opCtx, {LogicalTime::fromOperationTime(cmdResponse.response), boost::none}));
 }
 
 /**
@@ -285,36 +287,10 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
     invariant(_contexts.isEmpty());
 }
 
-void ShardServerCatalogCacheLoader::setForTesting() {
-    _testing = true;
-}
-
 void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
     _namespaceNotifications.notifyChange(nss);
 }
 
-Status ShardServerCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx,
-                                                               const NamespaceString& nss,
-                                                               const ChunkVersion& version) {
-    invariant(!opCtx->lockState()->isLocked());
-    while (true) {
-        auto scopedNotification = _namespaceNotifications.createNotification(nss);
-
-        auto swRefreshState = getPersistedRefreshFlags(opCtx, nss);
-        if (!swRefreshState.isOK()) {
-            return swRefreshState.getStatus();
-        }
-        RefreshState refreshState = swRefreshState.getValue();
-
-        if (refreshState.lastRefreshedCollectionVersion.epoch() != version.epoch() ||
-            refreshState.lastRefreshedCollectionVersion >= version) {
-            return Status::OK();
-        }
-
-        scopedNotification.get(opCtx);
-    }
-}
-
 void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
     stdx::lock_guard<stdx::mutex> lock(_mutex);
     invariant(_role == ReplicaSetRole::None);
@@ -379,12 +355,62 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
     return notify;
 }
 
+void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
+                                                           const NamespaceString& nss) {
+    stdx::unique_lock<stdx::mutex> lg(_mutex);
+    const auto initialTerm = _term;
+
+    boost::optional<uint64_t> taskNumToWait;
+
+    while (true) {
+        uassert(ErrorCodes::NotMaster,
+                str::stream() << "Unable to wait for collection metadata flush for " << nss.ns()
+                              << " because the node's replication role changed.",
+                _role == ReplicaSetRole::Primary && _term == initialTerm);
+
+        auto it = _taskLists.find(nss);
+
+        // If there are no tasks for the specified namespace, everything must have been completed
+        if (it == _taskLists.end())
+            return;
+
+        auto& taskList = it->second;
+
+        if (!taskNumToWait) {
+            const auto& lastTask = taskList.back();
+            taskNumToWait = lastTask.taskNum;
+        } else {
+            const auto& activeTask = taskList.front();
+
+            if (activeTask.taskNum > *taskNumToWait) {
+                auto secondTaskIt = std::next(taskList.begin());
+
+                // Because of an optimization where a namespace drop clears all tasks except the
+                // active it is possible that the task number we are waiting on will never actually
+                // be written. Because of this we move the task number to the drop which can only be
+                // in the active task or in the one after the active.
+                if (activeTask.dropped) {
+                    taskNumToWait = activeTask.taskNum;
+                } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) {
+                    taskNumToWait = secondTaskIt->taskNum;
+                } else {
+                    return;
+                }
+            }
+        }
+
+        // It is not safe to use taskList after this call, because it will unlock and lock the tasks
+        // mutex, so we just loop around.
+        taskList.waitForActiveTaskCompletion(lg);
+    }
+}
+
 void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
     OperationContext* opCtx,
     const NamespaceString& nss,
     const ChunkVersion& catalogCacheSinceVersion,
     stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
-    _forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
+    forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
 
     // Read the local metadata.
     auto swCollAndChunks =
@@ -392,61 +418,6 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
     callbackFn(opCtx, std::move(swCollAndChunks));
 }
 
-/**
- * "Waiting for replication" by waiting to see a local version equal or greater to the primary's
- * collectionVersion is not so straightforward. A few key insights:
- *
- * 1) ChunkVersions are ordered, so within an epoch, we can wait for a particular ChunkVersion.
- *
- * 2) Epochs are not ordered. If we are waiting for epochB and see epochA locally, we can't know if
- * the update for epochB already replicated or has yet to replicate.
- *
- * To deal with this, on seeing epochA, we wait for one update. If we are now in epochB (e.g., if
- * epochA was UNSHARDED) we continue waiting for updates until our version meets or exceeds the
- * primary's. Otherwise, we throw an error. A caller can retry, which will cause us to ask the
- * primary for a new collectionVersion to wait for. If we were behind, we continue waiting; if we
- * were ahead, we now have a new target.
- *
- * This only occurs if collections are being created, sharded, and dropped quickly.
- *
- * 3) Unsharded collections do not have epochs at all. A unique identifier for all collections,
- * including unsharded, will be introduced in 3.6. Until then, we cannot differentiate between
- * different incarnations of unsharded collections of the same name.
- *
- * We do not deal with this at all. We report that we are "up to date" even if we are at an
- * earlier incarnation of the unsharded collection.
- */
-void ShardServerCatalogCacheLoader::_forcePrimaryRefreshAndWaitForReplication(
-    OperationContext* opCtx, const NamespaceString& nss) {
-    // Start listening for metadata updates before obtaining the primary's version, in case we
-    // replicate an epoch change past the primary's version before reading locally.
-    boost::optional<NamespaceMetadataChangeNotifications::ScopedNotification> notif(
-        _namespaceNotifications.createNotification(nss));
-
-    auto primaryVersion = forcePrimaryToRefresh(opCtx, nss);
-
-    bool waitedForUpdate = false;
-    while (true) {
-        auto secondaryVersion = getLocalVersion(opCtx, nss);
-
-        if (secondaryVersion.hasEqualEpoch(primaryVersion) && secondaryVersion >= primaryVersion) {
-            return;
-        }
-
-        if (waitedForUpdate) {
-            // If we still aren't in the primary's epoch, throw.
-            uassert(ErrorCodes::ConflictingOperationInProgress,
-                    "The collection has recently been dropped and recreated",
-                    secondaryVersion.epoch() == primaryVersion.epoch());
-        }
-
-        // Wait for a chunk metadata update (either ChunkVersion increment or epoch change).
-        notif->get(opCtx);
-        notif.emplace(_namespaceNotifications.createNotification(nss));
-        waitedForUpdate = true;
-    }
-}
-
 void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
     OperationContext* opCtx,
     const NamespaceString& nss,
@@ -694,11 +665,7 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
 
     // If task completed successfully, remove it from work queue
     if (taskFinished) {
-        _taskLists[nss].removeActiveTask();
-    }
-
-    if (_testing) {
-        notifyOfCollectionVersionUpdate(nss);
+        _taskLists[nss].pop_front();
     }
 
     // Schedule more work if there is any
@@ -720,7 +687,7 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
                                                              const NamespaceString& nss) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
 
-    const Task& task = _taskLists[nss].getActiveTask();
+    const Task& task = _taskLists[nss].front();
     invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
 
     // If this task is from an old term and no longer valid, do not execute and return true so that
@@ -830,7 +797,9 @@ ShardServerCatalogCacheLoader::Task::Task(
     StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
     ChunkVersion minimumQueryVersion,
     long long currentTerm)
-    : minQueryVersion(minimumQueryVersion), termCreated(currentTerm) {
+    : taskNum(taskIdGenerator.fetchAndAdd(1)),
+      minQueryVersion(minimumQueryVersion),
+      termCreated(currentTerm) {
     if (statusWithCollectionAndChangedChunks.isOK()) {
         collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
         invariant(!collectionAndChangedChunks->changedChunks.empty());
@@ -842,6 +811,9 @@ ShardServerCatalogCacheLoader::Task::Task(
     }
 }
 
+ShardServerCatalogCacheLoader::TaskList::TaskList()
+    : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
+
 void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
     if (_tasks.empty()) {
         _tasks.emplace_back(std::move(task));
@@ -870,15 +842,18 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
     }
 }
 
-const ShardServerCatalogCacheLoader::Task& ShardServerCatalogCacheLoader::TaskList::getActiveTask()
-    const {
+void ShardServerCatalogCacheLoader::TaskList::pop_front() {
     invariant(!_tasks.empty());
-    return _tasks.front();
+    _tasks.pop_front();
+    _activeTaskCompletedCondVar->notify_all();
 }
 
-void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() {
-    invariant(!_tasks.empty());
-    _tasks.pop_front();
+void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
+    stdx::unique_lock<stdx::mutex>& lg) {
+    // Increase the use_count of the condition variable shared pointer, because the entire task list
+    // might get deleted during the unlocked interval
+    auto condVar = _activeTaskCompletedCondVar;
+    condVar->wait(lg);
 }
 
 bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 257791be7d0..5212dab12fa 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -31,6 +31,7 @@
 #include "mongo/db/operation_context_group.h"
 #include "mongo/db/s/namespace_metadata_change_notifications.h"
 #include "mongo/s/catalog_cache_loader.h"
+#include "mongo/stdx/condition_variable.h"
 #include "mongo/util/concurrency/thread_pool.h"
 
 namespace mongo {
@@ -51,15 +52,6 @@ public:
     ~ShardServerCatalogCacheLoader();
 
     /**
-     * For testing use only.
-     *
-     * Currently this only sets a boolean such that after metadata updates the notification system
-     * is signaled internally, rather than depending on the OpObservers which are not connectted for
-     * unit testing.
-     */
-    void setForTesting();
-
-    /**
      * Initializes internal state so that the loader behaves as a primary or secondary. This can
      * only be called once, when the sharding state is initialized.
      */
@@ -82,23 +74,6 @@ public:
     void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
 
     /**
-     * Waits for the persisted collection version to be gte to 'version', or an epoch change. Only
-     * call this function if you KNOW that a version gte WILL eventually be persisted.
-     *
-     * This function cannot wait for a version if nothing is persisted because a collection can
-     * become unsharded after we start waiting and 'version' will then never be reached. If 'nss'
-     * has no persisted metadata, even if it will shortly, a NamespaceNotFound error will be
-     * returned.
-     *
-     * A lock must not be held when calling this because it would prevent using the latest snapshot
-     * and actually seeing the change after it arrives.
-     * This function can throw a DBException if the opCtx is interrupted.
-     */
-    Status waitForCollectionVersion(OperationContext* opCtx,
-                                    const NamespaceString& nss,
-                                    const ChunkVersion& version) override;
-
-    /**
      * This must be called serially, never in parallel, including waiting for the returned
      * Notification to be signalled.
      *
@@ -114,6 +89,8 @@ public:
         stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
         override;
 
+    void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
+
 private:
     // Differentiates the server's role in the replica set so that the chunk loader knows whether to
     // load metadata locally or remotely.
@@ -145,6 +122,9 @@ private:
               ChunkVersion minimumQueryVersion,
               long long currentTerm);
 
+        // Always-incrementing task number to uniquely identify different tasks
+        uint64_t taskNum;
+
         // Chunks and Collection updates to be applied to the shard persisted metadata store.
        boost::optional<CollectionAndChangedChunks> collectionAndChangedChunks{boost::none};
 
@@ -177,6 +157,8 @@ private:
     */
    class TaskList {
    public:
+        TaskList();
+
        /**
         * Adds 'task' to the back of the 'tasks' list.
         *
@@ -186,24 +168,43 @@ private:
         */
        void addTask(Task task);
 
-        /**
-         * Returns the front of the 'tasks' list. Invariants if 'tasks' is empty.
-         */
-        const Task& getActiveTask() const;
+        auto& front() {
+            invariant(!_tasks.empty());
+            return _tasks.front();
+        }
 
-        /**
-         * Erases the current active task and updates 'activeTask' to the next task in 'tasks'.
-         */
-        void removeActiveTask();
+        auto& back() {
+            invariant(!_tasks.empty());
+            return _tasks.back();
+        }
 
-        /**
-         * Checks whether there are any tasks left.
-         */
-        const bool empty() {
+        auto begin() {
+            invariant(!_tasks.empty());
+            return _tasks.begin();
+        }
+
+        auto end() {
+            invariant(!_tasks.empty());
+            return _tasks.end();
+        }
+
+        void pop_front();
+
+        bool empty() const {
            return _tasks.empty();
        }
 
        /**
+         * Must only be called if there is an active task. Behaves like a condition variable and
+         * will be signaled when the active task has been completed.
+         *
+         * NOTE: Because this call unlocks and locks the provided mutex, it is not safe to use the
+         * same task object on which it was called because it might have been deleted during the
+         * unlocked period.
+         */
+        void waitForActiveTaskCompletion(stdx::unique_lock<stdx::mutex>& lg);
+
+        /**
         * Checks whether 'term' matches the term of the latest task in the task list. This is
         * useful to check whether the task list has outdated data that's no longer valid to use in
         * the current/new term specified by 'term'.
@@ -223,6 +224,10 @@ private:
 
    private:
        std::list<Task> _tasks{};
+
+        // Condition variable which will be signaled whenever the active task from the tasks list is
+        // completed. Must be used in conjunction with the loader's mutex.
+        std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar;
    };
 
    typedef std::map<NamespaceString, TaskList> TaskLists;
@@ -240,17 +245,6 @@ private:
        stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn);
 
    /**
-     * Forces the primary to refresh its chunk metadata for 'nss' and obtain's the primary's
-     * collectionVersion after the refresh.
-     *
-     * Then waits until it has replicated chunk metadata up to at least that collectionVersion.
-     *
-     * Throws on error.
-     */
-    void _forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx,
-                                                   const NamespaceString& nss);
-
-    /**
     * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
     * of the shard's persisted metadata store with the latest updates retrieved from the config
     * server.
@@ -362,9 +356,6 @@ private:
 
    // The collection of operation contexts in use by all threads.
    OperationContextGroup _contexts;
-
-    // Gates additional actions needed when testing.
-    bool _testing{false};
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
index 1680f999a01..b681f0e30f5 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
@@ -104,7 +104,6 @@ void ShardServerCatalogCacheLoaderTest::setUp() {
 
     // Set the shard loader to primary mode, and set it for testing.
     _shardLoader->initializeReplicaSetRole(true);
-    _shardLoader->setForTesting();
 }
 
 void ShardServerCatalogCacheLoaderTest::tearDown() {
@@ -407,8 +406,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffReque
     notification->get();
 
     // Wait for persistence of update
-    ASSERT_OK(_shardLoader->waitForCollectionVersion(
-        operationContext(), kNss, updatedChunksDiff.back().getVersion()));
+    _shardLoader->waitForCollectionFlush(operationContext(), kNss);
 
     // Set up the remote loader to return a single document we've already seen, indicating no change
     // occurred.
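
One lifetime subtlety in the TaskList changes above: waitForActiveTaskCompletion releases the loader's mutex while blocked, and during that window the TaskList itself can be erased from _taskLists (for example, once its last task completes). The condition variable is therefore held through a shared_ptr, and the waiter copies that pointer before sleeping so the notification target outlives the container entry. Below is a standalone sketch of the pattern; this is not MongoDB code, and `TaskQueue`, `waitForDrain`, and the integer tasks are made-up illustrations.

```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

std::mutex gMutex;  // Plays the role of the loader's single _mutex

struct TaskQueue {
    std::list<int> tasks;
    // Held by shared_ptr so a waiter can keep the condvar alive even after the
    // queue object itself has been erased from the map.
    std::shared_ptr<std::condition_variable> activeTaskDone =
        std::make_shared<std::condition_variable>();

    void popFront() {  // Caller holds gMutex
        tasks.pop_front();
        activeTaskDone->notify_all();
    }

    void waitForActiveTaskCompletion(std::unique_lock<std::mutex>& lock) {
        auto cv = activeTaskDone;  // Bump the use_count before sleeping
        cv->wait(lock);
    }
};

std::map<std::string, TaskQueue> gQueues;

// Equivalent of waitForCollectionFlush's drain loop: block until no tasks
// remain queued for 'ns'.
void waitForDrain(const std::string& ns) {
    std::unique_lock<std::mutex> lock(gMutex);
    while (true) {
        auto it = gQueues.find(ns);
        if (it == gQueues.end() || it->second.tasks.empty())
            return;  // Nothing pending: every earlier task has completed
        // The iterator and queue reference are stale after this call because
        // it unlocks gMutex while waiting, so simply loop around and re-find.
        it->second.waitForActiveTaskCompletion(lock);
    }
}

int main() {
    {
        std::lock_guard<std::mutex> lg(gMutex);
        gQueues["test.coll"].tasks = {1, 2, 3};
    }

    std::thread worker([] {
        for (int i = 0; i < 3; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            std::lock_guard<std::mutex> lg(gMutex);
            auto& q = gQueues["test.coll"];
            q.popFront();
            if (q.tasks.empty())
                gQueues.erase("test.coll");  // May happen while a waiter sleeps
        }
    });

    waitForDrain("test.coll");
    std::cout << "all tasks flushed\n";
    worker.join();
}
```

Note how the waiter re-runs the map lookup after every wakeup instead of caching the iterator or queue reference, mirroring the "not safe to use taskList after this call" comment in the commit.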