diff options
-rw-r--r-- | src/mongo/db/db.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_splitter.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_splitter.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/periodic_balancer_config_refresher.cpp | 116 | ||||
-rw-r--r-- | src/mongo/db/s/periodic_balancer_config_refresher.h | 84 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/service_context_d_test_fixture.cpp | 6 | ||||
-rw-r--r-- | src/mongo/util/mock_periodic_runner_impl.h | 62 |
10 files changed, 284 insertions, 8 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index c14f37eef34..36c5bd0b149 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -518,6 +518,11 @@ ExitCode _initAndListen(int listenPort) { << startupWarningsLog; } + // Set up the periodic runner for background job execution + auto runner = makePeriodicRunner(serviceContext); + runner->startup(); + serviceContext->setPeriodicRunner(std::move(runner)); + // This function may take the global lock. auto shardingInitialized = uassertStatusOK(ShardingState::get(startupOpCtx.get()) @@ -604,11 +609,6 @@ ExitCode _initAndListen(int listenPort) { PeriodicTask::startRunningPeriodicTasks(); - // Set up the periodic runner for background job execution - auto runner = makePeriodicRunner(serviceContext); - runner->startup(); - serviceContext->setPeriodicRunner(std::move(runner)); - SessionKiller::set(serviceContext, std::make_shared<SessionKiller>(serviceContext, killSessionsLocal)); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 03cc731a0b4..20d96481ace 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -72,6 +72,7 @@ #include "mongo/db/s/balancer/balancer.h" #include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/server_options.h" @@ -705,6 +706,7 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); ChunkSplitter::get(_service).onStepDown(); CatalogCacheLoader::get(_service).onStepDown(); + PeriodicBalancerConfigRefresher::get(_service).onStepDown(); } if (auto validator = LogicalTimeValidator::get(_service)) { @@ -788,6 +790,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook CatalogCacheLoader::get(_service).onStepUp(); ChunkSplitter::get(_service).onStepUp(); + PeriodicBalancerConfigRefresher::get(_service).onStepUp(_service); } else { // unsharded if (auto validator = LogicalTimeValidator::get(_service)) { validator->enableKeyGenerator(opCtx, true); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 5d9ef0a0ef8..dc9b1c7458b 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -30,6 +30,7 @@ env.Library( 'active_migrations_registry.cpp', 'active_move_primaries_registry.cpp', 'chunk_move_write_concern_options.cpp', + 'periodic_balancer_config_refresher.cpp', 'chunk_splitter.cpp', 'config_server_op_observer.cpp', 'implicit_create_collection.cpp', diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index a0fd9c60337..cc48a524eee 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -232,7 +232,7 @@ ChunkSplitter& ChunkSplitter::get(ServiceContext* serviceContext) { return getChunkSplitter(serviceContext); } -void ChunkSplitter::setReplicaSetMode(bool isPrimary) { +void ChunkSplitter::onShardingInitialization(bool isPrimary) { stdx::lock_guard<stdx::mutex> scopedLock(_mutex); _isPrimary = isPrimary; } diff --git a/src/mongo/db/s/chunk_splitter.h b/src/mongo/db/s/chunk_splitter.h index 71800dc3585..e3ae198e8ac 100644 --- a/src/mongo/db/s/chunk_splitter.h +++ b/src/mongo/db/s/chunk_splitter.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/periodic_runner.h" namespace mongo { @@ -57,7 +58,7 @@ public: * Sets the mode of the ChunkSplitter to either primary or secondary. * The ChunkSplitter is only active when primary. */ - void setReplicaSetMode(bool isPrimary); + void onShardingInitialization(bool isPrimary); /** * Invoked when the shard server primary enters the 'PRIMARY' state to set up the ChunkSplitter diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.cpp b/src/mongo/db/s/periodic_balancer_config_refresher.cpp new file mode 100644 index 00000000000..8c5de42927e --- /dev/null +++ b/src/mongo/db/s/periodic_balancer_config_refresher.cpp @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/periodic_balancer_config_refresher.h" + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/s/balancer_configuration.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +const auto getPeriodicBalancerConfigRefresher = + ServiceContext::declareDecoration<PeriodicBalancerConfigRefresher>(); + +std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher( + ServiceContext* serviceContext) { + auto periodicRunner = serviceContext->getPeriodicRunner(); + invariant(periodicRunner); + + PeriodicRunner::PeriodicJob job( + "PeriodicBalancerConfigRefresher", + [](Client* client) { + auto opCtx = client->makeOperationContext(); + + const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration(); + invariant(balancerConfig); + + Status status = balancerConfig->refreshAndCheck(opCtx.get()); + if (!status.isOK()) { + log() << "Failed to refresh balancer configuration" << causedBy(status); + } + }, + Seconds(30)); + auto balancerConfigRefresher = periodicRunner->makeJob(std::move(job)); + balancerConfigRefresher->start(); + return balancerConfigRefresher; +} + +} // namespace + +PeriodicBalancerConfigRefresher& PeriodicBalancerConfigRefresher::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +PeriodicBalancerConfigRefresher& PeriodicBalancerConfigRefresher::get( + ServiceContext* serviceContext) { + return getPeriodicBalancerConfigRefresher(serviceContext); +} + +void PeriodicBalancerConfigRefresher::onShardingInitialization(ServiceContext* serviceContext, + bool isPrimary) { + _isPrimary = isPrimary; + // This function is called on sharding state initialization, so go ahead + // and start up the balancer config refresher task if we're a primary. + if (isPrimary && !_balancerConfigRefresher) { + _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext); + } +} +void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) { + if (!_isPrimary) { + _isPrimary = true; + // If this is the first time we're stepping up, start a thread to periodically refresh the + // balancer configuration. + if (!_balancerConfigRefresher) { + _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext); + } else { + // If we're stepping up again after having stepped down, just resume + // the existing task. + _balancerConfigRefresher->resume(); + } + } +} + +void PeriodicBalancerConfigRefresher::onStepDown() { + if (_isPrimary) { + _isPrimary = false; + invariant(_balancerConfigRefresher); + // We don't need to be refreshing the balancer configuration unless we're primary. + _balancerConfigRefresher->pause(); + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.h b/src/mongo/db/s/periodic_balancer_config_refresher.h new file mode 100644 index 00000000000..c5a86532264 --- /dev/null +++ b/src/mongo/db/s/periodic_balancer_config_refresher.h @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +class OperationContext; +class ServiceContext; + +class PeriodicBalancerConfigRefresher final { + MONGO_DISALLOW_COPYING(PeriodicBalancerConfigRefresher); + +public: + PeriodicBalancerConfigRefresher() = default; + ~PeriodicBalancerConfigRefresher() = default; + + PeriodicBalancerConfigRefresher(PeriodicBalancerConfigRefresher&& source) = delete; + PeriodicBalancerConfigRefresher& operator=(PeriodicBalancerConfigRefresher&& other) = delete; + + /** + * Obtains the service-wide chunk PeriodicBalancerConfigRefresher instance. + */ + static PeriodicBalancerConfigRefresher& get(OperationContext* opCtx); + static PeriodicBalancerConfigRefresher& get(ServiceContext* serviceContext); + + + /** + * Sets the mode to either primary or secondary. If it is primary, starts a + * periodic task to refresh the balancer configuration. The + * PeriodicBalancerConfigRefresher is only active when primary. + */ + void onShardingInitialization(ServiceContext* serviceContext, bool isPrimary); + + /** + * Invoked when the shard server primary enters the 'PRIMARY' state to + * trigger the start of the periodic refresh task. + */ + void onStepUp(ServiceContext* serviceContext); + + /** + * Invoked when this node which is currently serving as a 'PRIMARY' steps down. + * + * Pauses the periodic refresh until subsequent step up. This method might + * be called multiple times in succession, which is what happens as a + * result of incomplete transition to primary so it is resilient to that. + */ + void onStepDown(); + +private: + bool _isPrimary{false}; + + // Periodic job for refreshing the balancer configuration + std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _balancerConfigRefresher; +}; +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index d7843fcac26..18c2c10f703 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/type_shard_identity.h" @@ -232,7 +233,9 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, repl::MemberState::RS_PRIMARY); CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary); - ChunkSplitter::get(opCtx).setReplicaSetMode(isStandaloneOrPrimary); + ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary); + PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization( + opCtx->getServiceContext(), isStandaloneOrPrimary); log() << "initialized sharding components for " << (isStandaloneOrPrimary ? "primary" : "secondary") << " node."; diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index 8f444a5a17c..ebe874c5951 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -43,6 +43,7 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/unittest/temp_dir.h" #include "mongo/util/assert_util.h" +#include "mongo/util/mock_periodic_runner_impl.h" #include "mongo/db/catalog/database_holder.h" @@ -62,6 +63,11 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine) { auto logicalClock = std::make_unique<LogicalClock>(serviceContext); LogicalClock::set(serviceContext, std::move(logicalClock)); + // Set up a fake no-op PeriodicRunner. No jobs will ever get run, which is + // desired behavior for unit tests unrelated to background jobs. + auto runner = std::make_unique<MockPeriodicRunnerImpl>(); + serviceContext->setPeriodicRunner(std::move(runner)); + unittest::TempDir tempDir("service_context_d_test_fixture"); storageGlobalParams.dbpath = tempDir.path(); diff --git a/src/mongo/util/mock_periodic_runner_impl.h b/src/mongo/util/mock_periodic_runner_impl.h new file mode 100644 index 00000000000..b1204ef510a --- /dev/null +++ b/src/mongo/util/mock_periodic_runner_impl.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +/** + * A mock implementation of the PeriodicRunner interface that does nothing. + */ +class MockPeriodicRunnerImpl final : public PeriodicRunner { +public: + class MockPeriodicJobHandleImpl final : public PeriodicRunner::PeriodicJobHandle { + public: + ~MockPeriodicJobHandleImpl() = default; + + virtual void start(){}; + virtual void pause(){}; + virtual void resume(){}; + }; + + ~MockPeriodicRunnerImpl() = default; + + std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicRunner::PeriodicJob job) { + return std::make_unique<MockPeriodicJobHandleImpl>(); + } + + void scheduleJob(PeriodicRunner::PeriodicJob job) {} + void startup() {} + void shutdown() {} +}; + +} // namespace mongo |