diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2017-05-17 09:21:49 -0400 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2017-05-19 14:20:14 -0400 |
commit | f7c5e33fc4634658877bafdb607c3865787acc7c (patch) | |
tree | 6b9d072bcd402ba0f7d0fb143cf45d20ee146ece /src/mongo/db | |
parent | 9ba2ced42110d439cf2644eaf57fac057dd1f337 (diff) | |
download | mongo-f7c5e33fc4634658877bafdb607c3865787acc7c.tar.gz |
SERVER-28724 add onStepDown and onStepUp functionality to ShardServerCatalogCacheLoader
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/db.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state_test.cpp | 106 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 112 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 68 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state_test.cpp | 26 |
10 files changed, 267 insertions, 112 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 34786483431..a76bdac50f5 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -621,6 +621,7 @@ ExitCode _initAndListen(int listenPort) { << startupWarningsLog; } + // This function may take the global lock. auto shardingInitialized = uassertStatusOK(ShardingState::get(startupOpCtx.get()) ->initializeShardingAwarenessIfNeeded(startupOpCtx.get())); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 75b7851d77a..b0c4a14e175 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -79,6 +79,7 @@ #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" @@ -681,6 +682,9 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { Balancer::get(_service)->interruptBalancer(); + } else if (ShardingState::get(_service)->enabled()) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + Grid::get(_service)->catalogCache()->onStepDown(); } ShardingState::get(_service)->markCollectionsNotShardedAtStepdown(); @@ -740,6 +744,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook // If this is a config server node becoming a primary, start the balancer Balancer::get(opCtx)->initiateBalancer(opCtx); } else if (ShardingState::get(opCtx)->enabled()) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + const auto configsvrConnStr = 
Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString(); auto status = ShardingState::get(opCtx)->updateShardIdentityConfigString( @@ -748,6 +754,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook warning() << "error encountered while trying to update config connection string to " << configsvrConnStr << causedBy(status); } + + Grid::get(_service)->catalogCache()->onStepUp(); } // There is a slight chance that some stale metadata might have been loaded before the latest diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9603962e4ae..4beb6bb5da6 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -238,7 +238,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', - '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', '$BUILD_DIR/mongo/s/shard_server_test_fixture', ], ) @@ -259,10 +258,8 @@ env.CppUnitTest( '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/coreshard', - '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', - '$BUILD_DIR/mongo/db/service_context_d_test_fixture', - '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', + '$BUILD_DIR/mongo/s/shard_server_test_fixture', ], ) diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index f1cecac1979..c1b15ce8c0c 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -26,85 +26,53 @@ * then also delete it in the license file. 
*/ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - #include "mongo/platform/basic.h" -#include "mongo/base/status_with.h" -#include "mongo/db/jsobj.h" +#include "mongo/db/s/collection_sharding_state.h" + +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/collection_metadata.h" -#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" -#include "mongo/db/server_options.h" -#include "mongo/db/service_context_noop.h" -#include "mongo/db/service_context_noop.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/clock_source_mock.h" +#include "mongo/s/shard_server_test_fixture.h" namespace mongo { namespace { -class CollShardingStateTest : public mongo::unittest::Test { +/** + * Uses the ShardServerTestFixture that sets up the ShardServerCatalogCacheLoader on the + * CatalogCache and has a real Locker class (as opposed to LockerNoop) for locking. + */ +class CollShardingStateTest : public ShardServerTestFixture { public: void setUp() override { - _service.setFastClockSource(stdx::make_unique<ClockSourceMock>()); - _service.setPreciseClockSource(stdx::make_unique<ClockSourceMock>()); - - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - _client = _service.makeClient("ShardingStateTest"); - _opCtx = _client->makeOperationContext(); - - // Set a ReplicationCoordinator, since it is accessed as part of shardVersion checks. - // TODO(esha): remove once the Safe Secondary Reads (PM-256) project is complete. 
- auto svCtx = getServiceContext(); - repl::ReplSettings replSettings; - replSettings.setMaster(true); - repl::ReplicationCoordinator::set( - svCtx, stdx::make_unique<repl::ReplicationCoordinatorMock>(svCtx, replSettings)); + ShardServerTestFixture::setUp(); // Note: this assumes that globalInit will always be called on the same thread as the main // test thread. - ShardingState::get(opCtx())->setGlobalInitMethodForTest( - [this](OperationContext*, const ConnectionString&, StringData) { - _initCallCount++; - return Status::OK(); - }); - } - - void tearDown() override {} - - OperationContext* opCtx() { - return _opCtx.get(); + ShardingState::get(operationContext()) + ->setGlobalInitMethodForTest( + [this](OperationContext*, const ConnectionString&, StringData) { + _initCallCount++; + return Status::OK(); + }); } int getInitCallCount() const { return _initCallCount; } - ServiceContext* getServiceContext() { - return &_service; - } - -protected: - ServiceContextNoop _service; - private: - ServiceContext::UniqueClient _client; - ServiceContext::UniqueOperationContext _opCtx; - int _initCallCount = 0; - const HostAndPort _host{"node1:12345"}; - const std::string _setName = "mySet"; - const std::vector<HostAndPort> _servers{_host}; }; TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { - CollectionShardingState collShardingState(&_service, + // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on + // the shard identity document. 
+ Lock::GlobalWrite lock(operationContext()); + + CollectionShardingState collShardingState(getServiceContext(), NamespaceString::kConfigCollectionNamespace); ShardIdentityType shardIdentity; @@ -113,8 +81,8 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - WriteUnitOfWork wuow(opCtx()); - collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()); + WriteUnitOfWork wuow(operationContext()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -124,6 +92,10 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { } TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { + // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on + // the shard identity document. + Lock::GlobalWrite lock(operationContext()); + CollectionShardingState collShardingState(getServiceContext(), NamespaceString::kConfigCollectionNamespace); @@ -134,8 +106,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { shardIdentity.setClusterId(OID::gen()); { - WriteUnitOfWork wuow(opCtx()); - collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()); + WriteUnitOfWork wuow(operationContext()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); } @@ -144,6 +116,10 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { } TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) { + // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on + // the shard identity document. 
+ Lock::GlobalWrite lock(operationContext()); + CollectionShardingState collShardingState(getServiceContext(), NamespaceString("admin.user")); ShardIdentityType shardIdentity; @@ -152,8 +128,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - WriteUnitOfWork wuow(opCtx()); - collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()); + WriteUnitOfWork wuow(operationContext()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -163,22 +139,28 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit } TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) { + // Must hold a lock to call CollectionShardingState::onInsertOp. + Lock::GlobalWrite lock(operationContext()); + CollectionShardingState collShardingState(getServiceContext(), NamespaceString::kConfigCollectionNamespace); ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()), + ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()), AssertionException); } TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNotInserted) { + // Must hold a lock to call CollectionShardingState::onInsertOp. 
+ Lock::GlobalWrite lock(operationContext()); + CollectionShardingState collShardingState(getServiceContext(), NamespaceString::kConfigCollectionNamespace); - WriteUnitOfWork wuow(opCtx()); - collShardingState.onInsertOp(opCtx(), BSON("_id" << 1)); + WriteUnitOfWork wuow(operationContext()); + collShardingState.onInsertOp(operationContext(), BSON("_id" << 1)); ASSERT_EQ(0, getInitCallCount()); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 6133a27903a..ce41648e687 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -92,7 +92,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, _args.getFromShardId() != _args.getToShardId()); log() << "Starting chunk migration " << redact(_args.toString()) - << " with expected collection version epoch" << _args.getVersionEpoch(); + << " with expected collection version epoch " << _args.getVersionEpoch(); // Now that the collection is locked, snapshot the metadata and fetch the latest versions ShardingState* const shardingState = ShardingState::get(opCtx); diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 368eb23abec..d3219b25c35 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -240,26 +240,64 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() { _threadPool.join(); } +void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_role == ReplicaSetRole::None); + + if (isPrimary) { + _role = ReplicaSetRole::Primary; + } else { + _role = ReplicaSetRole::Secondary; + } +} + +void ShardServerCatalogCacheLoader::onStepDown() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_role != ReplicaSetRole::None); + ++_term; + _role = 
ReplicaSetRole::Secondary; +} + +void ShardServerCatalogCacheLoader::onStepUp() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_role != ReplicaSetRole::None); + ++_term; + _role = ReplicaSetRole::Primary; +} std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince( const NamespaceString& nss, ChunkVersion version, stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + long long currentTerm; + bool isPrimary; + { + // Take the mutex so that we can discern whether we're primary or secondary and schedule a + // task with the corresponding _term value. + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_role != ReplicaSetRole::None); + + currentTerm = _term; + isPrimary = (_role == ReplicaSetRole::Primary); + } - // TODO: plug in secondary machinery, with onStepDown and onBecomePrimary tasks: clear TaskLists - // and thread pool + // TODO: add and plug in secondary machinery auto notify = std::make_shared<Notification<void>>(); - uassertStatusOK(_threadPool.schedule([ this, nss, version, callbackFn, notify ]() noexcept { - auto opCtx = Client::getCurrent()->makeOperationContext(); - try { - _schedulePrimayGetChunksSince(opCtx.get(), nss, version, callbackFn, notify); - } catch (const DBException& ex) { - callbackFn(opCtx.get(), ex.toStatus()); - notify->set(); - } - })); + if (isPrimary) { + uassertStatusOK(_threadPool.schedule( + [ this, nss, version, currentTerm, callbackFn, notify ]() noexcept { + auto opCtx = Client::getCurrent()->makeOperationContext(); + try { + _schedulePrimayGetChunksSince( + opCtx.get(), nss, version, currentTerm, callbackFn, notify); + } catch (const DBException& ex) { + callbackFn(opCtx.get(), ex.toStatus()); + notify->set(); + } + })); + } return notify; } @@ -268,6 +306,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, + long long 
termScheduled, stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, std::shared_ptr<Notification<void>> notify) { @@ -277,7 +316,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( stdx::lock_guard<stdx::mutex> lock(_mutex); auto taskListIt = _taskLists.find(nss); - if (taskListIt != _taskLists.end()) { + if (taskListIt != _taskLists.end() && + taskListIt->second.hasTasksFromThisTerm(termScheduled)) { // Enqueued tasks have the latest metadata return taskListIt->second.getHighestVersionEnqueued(); } @@ -288,7 +328,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( }(); auto remoteRefreshCallbackFn = - [this, nss, catalogCacheSinceVersion, maxLoaderVersion, notify, callbackFn]( + [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled, callbackFn, notify]( OperationContext* opCtx, StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) { @@ -297,8 +337,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( // No updates to apply. Do nothing. } else { // Enqueue a Task to apply the update retrieved from the config server. - Status scheduleStatus = - _scheduleTask(nss, Task{swCollectionAndChangedChunks, maxLoaderVersion}); + Status scheduleStatus = _scheduleTask( + nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); if (!scheduleStatus.isOK()) { callbackFn(opCtx, StatusWith<CollectionAndChangedChunks>(scheduleStatus)); notify->set(); @@ -310,7 +350,7 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( // -- both persisted and enqueued. swCollectionAndChangedChunks = - _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion); + _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); // If no results were returned, convert the response into // NamespaceNotFound. 
if (swCollectionAndChangedChunks.isOK() && @@ -334,12 +374,13 @@ void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata( OperationContext* opCtx, const NamespaceString& nss, - const ChunkVersion& catalogCacheSinceVersion) { + const ChunkVersion& catalogCacheSinceVersion, + const long long term) { // Get the enqueued metadata first. Otherwise we could miss data between reading persisted and // enqueued, if an enqueued task finished after the persisted read but before the enqueued read. - auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion); + auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion, term); bool isEnqueued = std::move(enqueuedRes.first); CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second); @@ -387,18 +428,25 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader } std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getEnqueuedMetadata( - const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion) { + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion, + const long long term) { stdx::unique_lock<stdx::mutex> lock(_mutex); auto taskListIt = _taskLists.find(nss); if (taskListIt == _taskLists.end()) { return std::make_pair(false, CollectionAndChangedChunks()); + } else if (!taskListIt->second.hasTasksFromThisTerm(term)) { + // If task list does not have a term that matches, there's no valid task data to collect. + return std::make_pair(false, CollectionAndChangedChunks()); } - CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadata(); + // Only return task data of tasks scheduled in the same term as the given 'term': older term + // task data is no longer valid. 
+ CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadataForTerm(term); // Returns all the results if 'catalogCacheSinceVersion's epoch does not match. Otherwise, trim - // the results to be GTE to 'catalogCacheSinceVersion' + // the results to be GTE to 'catalogCacheSinceVersion'. if (collAndChunks.epoch != catalogCacheSinceVersion.epoch()) { return std::make_pair(true, collAndChunks); @@ -480,6 +528,12 @@ bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o const Task task = _taskLists[nss].getActiveTask(); invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty()); + // If this task is from an old term and no longer valid, do not execute and return true so that + // the task gets removed from the task list + if (task.termCreated != _term) { + return true; + } + lock.unlock(); // Check if this is a drop task. @@ -543,9 +597,11 @@ bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o ShardServerCatalogCacheLoader::Task::Task( StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, - ChunkVersion minimumQueryVersion) { + ChunkVersion minimumQueryVersion, + const long long currentTerm) { minQueryVersion = minimumQueryVersion; + termCreated = currentTerm; if (statusWithCollectionAndChangedChunks.isOK()) { collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue(); @@ -595,14 +651,24 @@ void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() { _tasks.pop_front(); } +bool ShardServerCatalogCacheLoader::TaskList::hasTasksFromThisTerm(long long term) const { + return _tasks.back().termCreated == term; +} + ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const { invariant(!_tasks.empty()); return _tasks.back().maxQueryVersion; } -CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadata() const { +CollectionAndChangedChunks 
ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadataForTerm( + const long long term) const { CollectionAndChangedChunks collAndChunks; for (const auto& task : _tasks) { + if (task.termCreated != term) { + // Task data is no longer valid. Go on to the next task in the list. + continue; + } + if (task.dropped) { // A drop task should reset the metadata. collAndChunks = CollectionAndChangedChunks(); diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 5be9eb51727..845145c2708 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -45,11 +45,29 @@ class ThreadPoolInterface; * retrieves chunk metadata from the shard persisted chunk metadata. */ class ShardServerCatalogCacheLoader : public CatalogCacheLoader { + MONGO_DISALLOW_COPYING(ShardServerCatalogCacheLoader); + public: ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configLoader); ~ShardServerCatalogCacheLoader(); /** + * Initializes internal state so that the loader behaves as a primary or secondary. This can + * only be called once, when the sharding state is initialized. + */ + void initializeReplicaSetRole(bool isPrimary) override; + + /** + * Updates internal state so that the loader can start behaving like a secondary. + */ + void onStepDown() override; + + /** + * Updates internal state so that the loader can start behaving like a primary. + */ + void onStepUp() override; + + /** * This must be called serially, never in parallel, including waiting for the returned * Notification to be signalled. * @@ -66,6 +84,10 @@ public: override; private: + // Differentiates the server's role in the replica set so that the chunk loader knows whether to + // load metadata locally or remotely. + enum class ReplicaSetRole { None, Secondary, Primary }; + /** * This represents an update task for the persisted chunk metadata. 
The task will either be to * apply a set up updated chunks to the shard persisted metadata store or to drop the persisted @@ -86,7 +108,8 @@ private: * 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED(). */ Task(StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, - ChunkVersion minimumQueryVersion); + ChunkVersion minimumQueryVersion, + long long currentTerm); // Chunks and Collection updates to be applied to the shard persisted metadata store. boost::optional<CollectionAndChangedChunks> collectionAndChangedChunks{boost::none}; @@ -103,6 +126,9 @@ private: // Indicates whether the collection metadata must be cleared. bool dropped{false}; + + // The term in which the loader scheduled this task. + uint32_t termCreated; }; /** @@ -144,14 +170,22 @@ private: } /** + * Checks whether 'term' matches the term of the latest task in the task list. This is + * useful to check whether the task list has outdated data that's no longer valid to use in + * the current/new term specified by 'term'. + */ + bool hasTasksFromThisTerm(long long term) const; + + /** * Gets the last task's highest version -- this is the most up to date version. */ ChunkVersion getHighestVersionEnqueued() const; /** - * Iterates over the task list to retrieve the enqueued metadata. + * Iterates over the task list to retrieve the enqueued metadata. Only collects + * data from tasks that have terms matching the specified 'term'. */ - CollectionAndChangedChunks getEnqueuedMetadata() const; + CollectionAndChangedChunks getEnqueuedMetadataForTerm(const long long term) const; private: std::list<Task> _tasks{}; @@ -164,8 +198,9 @@ private: * of the shard's persisted metadata store with the latest updates retrieved from the config * server.
* - * Then calls 'callbackFn' with metadata loaded from the shard persisted metadata store, and any - * in-memory task enqueued to update that store, GTE to 'catalogCacheSinceVersion' + * Then calls 'callbackFn' with metadata retrieved locally from the shard persisted metadata + * store and any in-memory tasks with terms matching 'currentTerm' enqueued to update that + * store, GTE to 'catalogCacheSinceVersion'. * * Only run on the shard primary. */ @@ -173,13 +208,14 @@ private: OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, + long long currentTerm, stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, std::shared_ptr<Notification<void>> notify); /** - * Loads chunk metadata from the shard persisted metadata store, and any in-memory task enqueued - * to update that store, GTE to 'catalogCacheSinceVersion'. + * Loads chunk metadata from the shard persisted metadata store and any in-memory tasks with + * terms matching 'term' enqueued to update that store, GTE to 'catalogCacheSinceVersion'. * * Will return an empty CollectionAndChangedChunks object if no metadata is found (collection * was dropped). @@ -189,13 +225,14 @@ private: StatusWith<CollectionAndChangedChunks> _getLoaderMetadata( OperationContext* opCtx, const NamespaceString& nss, - const ChunkVersion& catalogCacheSinceVersion); + const ChunkVersion& catalogCacheSinceVersion, + const long long term); /** * Loads chunk metadata from all in-memory tasks enqueued to update the shard persisted metadata * store for collection 'nss' that is GTE 'catalogCacheSinceVersion'. If * 'catalogCacheSinceVersion's epoch does not match that of the metadata enqueued, returns all - * metadata. + * metadata. Ignores tasks with terms that do not match 'term': these are no longer valid. * * The bool returned in the pair indicates whether there are any tasks enqueued. If none are, it * is false.
If it is true, and the CollectionAndChangedChunks returned is empty, this indicates @@ -204,7 +241,9 @@ private: * Only run on the shard primary. */ std::pair<bool, CollectionAndChangedChunks> _getEnqueuedMetadata( - const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion); + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion, + const long long term); /** * Adds 'task' to the task list for 'nss'. If this creates a new task list, then '_runTasks' is @@ -240,6 +279,15 @@ private: // Map to track in progress persisted cache updates on the shard primary. TaskLists _taskLists; + + // This value is incremented every time this server changes from primary to secondary and vice + // versa. In this way, if a task is scheduled with one term value and then execution is + // attempted during another term, we can skip the operation because it is no longer valid. + long long _term{0}; + + // Indicates whether this server is the primary or not, so that the appropriate loading action + // can be taken. + ReplicaSetRole _role{ReplicaSetRole::None}; }; } // namespace mongo diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index ebe7b475a22..bf150a59957 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -267,11 +267,12 @@ Status ShardingState::refreshMetadataNow(OperationContext* opCtx, } } -// NOTE: This method can be called inside a database lock so it should never take any database +// NOTE: This method will be called inside a database lock so it should never take any database // locks, perform I/O, or any long running operations.
Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, const ShardIdentityType& shardIdentity) { invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + invariant(opCtx->lockState()->isLocked()); Status validationStatus = shardIdentity.validate(); if (!validationStatus.isOK()) { @@ -314,13 +315,23 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, try { Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx)); if (status.isOK()) { - log() << "initialized sharding components"; - _setInitializationState(InitializationState::kInitialized); ReplicaSetMonitor::setSynchronousConfigChangeHook( &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); - _setInitializationState(InitializationState::kInitialized); + // Determine primary/secondary/standalone state in order to set it on the CatalogCache. + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + bool isStandaloneOrPrimary = + !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == + repl::MemberState::RS_PRIMARY); + + Grid::get(opCtx)->catalogCache()->initializeReplicaSetRole(isStandaloneOrPrimary); + + log() << "initialized sharding components for " + << (isStandaloneOrPrimary ? 
"primary" : "secondary") << " node."; + _setInitializationState(InitializationState::kInitialized); } else { log() << "failed to initialize sharding components" << causedBy(status); _initializationStatus = status; @@ -350,6 +361,8 @@ void ShardingState::_setInitializationState(InitializationState newState) { } StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); + // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require* // a shardIdentity document to be passed through --overrideShardIdentity. if (storageGlobalParams.readOnly) { @@ -364,9 +377,14 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon if (!swOverrideShardIdentity.isOK()) { return swOverrideShardIdentity.getStatus(); } - auto status = initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue()); - if (!status.isOK()) { - return status; + { + // Global lock is required to call initializeFromShardIdenetity(). + Lock::GlobalWrite lk(opCtx); + auto status = + initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue()); + if (!status.isOK()) { + return status; + } } return true; } else { @@ -399,7 +417,6 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon } // Load the shardIdentity document from disk. - invariant(!opCtx->lockState()->isLocked()); BSONObj shardIdentityBSON; bool foundShardIdentity = false; try { @@ -428,9 +445,13 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon if (!swShardIdentity.isOK()) { return swShardIdentity.getStatus(); } - auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue()); - if (!status.isOK()) { - return status; + { + // Global lock is required to call initializeFromShardIdenetity(). 
+ Lock::GlobalWrite lk(opCtx); + auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue()); + if (!status.isOK()) { + return status; + } } return true; } else { diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index c6efdeaae30..d41764c1d01 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -114,7 +114,11 @@ public: } /** - * Initializes the sharding state of this server from the shard identity document argument. + * Initializes the sharding state of this server from the shard identity document argument + * and sets secondary or primary state information on the catalog cache loader. + * + * Note: caller must hold a global/database lock! Needed in order to stably check for + * replica set state (primary, secondary, standalone). */ Status initializeFromShardIdentity(OperationContext* opCtx, const ShardIdentityType& shardIdentity); @@ -247,6 +251,8 @@ public: * classes for sharding were initialized, but no networking calls were made yet (with the * exception of the duplicate ShardRegistry reload in ShardRegistry::startup() (see * SERVER-26123). Outgoing networking calls to cluster members can now be made. + * + * Note: this function briefly takes the global lock to determine primary/secondary state. 
*/ StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx); diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index 6e233262e2a..c47acfa143e 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -30,10 +30,12 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" @@ -41,7 +43,9 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { @@ -116,12 +120,25 @@ protected: return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } + std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader() { + return stdx::make_unique<ShardServerCatalogCacheLoader>( + stdx::make_unique<ConfigServerCatalogCacheLoader>()); + } + + std::unique_ptr<CatalogCache> makeCatalogCache( + std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) { + invariant(catalogCacheLoader); + return stdx::make_unique<CatalogCache>(std::move(catalogCacheLoader)); + } + private: ShardingState _shardingState; ShardId _shardName; }; TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { + Lock::GlobalWrite lk(operationContext()); + ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", 
"config")); @@ -135,6 +152,9 @@ TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { } TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { + // Must hold a lock to call initializeFromShardIdentity. + Lock::GlobalWrite lk(operationContext()); + ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); @@ -169,6 +189,9 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { } TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { + // Must hold a lock to call initializeFromShardIdentity. + Lock::GlobalWrite lk(operationContext()); + auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( @@ -197,6 +220,9 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { } TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { + // Must hold a lock to call initializeFromShardIdentity. + Lock::GlobalWrite lk(operationContext()); + auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( |