commit f7c5e33fc4634658877bafdb607c3865787acc7c
Author:    Dianna Hohensee <dianna.hohensee@10gen.com>  2017-05-17 09:21:49 -0400
Committer: Dianna Hohensee <dianna.hohensee@10gen.com>  2017-05-19 14:20:14 -0400
Tree:      6b9d072bcd402ba0f7d0fb143cf45d20ee146ece
Parent:    9ba2ced42110d439cf2644eaf57fac057dd1f337
SERVER-28724 add onStepDown and onStepUp functionality to ShardServerCatalogCacheLoader
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/db.cpp                                                |   1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp |   8
-rw-r--r--  src/mongo/db/s/SConscript                                          |   5
-rw-r--r--  src/mongo/db/s/collection_sharding_state_test.cpp                  | 106
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                        |   2
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp               | 112
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.h                 |  68
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                                  |  43
-rw-r--r--  src/mongo/db/s/sharding_state.h                                    |   8
-rw-r--r--  src/mongo/db/s/sharding_state_test.cpp                             |  26
10 files changed, 267 insertions(+), 112 deletions(-)
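
The changes below give the shard server's catalog cache loader a notion of its replica set role plus a monotonically increasing term counter: initializeReplicaSetRole() is called once when sharding state is initialized, and onStepUp()/onStepDown() each bump the term, so any task scheduled under an earlier term can later be recognized as stale and skipped. The following is a minimal, self-contained sketch of that pattern with simplified names; it is not the actual MongoDB class, which is changed in shard_server_catalog_cache_loader.{h,cpp} further down.

    // Minimal sketch of the role/term tracking this commit introduces.
    // Names are simplified for illustration; see ShardServerCatalogCacheLoader below.
    #include <cassert>
    #include <mutex>

    class TermTrackingLoader {
    public:
        enum class Role { None, Secondary, Primary };

        // May only be called once, when sharding state is initialized.
        void initializeReplicaSetRole(bool isPrimary) {
            std::lock_guard<std::mutex> lk(_mutex);
            assert(_role == Role::None);
            _role = isPrimary ? Role::Primary : Role::Secondary;
        }

        void onStepUp() {
            std::lock_guard<std::mutex> lk(_mutex);
            assert(_role != Role::None);
            ++_term;  // work scheduled under the previous term becomes stale
            _role = Role::Primary;
        }

        void onStepDown() {
            std::lock_guard<std::mutex> lk(_mutex);
            assert(_role != Role::None);
            ++_term;
            _role = Role::Secondary;
        }

        // A scheduled unit of work records the term it was created in and is
        // skipped if the term has moved on by the time it runs.
        bool shouldRunTaskCreatedInTerm(long long termCreated) const {
            std::lock_guard<std::mutex> lk(_mutex);
            return termCreated == _term;
        }

    private:
        mutable std::mutex _mutex;
        long long _term{0};
        Role _role{Role::None};
    };
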
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 34786483431..a76bdac50f5 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -621,6 +621,7 @@ ExitCode _initAndListen(int listenPort) {
<< startupWarningsLog;
}
+ // This function may take the global lock.
auto shardingInitialized =
uassertStatusOK(ShardingState::get(startupOpCtx.get())
->initializeShardingAwarenessIfNeeded(startupOpCtx.get()));
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 75b7851d77a..b0c4a14e175 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -79,6 +79,7 @@
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
@@ -681,6 +682,9 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon
void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
Balancer::get(_service)->interruptBalancer();
+ } else if (ShardingState::get(_service)->enabled()) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+ Grid::get(_service)->catalogCache()->onStepDown();
}
ShardingState::get(_service)->markCollectionsNotShardedAtStepdown();
@@ -740,6 +744,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
// If this is a config server node becoming a primary, start the balancer
Balancer::get(opCtx)->initiateBalancer(opCtx);
} else if (ShardingState::get(opCtx)->enabled()) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+
const auto configsvrConnStr =
Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString();
auto status = ShardingState::get(opCtx)->updateShardIdentityConfigString(
@@ -748,6 +754,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
warning() << "error encountered while trying to update config connection string to "
<< configsvrConnStr << causedBy(status);
}
+
+ Grid::get(_service)->catalogCache()->onStepUp();
}
// There is a slight chance that some stale metadata might have been loaded before the latest
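
On shard servers, the step-down and transition-to-primary hooks above notify the loader indirectly through Grid's CatalogCache. The CatalogCache side of that plumbing is not part of this diff, so the sketch below is an assumption: hypothetical forwarding methods showing how the onStepDown()/onStepUp() calls would plausibly be relayed to the underlying CatalogCacheLoader.

    // Hypothetical sketch of CatalogCache forwarding the role-change calls to its
    // loader. Only the call sites in replication_coordinator_external_state_impl.cpp
    // appear in this diff; the forwarding methods and _cacheLoader member are assumed.
    #include <iostream>

    class CatalogCacheLoader {
    public:
        virtual ~CatalogCacheLoader() = default;
        virtual void initializeReplicaSetRole(bool isPrimary) = 0;
        virtual void onStepDown() = 0;
        virtual void onStepUp() = 0;
    };

    class CatalogCache {
    public:
        explicit CatalogCache(CatalogCacheLoader* loader) : _cacheLoader(loader) {}

        void initializeReplicaSetRole(bool isPrimary) {
            _cacheLoader->initializeReplicaSetRole(isPrimary);
        }
        void onStepDown() { _cacheLoader->onStepDown(); }
        void onStepUp() { _cacheLoader->onStepUp(); }

    private:
        CatalogCacheLoader* _cacheLoader;  // not owned in this sketch
    };

    // Trivial stub used only to exercise the forwarding.
    class StubLoader : public CatalogCacheLoader {
    public:
        void initializeReplicaSetRole(bool isPrimary) override {
            std::cout << "loader initialized, isPrimary=" << isPrimary << "\n";
        }
        void onStepDown() override { std::cout << "loader saw step-down\n"; }
        void onStepUp() override { std::cout << "loader saw step-up\n"; }
    };

    int main() {
        StubLoader loader;
        CatalogCache cache(&loader);
        cache.initializeReplicaSetRole(true);  // from ShardingState::initializeFromShardIdentity
        cache.onStepDown();                    // from shardingOnStepDownHook
        cache.onStepUp();                      // from _shardingOnTransitionToPrimaryHook
        return 0;
    }
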
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9603962e4ae..4beb6bb5da6 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -238,7 +238,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
- '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
'$BUILD_DIR/mongo/s/shard_server_test_fixture',
],
)
@@ -259,10 +258,8 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/coreshard',
- '$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
- '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
- '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
+ '$BUILD_DIR/mongo/s/shard_server_test_fixture',
],
)
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index f1cecac1979..c1b15ce8c0c 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -26,85 +26,53 @@
* then also delete it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
#include "mongo/platform/basic.h"
-#include "mongo/base/status_with.h"
-#include "mongo/db/jsobj.h"
+#include "mongo/db/s/collection_sharding_state.h"
+
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/collection_metadata.h"
-#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
-#include "mongo/db/server_options.h"
-#include "mongo/db/service_context_noop.h"
-#include "mongo/db/service_context_noop.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/clock_source_mock.h"
+#include "mongo/s/shard_server_test_fixture.h"
namespace mongo {
namespace {
-class CollShardingStateTest : public mongo::unittest::Test {
+/**
+ * Uses the ShardServerTestFixture that sets up the ShardServerCatalogCacheLoader on the
+ * CatalogCache and has a real Locker class (as opposed to LockerNoop) for locking.
+ */
+class CollShardingStateTest : public ShardServerTestFixture {
public:
void setUp() override {
- _service.setFastClockSource(stdx::make_unique<ClockSourceMock>());
- _service.setPreciseClockSource(stdx::make_unique<ClockSourceMock>());
-
- serverGlobalParams.clusterRole = ClusterRole::ShardServer;
- _client = _service.makeClient("ShardingStateTest");
- _opCtx = _client->makeOperationContext();
-
- // Set a ReplicationCoordinator, since it is accessed as part of shardVersion checks.
- // TODO(esha): remove once the Safe Secondary Reads (PM-256) project is complete.
- auto svCtx = getServiceContext();
- repl::ReplSettings replSettings;
- replSettings.setMaster(true);
- repl::ReplicationCoordinator::set(
- svCtx, stdx::make_unique<repl::ReplicationCoordinatorMock>(svCtx, replSettings));
+ ShardServerTestFixture::setUp();
// Note: this assumes that globalInit will always be called on the same thread as the main
// test thread.
- ShardingState::get(opCtx())->setGlobalInitMethodForTest(
- [this](OperationContext*, const ConnectionString&, StringData) {
- _initCallCount++;
- return Status::OK();
- });
- }
-
- void tearDown() override {}
-
- OperationContext* opCtx() {
- return _opCtx.get();
+ ShardingState::get(operationContext())
+ ->setGlobalInitMethodForTest(
+ [this](OperationContext*, const ConnectionString&, StringData) {
+ _initCallCount++;
+ return Status::OK();
+ });
}
int getInitCallCount() const {
return _initCallCount;
}
- ServiceContext* getServiceContext() {
- return &_service;
- }
-
-protected:
- ServiceContextNoop _service;
-
private:
- ServiceContext::UniqueClient _client;
- ServiceContext::UniqueOperationContext _opCtx;
-
int _initCallCount = 0;
- const HostAndPort _host{"node1:12345"};
- const std::string _setName = "mySet";
- const std::vector<HostAndPort> _servers{_host};
};
TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
- CollectionShardingState collShardingState(&_service,
+ // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on
+ // the shard identity document.
+ Lock::GlobalWrite lock(operationContext());
+
+ CollectionShardingState collShardingState(getServiceContext(),
NamespaceString::kConfigCollectionNamespace);
ShardIdentityType shardIdentity;
@@ -113,8 +81,8 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- WriteUnitOfWork wuow(opCtx());
- collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON());
+ WriteUnitOfWork wuow(operationContext());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
ASSERT_EQ(0, getInitCallCount());
@@ -124,6 +92,10 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
}
TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
+ // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on
+ // the shard identity document.
+ Lock::GlobalWrite lock(operationContext());
+
CollectionShardingState collShardingState(getServiceContext(),
NamespaceString::kConfigCollectionNamespace);
@@ -134,8 +106,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
shardIdentity.setClusterId(OID::gen());
{
- WriteUnitOfWork wuow(opCtx());
- collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON());
+ WriteUnitOfWork wuow(operationContext());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
ASSERT_EQ(0, getInitCallCount());
}
@@ -144,6 +116,10 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
}
TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) {
+ // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on
+ // the shard identity document.
+ Lock::GlobalWrite lock(operationContext());
+
CollectionShardingState collShardingState(getServiceContext(), NamespaceString("admin.user"));
ShardIdentityType shardIdentity;
@@ -152,8 +128,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- WriteUnitOfWork wuow(opCtx());
- collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON());
+ WriteUnitOfWork wuow(operationContext());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
ASSERT_EQ(0, getInitCallCount());
@@ -163,22 +139,28 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
}
TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) {
+ // Must hold a lock to call CollectionShardingState::onInsertOp.
+ Lock::GlobalWrite lock(operationContext());
+
CollectionShardingState collShardingState(getServiceContext(),
NamespaceString::kConfigCollectionNamespace);
ShardIdentityType shardIdentity;
shardIdentity.setShardName("a");
- ASSERT_THROWS(collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()),
+ ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()),
AssertionException);
}
TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNotInserted) {
+ // Must hold a lock to call CollectionShardingState::onInsertOp.
+ Lock::GlobalWrite lock(operationContext());
+
CollectionShardingState collShardingState(getServiceContext(),
NamespaceString::kConfigCollectionNamespace);
- WriteUnitOfWork wuow(opCtx());
- collShardingState.onInsertOp(opCtx(), BSON("_id" << 1));
+ WriteUnitOfWork wuow(operationContext());
+ collShardingState.onInsertOp(operationContext(), BSON("_id" << 1));
ASSERT_EQ(0, getInitCallCount());
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 6133a27903a..ce41648e687 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -92,7 +92,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
_args.getFromShardId() != _args.getToShardId());
log() << "Starting chunk migration " << redact(_args.toString())
- << " with expected collection version epoch" << _args.getVersionEpoch();
+ << " with expected collection version epoch " << _args.getVersionEpoch();
// Now that the collection is locked, snapshot the metadata and fetch the latest versions
ShardingState* const shardingState = ShardingState::get(opCtx);
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 368eb23abec..d3219b25c35 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -240,26 +240,64 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
_threadPool.join();
}
+void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_role == ReplicaSetRole::None);
+
+ if (isPrimary) {
+ _role = ReplicaSetRole::Primary;
+ } else {
+ _role = ReplicaSetRole::Secondary;
+ }
+}
+
+void ShardServerCatalogCacheLoader::onStepDown() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_role != ReplicaSetRole::None);
+ ++_term;
+ _role = ReplicaSetRole::Secondary;
+}
+
+void ShardServerCatalogCacheLoader::onStepUp() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_role != ReplicaSetRole::None);
+ ++_term;
+ _role = ReplicaSetRole::Primary;
+}
std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+ long long currentTerm;
+ bool isPrimary;
+ {
+ // Take the mutex so that we can discern whether we're primary or secondary and schedule a
+ // task with the corresponding _term value.
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_role != ReplicaSetRole::None);
+
+ currentTerm = _term;
+ isPrimary = (_role == ReplicaSetRole::Primary);
+ }
- // TODO: plug in secondary machinery, with onStepDown and onBecomePrimary tasks: clear TaskLists
- // and thread pool
+ // TODO: add and plug in secondary machinery
auto notify = std::make_shared<Notification<void>>();
- uassertStatusOK(_threadPool.schedule([ this, nss, version, callbackFn, notify ]() noexcept {
- auto opCtx = Client::getCurrent()->makeOperationContext();
- try {
- _schedulePrimayGetChunksSince(opCtx.get(), nss, version, callbackFn, notify);
- } catch (const DBException& ex) {
- callbackFn(opCtx.get(), ex.toStatus());
- notify->set();
- }
- }));
+ if (isPrimary) {
+ uassertStatusOK(_threadPool.schedule(
+ [ this, nss, version, currentTerm, callbackFn, notify ]() noexcept {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ try {
+ _schedulePrimayGetChunksSince(
+ opCtx.get(), nss, version, currentTerm, callbackFn, notify);
+ } catch (const DBException& ex) {
+ callbackFn(opCtx.get(), ex.toStatus());
+ notify->set();
+ }
+ }));
+ }
return notify;
}
@@ -268,6 +306,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
+ long long termScheduled,
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
std::shared_ptr<Notification<void>> notify) {
@@ -277,7 +316,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince(
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto taskListIt = _taskLists.find(nss);
- if (taskListIt != _taskLists.end()) {
+ if (taskListIt != _taskLists.end() &&
+ taskListIt->second.hasTasksFromThisTerm(termScheduled)) {
// Enqueued tasks have the latest metadata
return taskListIt->second.getHighestVersionEnqueued();
}
@@ -288,7 +328,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince(
}();
auto remoteRefreshCallbackFn =
- [this, nss, catalogCacheSinceVersion, maxLoaderVersion, notify, callbackFn](
+ [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled, callbackFn, notify](
OperationContext* opCtx,
StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
@@ -297,8 +337,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince(
// No updates to apply. Do nothing.
} else {
// Enqueue a Task to apply the update retrieved from the config server.
- Status scheduleStatus =
- _scheduleTask(nss, Task{swCollectionAndChangedChunks, maxLoaderVersion});
+ Status scheduleStatus = _scheduleTask(
+ nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
if (!scheduleStatus.isOK()) {
callbackFn(opCtx, StatusWith<CollectionAndChangedChunks>(scheduleStatus));
notify->set();
@@ -310,7 +350,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince(
// -- both persisted and enqueued.
swCollectionAndChangedChunks =
- _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion);
+ _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
// If no results were returned, convert the response into
// NamespaceNotFound.
if (swCollectionAndChangedChunks.isOK() &&
@@ -334,12 +374,13 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince(
StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata(
OperationContext* opCtx,
const NamespaceString& nss,
- const ChunkVersion& catalogCacheSinceVersion) {
+ const ChunkVersion& catalogCacheSinceVersion,
+ const long long term) {
// Get the enqueued metadata first. Otherwise we could miss data between reading persisted and
// enqueued, if an enqueued task finished after the persisted read but before the enqueued read.
- auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion);
+ auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion, term);
bool isEnqueued = std::move(enqueuedRes.first);
CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second);
@@ -387,18 +428,25 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
}
std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getEnqueuedMetadata(
- const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion) {
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ const long long term) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
auto taskListIt = _taskLists.find(nss);
if (taskListIt == _taskLists.end()) {
return std::make_pair(false, CollectionAndChangedChunks());
+ } else if (!taskListIt->second.hasTasksFromThisTerm(term)) {
+ // If task list does not have a term that matches, there's no valid task data to collect.
+ return std::make_pair(false, CollectionAndChangedChunks());
}
- CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadata();
+ // Only return task data of tasks scheduled in the same term as the given 'term': older term
+ // task data is no longer valid.
+ CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadataForTerm(term);
// Returns all the results if 'catalogCacheSinceVersion's epoch does not match. Otherwise, trim
- // the results to be GTE to 'catalogCacheSinceVersion'
+ // the results to be GTE to 'catalogCacheSinceVersion'.
if (collAndChunks.epoch != catalogCacheSinceVersion.epoch()) {
return std::make_pair(true, collAndChunks);
@@ -480,6 +528,12 @@ bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
const Task task = _taskLists[nss].getActiveTask();
invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
+ // If this task is from an old term and no longer valid, do not execute and return true so that
+ // the task gets removed from the task list.
+ if (task.termCreated != _term) {
+ return true;
+ }
+
lock.unlock();
// Check if this is a drop task.
@@ -543,9 +597,11 @@ bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
ShardServerCatalogCacheLoader::Task::Task(
StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
- ChunkVersion minimumQueryVersion) {
+ ChunkVersion minimumQueryVersion,
+ const long long currentTerm) {
minQueryVersion = minimumQueryVersion;
+ termCreated = currentTerm;
if (statusWithCollectionAndChangedChunks.isOK()) {
collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
@@ -595,14 +651,24 @@ void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() {
_tasks.pop_front();
}
+bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const {
+ return _tasks.back().termCreated == term;
+}
+
ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const {
invariant(!_tasks.empty());
return _tasks.back().maxQueryVersion;
}
-CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadata() const {
+CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadataForTerm(
+ const long long term) const {
CollectionAndChangedChunks collAndChunks;
for (const auto& task : _tasks) {
+ if (task.termCreated != term) {
+ // Task data is no longer valid. Go on to the next task in the list.
+ continue;
+ }
+
if (task.dropped) {
// A drop task should reset the metadata.
collAndChunks = CollectionAndChangedChunks();
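
The term filtering added to TaskList above means that tasks enqueued before a step-up or step-down contribute nothing once the term has been bumped. Below is a standalone illustration of that behavior; the Task struct here is stripped down to a term plus a stand-in payload, whereas the real Task also carries the collection/chunk data and version bounds.

    // Standalone illustration of the per-term task filtering added to TaskList.
    #include <iostream>
    #include <list>

    struct Task {
        long long termCreated;  // term in which the task was scheduled
        int payload;            // stand-in for CollectionAndChangedChunks
    };

    // Mirrors TaskList::getEnqueuedMetadataForTerm(): only tasks created in the
    // requested term are folded into the result; stale tasks are ignored.
    int collectForTerm(const std::list<Task>& tasks, long long term) {
        int result = 0;
        for (const auto& task : tasks) {
            if (task.termCreated != term) {
                continue;  // task data from a previous term is no longer valid
            }
            result += task.payload;
        }
        return result;
    }

    int main() {
        // Two tasks from term 0 (before a step-down/step-up) and one from term 1.
        std::list<Task> tasks{{0, 10}, {0, 20}, {1, 5}};
        std::cout << collectForTerm(tasks, 1) << "\n";  // prints 5: term-0 work is skipped
        return 0;
    }
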
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 5be9eb51727..845145c2708 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -45,11 +45,29 @@ class ThreadPoolInterface;
* retrieves chunk metadata from the shard persisted chunk metadata.
*/
class ShardServerCatalogCacheLoader : public CatalogCacheLoader {
+ MONGO_DISALLOW_COPYING(ShardServerCatalogCacheLoader);
+
public:
ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configLoader);
~ShardServerCatalogCacheLoader();
/**
+ * Initializes internal state so that the loader behaves as a primary or secondary. This can
+ * only be called once, when the sharding state is initialized.
+ */
+ void initializeReplicaSetRole(bool isPrimary) override;
+
+ /**
+ * Updates internal state so that the loader can start behaving like a secondary.
+ */
+ void onStepDown() override;
+
+ /**
+ * Updates internal state so that the loader can start behaving like a primary.
+ */
+ void onStepUp() override;
+
+ /**
* This must be called serially, never in parallel, including waiting for the returned
* Notification to be signalled.
*
@@ -66,6 +84,10 @@ public:
override;
private:
+ // Differentiates the server's role in the replica set so that the chunk loader knows whether to
+ // load metadata locally or remotely.
+ enum class ReplicaSetRole { None, Secondary, Primary };
+
/**
* This represents an update task for the persisted chunk metadata. The task will either be to
* apply a set up updated chunks to the shard persisted metadata store or to drop the persisted
@@ -86,7 +108,8 @@ private:
* 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED().
*/
Task(StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
- ChunkVersion minimumQueryVersion);
+ ChunkVersion minimumQueryVersion,
+ long long currentTerm);
// Chunks and Collection updates to be applied to the shard persisted metadata store.
boost::optional<CollectionAndChangedChunks> collectionAndChangedChunks{boost::none};
@@ -103,6 +126,9 @@ private:
// Indicates whether the collection metadata must be cleared.
bool dropped{false};
+
+ // The term in which the loader scheduled this task.
+ long long termCreated;
};
/**
@@ -144,14 +170,22 @@ private:
}
/**
+ * Checks whether 'term' matches the term of the latest task in the task list. This is
+ * useful to check whether the task list has outdated data that's no longer valid to use in
+ * the current/new term specified by 'term'.
+ */
+ bool hasTasksFromThisTerm(long long term) const;
+
+ /**
* Gets the last task's highest version -- this is the most up to date version.
*/
ChunkVersion getHighestVersionEnqueued() const;
/**
- * Iterates over the task list to retrieve the enqueued metadata.
+ * Iterates over the task list to retrieve the enqueued metadata. Only collects
+ * data from tasks that have terms matching the specified 'term'.
*/
- CollectionAndChangedChunks getEnqueuedMetadata() const;
+ CollectionAndChangedChunks getEnqueuedMetadataForTerm(const long long term) const;
private:
std::list<Task> _tasks{};
@@ -164,8 +198,9 @@ private:
* of the shard's persisted metadata store with the latest updates retrieved from the config
* server.
*
- * Then calls 'callbackFn' with metadata loaded from the shard persisted metadata store, and any
- * in-memory task enqueued to update that store, GTE to 'catalogCacheSinceVersion'
+ * Then calls 'callbackFn' with metadata retrieved locally from the shard persisted metadata
+ * store and any in-memory tasks with terms matching 'currentTerm' enqueued to update that
+ * store, GTE to 'catalogCacheSinceVersion'.
*
* Only run on the shard primary.
*/
@@ -173,13 +208,14 @@ private:
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
+ long long currentTerm,
stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
std::shared_ptr<Notification<void>> notify);
/**
- * Loads chunk metadata from the shard persisted metadata store, and any in-memory task enqueued
- * to update that store, GTE to 'catalogCacheSinceVersion'.
+ * Loads chunk metadata from the shard persisted metadata store and any in-memory tasks with
+ * terms matching 'term' enqueued to update that store, GTE to 'catalogCacheSinceVersion'.
*
* Will return an empty CollectionAndChangedChunks object if no metadata is found (collection
* was dropped).
@@ -189,13 +225,14 @@ private:
StatusWith<CollectionAndChangedChunks> _getLoaderMetadata(
OperationContext* opCtx,
const NamespaceString& nss,
- const ChunkVersion& catalogCacheSinceVersion);
+ const ChunkVersion& catalogCacheSinceVersion,
+ const long long term);
/**
* Loads chunk metadata from all in-memory tasks enqueued to update the shard persisted metadata
* store for collection 'nss' that is GTE 'catalogCacheSinceVersion'. If
* 'catalogCacheSinceVersion's epoch does not match that of the metadata enqueued, returns all
- * metadata.
+ * metadata. Ignores tasks with terms that do not match 'term': these are no longer valid.
*
* The bool returned in the pair indicates whether there are any tasks enqueued. If none are, it
* is false. If it is true, and the CollectionAndChangedChunks returned is empty, this indicates
@@ -204,7 +241,9 @@ private:
* Only run on the shard primary.
*/
std::pair<bool, CollectionAndChangedChunks> _getEnqueuedMetadata(
- const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion);
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ const long long term);
/**
* Adds 'task' to the task list for 'nss'. If this creates a new task list, then '_runTasks' is
@@ -240,6 +279,15 @@ private:
// Map to track in progress persisted cache updates on the shard primary.
TaskLists _taskLists;
+
+ // This value is incremented every time this server changes from primary to secondary and vice
+ // versa. In this way, if a task is scheduled with one term value and then execution is
+ // attempted during another term, we can skip the operation because it is no longer valid.
+ long long _term{0};
+
+ // Indicates whether this server is the primary or not, so that the appropriate loading action
+ // can be taken.
+ ReplicaSetRole _role{ReplicaSetRole::None};
};
} // namespace mongo
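
The header comments above require getChunksSince() to be called serially, never in parallel, and the caller must wait for the returned Notification to be signalled before issuing the next refresh. A caller-side sketch of that contract follows; std::future stands in for mongo::Notification<void>, and fakeGetChunksSince() is purely illustrative so the example stays self-contained.

    // Illustrative caller loop for the getChunksSince() serialization contract.
    #include <future>
    #include <iostream>

    std::future<void> fakeGetChunksSince(int sinceVersion) {
        // Pretend the loader schedules a refresh and signals completion.
        std::promise<void> done;
        auto notification = done.get_future();
        std::cout << "refreshing metadata newer than version " << sinceVersion << "\n";
        done.set_value();  // the real loader signals this from its thread pool task
        return notification;
    }

    int main() {
        for (int version = 0; version < 3; ++version) {
            auto notification = fakeGetChunksSince(version);
            notification.wait();  // wait before the next call: refreshes must never overlap
        }
        return 0;
    }
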
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index ebe7b475a22..bf150a59957 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -267,11 +267,12 @@ Status ShardingState::refreshMetadataNow(OperationContext* opCtx,
}
}
-// NOTE: This method can be called inside a database lock so it should never take any database
+// NOTE: This method will be called inside a database lock so it should never take any database
// locks, perform I/O, or any long running operations.
Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
const ShardIdentityType& shardIdentity) {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+ invariant(opCtx->lockState()->isLocked());
Status validationStatus = shardIdentity.validate();
if (!validationStatus.isOK()) {
@@ -314,13 +315,23 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
try {
Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx));
if (status.isOK()) {
- log() << "initialized sharding components";
- _setInitializationState(InitializationState::kInitialized);
ReplicaSetMonitor::setSynchronousConfigChangeHook(
&ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
- _setInitializationState(InitializationState::kInitialized);
+ // Determine primary/secondary/standalone state in order to set it on the CatalogCache.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ bool isReplSet =
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+ bool isStandaloneOrPrimary =
+ !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+ repl::MemberState::RS_PRIMARY);
+
+ Grid::get(opCtx)->catalogCache()->initializeReplicaSetRole(isStandaloneOrPrimary);
+
+ log() << "initialized sharding components for "
+ << (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
+ _setInitializationState(InitializationState::kInitialized);
} else {
log() << "failed to initialize sharding components" << causedBy(status);
_initializationStatus = status;
@@ -350,6 +361,8 @@ void ShardingState::_setInitializationState(InitializationState newState) {
}
StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
+
// In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require*
// a shardIdentity document to be passed through --overrideShardIdentity.
if (storageGlobalParams.readOnly) {
@@ -364,9 +377,14 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon
if (!swOverrideShardIdentity.isOK()) {
return swOverrideShardIdentity.getStatus();
}
- auto status = initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue());
- if (!status.isOK()) {
- return status;
+ {
+ // Global lock is required to call initializeFromShardIdentity().
+ Lock::GlobalWrite lk(opCtx);
+ auto status =
+ initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue());
+ if (!status.isOK()) {
+ return status;
+ }
}
return true;
} else {
@@ -399,7 +417,6 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon
}
// Load the shardIdentity document from disk.
- invariant(!opCtx->lockState()->isLocked());
BSONObj shardIdentityBSON;
bool foundShardIdentity = false;
try {
@@ -428,9 +445,13 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon
if (!swShardIdentity.isOK()) {
return swShardIdentity.getStatus();
}
- auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue());
- if (!status.isOK()) {
- return status;
+ {
+ // Global lock is required to call initializeFromShardIdentity().
+ Lock::GlobalWrite lk(opCtx);
+ auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue());
+ if (!status.isOK()) {
+ return status;
+ }
}
return true;
} else {
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index c6efdeaae30..d41764c1d01 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -114,7 +114,11 @@ public:
}
/**
- * Initializes the sharding state of this server from the shard identity document argument.
+ * Initializes the sharding state of this server from the shard identity document argument
+ * and sets secondary or primary state information on the catalog cache loader.
+ *
+ * Note: caller must hold a global/database lock! Needed in order to stably check for
+ * replica set state (primary, secondary, standalone).
*/
Status initializeFromShardIdentity(OperationContext* opCtx,
const ShardIdentityType& shardIdentity);
@@ -247,6 +251,8 @@ public:
* classes for sharding were initialized, but no networking calls were made yet (with the
* exception of the duplicate ShardRegistry reload in ShardRegistry::startup() (see
* SERVER-26123). Outgoing networking calls to cluster members can now be made.
+ *
+ * Note: this function briefly takes the global lock to determine primary/secondary state.
*/
StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx);
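
With this commit, initializeFromShardIdentity() asserts that its caller already holds a lock (the invariant added in sharding_state.cpp above), while initializeShardingAwarenessIfNeeded() now takes Lock::GlobalWrite itself before delegating, which is also why the unit tests below acquire the global lock first. The sketch that follows models that caller/callee locking contract with simplified stand-ins for the MongoDB lock types; it is illustrative only.

    // Sketch of the locking contract: the caller acquires the global lock, and the
    // initialization routine only asserts that a lock is already held, mirroring
    // invariant(opCtx->lockState()->isLocked()). GlobalWrite and LockState here are
    // simplified stand-ins, not the MongoDB classes.
    #include <cassert>

    struct LockState {
        bool locked = false;
        bool isLocked() const { return locked; }
    };

    struct GlobalWrite {
        explicit GlobalWrite(LockState* ls) : _ls(ls) { _ls->locked = true; }
        ~GlobalWrite() { _ls->locked = false; }
        LockState* _ls;
    };

    void initializeFromShardIdentitySketch(const LockState& lockState) {
        // Never takes locks itself: it may be invoked while a database lock is held.
        assert(lockState.isLocked());
        // ... validate the shard identity document and set the replica set role ...
    }

    int main() {
        LockState lockState;
        {
            GlobalWrite lk(&lockState);  // caller acquires the global lock first
            initializeFromShardIdentitySketch(lockState);
        }
        return 0;
    }
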
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index 6e233262e2a..c47acfa143e 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -30,10 +30,12 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_options.h"
@@ -41,7 +43,9 @@
#include "mongo/db/storage/storage_options.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/sharding_mongod_test_fixture.h"
namespace mongo {
@@ -116,12 +120,25 @@ protected:
return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
+ std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader() {
+ return stdx::make_unique<ShardServerCatalogCacheLoader>(
+ stdx::make_unique<ConfigServerCatalogCacheLoader>());
+ }
+
+ std::unique_ptr<CatalogCache> makeCatalogCache(
+ std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) {
+ invariant(catalogCacheLoader);
+ return stdx::make_unique<CatalogCache>(std::move(catalogCacheLoader));
+ }
+
private:
ShardingState _shardingState;
ShardId _shardName;
};
TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
+ Lock::GlobalWrite lk(operationContext());
+
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
@@ -135,6 +152,9 @@ TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
}
TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
+ // Must hold a lock to call initializeFromShardIdentity.
+ Lock::GlobalWrite lk(operationContext());
+
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
@@ -169,6 +189,9 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
}
TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
+ // Must hold a lock to call initializeFromShardIdentity.
+ Lock::GlobalWrite lk(operationContext());
+
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
@@ -197,6 +220,9 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
}
TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
+ // Must hold a lock to call initializeFromShardIdentity.
+ Lock::GlobalWrite lk(operationContext());
+
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(