author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2017-08-01 18:55:52 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2017-08-10 12:49:33 -0400
commit     7ce57a44876cfedcedaf2dd9896817a2b021df66 (patch)
tree       04f8e498f16db52da80a74435049a0f0995de970 /src/mongo/db/s
parent     a14097396cba4366d159dde303a6d4af130e781f (diff)
SERVER-30147 Add ability to wait on collection metadata flush
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/catalog_cache_loader_mock.cpp               |   5
-rw-r--r--  src/mongo/db/s/catalog_cache_loader_mock.h                 |   4
-rw-r--r--  src/mongo/db/s/force_routing_table_refresh_command.cpp     |  48
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp                      |  21
-rw-r--r--  src/mongo/db/s/read_only_catalog_cache_loader.cpp          |  25
-rw-r--r--  src/mongo/db/s/read_only_catalog_cache_loader.h            |  16
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp       | 181
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.h         |  95
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp  |   4
9 files changed, 153 insertions(+), 246 deletions(-)
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index d5dbe15c50e..4dc0f775fab 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -85,9 +85,8 @@ void CatalogCacheLoaderMock::notifyOfCollectionVersionUpdate(const NamespaceStri
MONGO_UNREACHABLE;
}
-Status CatalogCacheLoaderMock::waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) {
+void CatalogCacheLoaderMock::waitForCollectionFlush(OperationContext* opCtx,
+ const NamespaceString& nss) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
index 0fb16efde8f..18da4671f2b 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.h
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -51,9 +51,7 @@ public:
void onStepDown() override;
void onStepUp() override;
void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
- Status waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) override;
+ void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
diff --git a/src/mongo/db/s/force_routing_table_refresh_command.cpp b/src/mongo/db/s/force_routing_table_refresh_command.cpp
index 6d144041702..b58cdeb8e0c 100644
--- a/src/mongo/db/s/force_routing_table_refresh_command.cpp
+++ b/src/mongo/db/s/force_routing_table_refresh_command.cpp
@@ -37,37 +37,25 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/s/collection_metadata.h"
-#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/wire_version.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_version.h"
-#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
-#include "mongo/util/stringutils.h"
namespace mongo {
-
namespace {
-/**
- * Takes a single argument, a namespace string, and forces this node to refresh its routing table
- * cache entry for that namespace.
- */
class ForceRoutingTableRefresh : public BasicCommand {
public:
ForceRoutingTableRefresh() : BasicCommand("forceRoutingTableRefresh") {}
void help(std::stringstream& help) const override {
- help << "internal command to force a node to refresh its routing table entry for a "
- "namespace";
+ help << "Internal command which forces a sharded node to refresh its metadata from the "
+ "config server and persist it locally only. Behaves like any other write command "
+ "in that it returns the cluster time of the last metadata write so it can be "
+ "waited on.";
}
bool adminOnly() const override {
@@ -78,7 +66,7 @@ public:
return true;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -107,27 +95,25 @@ public:
bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- auto shardingState = ShardingState::get(opCtx);
+ BSONObjBuilder& result) override {
+ auto const shardingState = ShardingState::get(opCtx);
uassertStatusOK(shardingState->canAcceptShardedCommands());
uassert(ErrorCodes::IllegalOperation,
- "can't issue forceRoutingTableRefresh from 'eval'",
+ "Can't issue forceRoutingTableRefresh from 'eval'",
!opCtx->getClient()->isInDirectClient());
- NamespaceString nss(parseNs(dbname, cmdObj));
- log() << "forcing routing table refresh for " << nss;
+ const NamespaceString nss(parseNs(dbname, cmdObj));
+
+ LOG(1) << "Forcing routing table refresh for " << nss;
ChunkVersion unusedShardVersion;
- uassertStatusOK(
- ShardingState::get(opCtx)->refreshMetadataNow(opCtx, nss, &unusedShardVersion));
+ uassertStatusOK(shardingState->refreshMetadataNow(opCtx, nss, &unusedShardVersion));
+
+ CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, nss);
- auto routingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- const auto collectionVersion =
- routingInfo.cm() ? routingInfo.cm()->getVersion() : ChunkVersion::UNSHARDED();
- collectionVersion.appendWithFieldForCommands(&result, "collectionVersion");
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
return true;
}
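The command now behaves like a write: after forcing the refresh and waiting for the local flush, it advances the client's lastOp, so the operationTime in the reply reflects the last metadata write. For illustration, a sketch of the caller side of this contract, mirroring the forcePrimaryRefreshAndWaitForReplication code added later in this diff (selfShard is assumed to be a Shard handle for this node's own shard):

void refreshPrimaryAndWaitLocally(OperationContext* opCtx, const NamespaceString& nss) {
    // Ask the primary to refresh and persist its routing table entry for 'nss'.
    auto cmdResponse = uassertStatusOK(selfShard->runCommandWithFixedRetryAttempts(
        opCtx,
        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
        "admin",
        BSON("forceRoutingTableRefresh" << nss.ns()),
        Seconds{30},
        Shard::RetryPolicy::kIdempotent));
    uassertStatusOK(cmdResponse.commandStatus);

    // The reply carries the cluster time of the last metadata write; wait until
    // this node has replicated up to that optime before reading locally.
    uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead(
        opCtx, {LogicalTime::fromOperationTime(cmdResponse.response), boost::none}));
}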
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 7dce14247af..1cf01f2702b 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -242,25 +242,10 @@ private:
auto nss = moveChunkRequest.getNss();
const auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey());
- // Wait for the metadata update to be persisted before scheduling the range deletion.
- //
- // This is necessary to prevent a race on the secondary because both metadata persistence
- // and range deletion is done asynchronously and we must prevent the data deletion from
- // being propagated before the metadata update.
- ChunkVersion collectionVersion = [&]() {
- AutoGetCollection autoColl(opCtx, nss, MODE_IS);
- auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
- uassert(ErrorCodes::NamespaceNotSharded,
- str::stream() << "Chunk move failed because collection '" << nss.ns()
- << "' is no longer sharded.",
- metadata);
- return metadata->getCollVersion();
- }();
-
- // Now schedule the range deletion clean up.
+ // Wait for the metadata update to be persisted in order to prevent orphaned documents
+ // from being deleted before the metadata changes have propagated to the secondaries.
auto notification = [&] {
- uassertStatusOK(CatalogCacheLoader::get(opCtx).waitForCollectionVersion(
- opCtx, nss, collectionVersion));
+ CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, nss);
auto const whenToClean = moveChunkRequest.getWaitForDelete()
? CollectionShardingState::kNow
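The hunk above truncates the lambda; a condensed, partly hypothetical sketch of the resulting ordering (cleanUpRange and kDelayed are assumed names for the cleanup API and the non-waitForDelete case, which this hunk does not show):

auto notification = [&] {
    // Block until the updated metadata is durable locally, so that secondaries
    // observe the ownership change before any range deletions replicate.
    CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, nss);

    auto const whenToClean = moveChunkRequest.getWaitForDelete()
        ? CollectionShardingState::kNow
        : CollectionShardingState::kDelayed;  // assumed counterpart to kNow

    // Only now hand the vacated range to the cleanup machinery (assumed API).
    return CollectionShardingState::get(opCtx, nss)->cleanUpRange(range, whenToClean);
}();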
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index 46cd792ddb7..1b8175cba41 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -30,32 +30,13 @@
#include "mongo/db/s/read_only_catalog_cache_loader.h"
-#include "mongo/db/operation_context.h"
-
namespace mongo {
using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
-void ReadOnlyCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
- return;
-}
-
-void ReadOnlyCatalogCacheLoader::onStepDown() {
- return;
-}
-
-void ReadOnlyCatalogCacheLoader::onStepUp() {
- return;
-}
-
-void ReadOnlyCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
- return;
-}
-
-Status ReadOnlyCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) {
- return Status::OK();
+void ReadOnlyCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ MONGO_UNREACHABLE;
}
std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince(
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h
index 6485538c127..eca54264caf 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.h
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.h
@@ -28,7 +28,6 @@
#pragma once
-#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
namespace mongo {
@@ -40,16 +39,11 @@ namespace mongo {
*/
class ReadOnlyCatalogCacheLoader final : public CatalogCacheLoader {
public:
- /**
- * These functions do nothing and simply return.
- */
- void initializeReplicaSetRole(bool isPrimary) override;
- void onStepDown() override;
- void onStepUp() override;
- void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
- Status waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) override;
+ void initializeReplicaSetRole(bool isPrimary) override {}
+ void onStepDown() override {}
+ void onStepUp() override {}
+ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override {}
+ void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 83596a1da3c..e935418432a 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/operation_context_group.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_shard_collection.h"
@@ -51,6 +52,8 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
namespace {
+AtomicUInt64 taskIdGenerator{0};
+
/**
* Constructs the options for the loader thread pool.
*/
@@ -232,13 +235,11 @@ StatusWith<CollectionAndChangedChunks> getIncompletePersistedMetadataSinceVersio
}
/**
- * Sends forceRoutingTableRefresh to the primary, to force the primary to refresh its routing table
- * entry for 'nss' and to obtain the primary's collectionVersion for 'nss' after the refresh.
- *
- * Returns the primary's returned collectionVersion for 'nss', or throws on error.
+ * Sends forceRoutingTableRefresh to the primary to force it to refresh its routing table for
+ * collection 'nss' and then waits for the refresh to replicate to this node.
*/
-ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceString& nss) {
- auto shardingState = ShardingState::get(opCtx);
+void forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx, const NamespaceString& nss) {
+ auto const shardingState = ShardingState::get(opCtx);
invariant(shardingState->enabled());
auto selfShard = uassertStatusOK(
@@ -251,10 +252,11 @@ ChunkVersion forcePrimaryToRefresh(OperationContext* opCtx, const NamespaceStrin
BSON("forceRoutingTableRefresh" << nss.ns()),
Seconds{30},
Shard::RetryPolicy::kIdempotent));
+
uassertStatusOK(cmdResponse.commandStatus);
- return uassertStatusOK(
- ChunkVersion::parseFromBSONWithFieldForCommands(cmdResponse.response, "collectionVersion"));
+ uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead(
+ opCtx, {LogicalTime::fromOperationTime(cmdResponse.response), boost::none}));
}
/**
@@ -285,36 +287,10 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
invariant(_contexts.isEmpty());
}
-void ShardServerCatalogCacheLoader::setForTesting() {
- _testing = true;
-}
-
void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
_namespaceNotifications.notifyChange(nss);
}
-Status ShardServerCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) {
- invariant(!opCtx->lockState()->isLocked());
- while (true) {
- auto scopedNotification = _namespaceNotifications.createNotification(nss);
-
- auto swRefreshState = getPersistedRefreshFlags(opCtx, nss);
- if (!swRefreshState.isOK()) {
- return swRefreshState.getStatus();
- }
- RefreshState refreshState = swRefreshState.getValue();
-
- if (refreshState.lastRefreshedCollectionVersion.epoch() != version.epoch() ||
- refreshState.lastRefreshedCollectionVersion >= version) {
- return Status::OK();
- }
-
- scopedNotification.get(opCtx);
- }
-}
-
void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(_role == ReplicaSetRole::None);
@@ -379,12 +355,62 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
return notify;
}
+void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ stdx::unique_lock<stdx::mutex> lg(_mutex);
+ const auto initialTerm = _term;
+
+ boost::optional<uint64_t> taskNumToWait;
+
+ while (true) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Unable to wait for collection metadata flush for " << nss.ns()
+ << " because the node's replication role changed.",
+ _role == ReplicaSetRole::Primary && _term == initialTerm);
+
+ auto it = _taskLists.find(nss);
+
+ // If there are no tasks for the specified namespace, everything must have been completed
+ if (it == _taskLists.end())
+ return;
+
+ auto& taskList = it->second;
+
+ if (!taskNumToWait) {
+ const auto& lastTask = taskList.back();
+ taskNumToWait = lastTask.taskNum;
+ } else {
+ const auto& activeTask = taskList.front();
+
+ if (activeTask.taskNum > *taskNumToWait) {
+ auto secondTaskIt = std::next(taskList.begin());
+
+ // Because of an optimization where a namespace drop clears all tasks except the
+ // active one, it is possible that the task number we are waiting on will never
+ // actually run. In that case, move the wait target to the drop task, which can
+ // only be the active task or the one immediately after it.
+ if (activeTask.dropped) {
+ taskNumToWait = activeTask.taskNum;
+ } else if (secondTaskIt != taskList.end() && secondTaskIt->dropped) {
+ taskNumToWait = secondTaskIt->taskNum;
+ } else {
+ return;
+ }
+ }
+ }
+
+ // It is not safe to use taskList after this call, because it will unlock and lock the tasks
+ // mutex, so we just loop around.
+ taskList.waitForActiveTaskCompletion(lg);
+ }
+}
+
void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
- _forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
+ forcePrimaryRefreshAndWaitForReplication(opCtx, nss);
// Read the local metadata.
auto swCollAndChunks =
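A self-contained model of the waitForCollectionFlush loop added above, with hypothetical types and with the term check and the namespace-drop optimization omitted: capture the number of the last queued task, then block until the queue has drained past it.

#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>

struct SimpleTaskQueue {
    std::mutex mutex;
    std::condition_variable cv;
    std::list<std::uint64_t> taskNums;  // strictly increasing, like Task::taskNum

    void push(std::uint64_t num) {
        std::lock_guard<std::mutex> lk(mutex);
        taskNums.push_back(num);
    }

    void popFront() {  // analogous to TaskList::pop_front() in this commit
        std::lock_guard<std::mutex> lk(mutex);
        taskNums.pop_front();
        cv.notify_all();  // wake waiters, like _activeTaskCompletedCondVar
    }

    void waitUntilDrainedThrough(std::uint64_t taskNumToWait) {
        std::unique_lock<std::mutex> lk(mutex);
        // Done once the queue is empty or the front task is newer than the target.
        cv.wait(lk, [&] { return taskNums.empty() || taskNums.front() > taskNumToWait; });
    }
};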
@@ -392,61 +418,6 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
callbackFn(opCtx, std::move(swCollAndChunks));
}
-/**
- * "Waiting for replication" by waiting to see a local version equal or greater to the primary's
- * collectionVersion is not so straightforward. A few key insights:
- *
- * 1) ChunkVersions are ordered, so within an epoch, we can wait for a particular ChunkVersion.
- *
- * 2) Epochs are not ordered. If we are waiting for epochB and see epochA locally, we can't know if
- * the update for epochB already replicated or has yet to replicate.
- *
- * To deal with this, on seeing epochA, we wait for one update. If we are now in epochB (e.g., if
- * epochA was UNSHARDED) we continue waiting for updates until our version meets or exceeds the
- * primary's. Otherwise, we throw an error. A caller can retry, which will cause us to ask the
- * primary for a new collectionVersion to wait for. If we were behind, we continue waiting; if we
- * were ahead, we now have a new target.
- *
- * This only occurs if collections are being created, sharded, and dropped quickly.
- *
- * 3) Unsharded collections do not have epochs at all. A unique identifier for all collections,
- * including unsharded, will be introduced in 3.6. Until then, we cannot differentiate between
- * different incarnations of unsharded collections of the same name.
- *
- * We do not deal with this at all. We report that we are "up to date" even if we are at an
- * earlier incarnation of the unsharded collection.
- */
-void ShardServerCatalogCacheLoader::_forcePrimaryRefreshAndWaitForReplication(
- OperationContext* opCtx, const NamespaceString& nss) {
- // Start listening for metadata updates before obtaining the primary's version, in case we
- // replicate an epoch change past the primary's version before reading locally.
- boost::optional<NamespaceMetadataChangeNotifications::ScopedNotification> notif(
- _namespaceNotifications.createNotification(nss));
-
- auto primaryVersion = forcePrimaryToRefresh(opCtx, nss);
-
- bool waitedForUpdate = false;
- while (true) {
- auto secondaryVersion = getLocalVersion(opCtx, nss);
-
- if (secondaryVersion.hasEqualEpoch(primaryVersion) && secondaryVersion >= primaryVersion) {
- return;
- }
-
- if (waitedForUpdate) {
- // If we still aren't in the primary's epoch, throw.
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "The collection has recently been dropped and recreated",
- secondaryVersion.epoch() == primaryVersion.epoch());
- }
-
- // Wait for a chunk metadata update (either ChunkVersion increment or epoch change).
- notif->get(opCtx);
- notif.emplace(_namespaceNotifications.createNotification(nss));
- waitedForUpdate = true;
- }
-}
-
void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -694,11 +665,7 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
// If task completed successfully, remove it from work queue
if (taskFinished) {
- _taskLists[nss].removeActiveTask();
- }
-
- if (_testing) {
- notifyOfCollectionVersionUpdate(nss);
+ _taskLists[nss].pop_front();
}
// Schedule more work if there is any
@@ -720,7 +687,7 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
const NamespaceString& nss) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- const Task& task = _taskLists[nss].getActiveTask();
+ const Task& task = _taskLists[nss].front();
invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
// If this task is from an old term and no longer valid, do not execute and return true so that
@@ -830,7 +797,9 @@ ShardServerCatalogCacheLoader::Task::Task(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
ChunkVersion minimumQueryVersion,
long long currentTerm)
- : minQueryVersion(minimumQueryVersion), termCreated(currentTerm) {
+ : taskNum(taskIdGenerator.fetchAndAdd(1)),
+ minQueryVersion(minimumQueryVersion),
+ termCreated(currentTerm) {
if (statusWithCollectionAndChangedChunks.isOK()) {
collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
invariant(!collectionAndChangedChunks->changedChunks.empty());
@@ -842,6 +811,9 @@ ShardServerCatalogCacheLoader::Task::Task(
}
}
+ShardServerCatalogCacheLoader::TaskList::TaskList()
+ : _activeTaskCompletedCondVar(std::make_shared<stdx::condition_variable>()) {}
+
void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
if (_tasks.empty()) {
_tasks.emplace_back(std::move(task));
@@ -870,15 +842,18 @@ void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
}
}
-const ShardServerCatalogCacheLoader::Task& ShardServerCatalogCacheLoader::TaskList::getActiveTask()
- const {
+void ShardServerCatalogCacheLoader::TaskList::pop_front() {
invariant(!_tasks.empty());
- return _tasks.front();
+ _tasks.pop_front();
+ _activeTaskCompletedCondVar->notify_all();
}
-void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() {
- invariant(!_tasks.empty());
- _tasks.pop_front();
+void ShardServerCatalogCacheLoader::TaskList::waitForActiveTaskCompletion(
+ stdx::unique_lock<stdx::mutex>& lg) {
+ // Increase the use_count of the condition variable shared pointer, because the entire task list
+ // might get deleted during the unlocked interval
+ auto condVar = _activeTaskCompletedCondVar;
+ condVar->wait(lg);
}
bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
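The shared_ptr copy in waitForActiveTaskCompletion above is the load-bearing detail: wait() releases the loader's mutex, and during that window the whole TaskList may be erased from the _taskLists map, destroying its members. A stripped-down, hypothetical illustration of the same lifetime pattern:

#include <condition_variable>
#include <memory>
#include <mutex>

struct Waitable {
    std::shared_ptr<std::condition_variable> cv =
        std::make_shared<std::condition_variable>();

    void waitOn(std::unique_lock<std::mutex>& lock) {
        auto keepAlive = cv;    // bump use_count past this object's lifetime
        keepAlive->wait(lock);  // safe even if *this is destroyed while unlocked
    }
};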
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 257791be7d0..5212dab12fa 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -31,6 +31,7 @@
#include "mongo/db/operation_context_group.h"
#include "mongo/db/s/namespace_metadata_change_notifications.h"
#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
@@ -51,15 +52,6 @@ public:
~ShardServerCatalogCacheLoader();
/**
- * For testing use only.
- *
- * Currently this only sets a boolean such that after metadata updates the notification system
- * is signaled internally, rather than depending on the OpObservers, which are not connected for
- * unit testing.
- */
- void setForTesting();
-
- /**
* Initializes internal state so that the loader behaves as a primary or secondary. This can
* only be called once, when the sharding state is initialized.
*/
@@ -82,23 +74,6 @@ public:
void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
/**
- * Waits for the persisted collection version to be gte to 'version', or an epoch change. Only
- * call this function if you KNOW that a version gte WILL eventually be persisted.
- *
- * This function cannot wait for a version if nothing is persisted because a collection can
- * become unsharded after we start waiting and 'version' will then never be reached. If 'nss'
- * has no persisted metadata, even if it will shortly, a NamespaceNotFound error will be
- * returned.
- *
- * A lock must not be held when calling this because it would prevent using the latest snapshot
- * and actually seeing the change after it arrives.
- * This function can throw a DBException if the opCtx is interrupted.
- */
- Status waitForCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) override;
-
- /**
* This must be called serially, never in parallel, including waiting for the returned
* Notification to be signalled.
*
@@ -114,6 +89,8 @@ public:
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
override;
+ void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
+
private:
// Differentiates the server's role in the replica set so that the chunk loader knows whether to
// load metadata locally or remotely.
@@ -145,6 +122,9 @@ private:
ChunkVersion minimumQueryVersion,
long long currentTerm);
+ // Always-incrementing task number to uniquely identify different tasks
+ uint64_t taskNum;
+
// Chunks and Collection updates to be applied to the shard persisted metadata store.
boost::optional<CollectionAndChangedChunks> collectionAndChangedChunks{boost::none};
@@ -177,6 +157,8 @@ private:
*/
class TaskList {
public:
+ TaskList();
+
/**
* Adds 'task' to the back of the 'tasks' list.
*
@@ -186,24 +168,43 @@ private:
*/
void addTask(Task task);
- /**
- * Returns the front of the 'tasks' list. Invariants if 'tasks' is empty.
- */
- const Task& getActiveTask() const;
+ auto& front() {
+ invariant(!_tasks.empty());
+ return _tasks.front();
+ }
- /**
- * Erases the current active task and updates 'activeTask' to the next task in 'tasks'.
- */
- void removeActiveTask();
+ auto& back() {
+ invariant(!_tasks.empty());
+ return _tasks.back();
+ }
- /**
- * Checks whether there are any tasks left.
- */
- const bool empty() {
+ auto begin() {
+ invariant(!_tasks.empty());
+ return _tasks.begin();
+ }
+
+ auto end() {
+ invariant(!_tasks.empty());
+ return _tasks.end();
+ }
+
+ void pop_front();
+
+ bool empty() const {
return _tasks.empty();
}
/**
+ * Must only be called if there is an active task. Behaves like a condition variable and
+ * will be signaled when the active task has been completed.
+ *
+ * NOTE: Because this call unlocks and locks the provided mutex, it is not safe to use the
+ * same task list object on which it was called, because it might have been deleted during
+ * the unlocked period.
+ */
+ void waitForActiveTaskCompletion(stdx::unique_lock<stdx::mutex>& lg);
+
+ /**
* Checks whether 'term' matches the term of the latest task in the task list. This is
* useful to check whether the task list has outdated data that's no longer valid to use in
* the current/new term specified by 'term'.
@@ -223,6 +224,10 @@ private:
private:
std::list<Task> _tasks{};
+
+ // Condition variable which will be signaled whenever the active task from the tasks list is
+ // completed. Must be used in conjunction with the loader's mutex.
+ std::shared_ptr<stdx::condition_variable> _activeTaskCompletedCondVar;
};
typedef std::map<NamespaceString, TaskList> TaskLists;
@@ -240,17 +245,6 @@ private:
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn);
/**
- * Forces the primary to refresh its chunk metadata for 'nss' and obtain's the primary's
- * collectionVersion after the refresh.
- *
- * Then waits until it has replicated chunk metadata up to at least that collectionVersion.
- *
- * Throws on error.
- */
- void _forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx,
- const NamespaceString& nss);
-
- /**
* Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
* of the shard's persisted metadata store with the latest updates retrieved from the config
* server.
@@ -362,9 +356,6 @@ private:
// The collection of operation contexts in use by all threads.
OperationContextGroup _contexts;
-
- // Gates additional actions needed when testing.
- bool _testing{false};
};
} // namespace mongo
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
index 1680f999a01..b681f0e30f5 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
@@ -104,7 +104,6 @@ void ShardServerCatalogCacheLoaderTest::setUp() {
// Set the shard loader to primary mode, and set it for testing.
_shardLoader->initializeReplicaSetRole(true);
- _shardLoader->setForTesting();
}
void ShardServerCatalogCacheLoaderTest::tearDown() {
@@ -407,8 +406,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffReque
notification->get();
// Wait for persistence of update
- ASSERT_OK(_shardLoader->waitForCollectionVersion(
- operationContext(), kNss, updatedChunksDiff.back().getVersion()));
+ _shardLoader->waitForCollectionFlush(operationContext(), kNss);
// Set up the remote loader to return a single document we've already seen, indicating no change
// occurred.