summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/db.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp3
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/chunk_splitter.cpp2
-rw-r--r--src/mongo/db/s/chunk_splitter.h3
-rw-r--r--src/mongo/db/s/periodic_balancer_config_refresher.cpp116
-rw-r--r--src/mongo/db/s/periodic_balancer_config_refresher.h84
-rw-r--r--src/mongo/db/s/sharding_state.cpp5
-rw-r--r--src/mongo/db/service_context_d_test_fixture.cpp6
-rw-r--r--src/mongo/util/mock_periodic_runner_impl.h62
10 files changed, 284 insertions, 8 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index c14f37eef34..36c5bd0b149 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -518,6 +518,11 @@ ExitCode _initAndListen(int listenPort) {
<< startupWarningsLog;
}
+ // Set up the periodic runner for background job execution
+ auto runner = makePeriodicRunner(serviceContext);
+ runner->startup();
+ serviceContext->setPeriodicRunner(std::move(runner));
+
// This function may take the global lock.
auto shardingInitialized =
uassertStatusOK(ShardingState::get(startupOpCtx.get())
@@ -604,11 +609,6 @@ ExitCode _initAndListen(int listenPort) {
PeriodicTask::startRunningPeriodicTasks();
- // Set up the periodic runner for background job execution
- auto runner = makePeriodicRunner(serviceContext);
- runner->startup();
- serviceContext->setPeriodicRunner(std::move(runner));
-
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsLocal));
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 03cc731a0b4..20d96481ace 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -72,6 +72,7 @@
#include "mongo/db/s/balancer/balancer.h"
#include "mongo/db/s/chunk_splitter.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/server_options.h"
@@ -705,6 +706,7 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
ChunkSplitter::get(_service).onStepDown();
CatalogCacheLoader::get(_service).onStepDown();
+ PeriodicBalancerConfigRefresher::get(_service).onStepDown();
}
if (auto validator = LogicalTimeValidator::get(_service)) {
@@ -788,6 +790,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
CatalogCacheLoader::get(_service).onStepUp();
ChunkSplitter::get(_service).onStepUp();
+ PeriodicBalancerConfigRefresher::get(_service).onStepUp(_service);
} else { // unsharded
if (auto validator = LogicalTimeValidator::get(_service)) {
validator->enableKeyGenerator(opCtx, true);
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 5d9ef0a0ef8..dc9b1c7458b 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -30,6 +30,7 @@ env.Library(
'active_migrations_registry.cpp',
'active_move_primaries_registry.cpp',
'chunk_move_write_concern_options.cpp',
+ 'periodic_balancer_config_refresher.cpp',
'chunk_splitter.cpp',
'config_server_op_observer.cpp',
'implicit_create_collection.cpp',
diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp
index a0fd9c60337..cc48a524eee 100644
--- a/src/mongo/db/s/chunk_splitter.cpp
+++ b/src/mongo/db/s/chunk_splitter.cpp
@@ -232,7 +232,7 @@ ChunkSplitter& ChunkSplitter::get(ServiceContext* serviceContext) {
return getChunkSplitter(serviceContext);
}
-void ChunkSplitter::setReplicaSetMode(bool isPrimary) {
+void ChunkSplitter::onShardingInitialization(bool isPrimary) {
stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
_isPrimary = isPrimary;
}
diff --git a/src/mongo/db/s/chunk_splitter.h b/src/mongo/db/s/chunk_splitter.h
index 71800dc3585..e3ae198e8ac 100644
--- a/src/mongo/db/s/chunk_splitter.h
+++ b/src/mongo/db/s/chunk_splitter.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/periodic_runner.h"
namespace mongo {
@@ -57,7 +58,7 @@ public:
* Sets the mode of the ChunkSplitter to either primary or secondary.
* The ChunkSplitter is only active when primary.
*/
- void setReplicaSetMode(bool isPrimary);
+ void onShardingInitialization(bool isPrimary);
/**
* Invoked when the shard server primary enters the 'PRIMARY' state to set up the ChunkSplitter
diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.cpp b/src/mongo/db/s/periodic_balancer_config_refresher.cpp
new file mode 100644
index 00000000000..8c5de42927e
--- /dev/null
+++ b/src/mongo/db/s/periodic_balancer_config_refresher.cpp
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/periodic_balancer_config_refresher.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+
+const auto getPeriodicBalancerConfigRefresher =
+ ServiceContext::declareDecoration<PeriodicBalancerConfigRefresher>();
+
+std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher(
+ ServiceContext* serviceContext) {
+ auto periodicRunner = serviceContext->getPeriodicRunner();
+ invariant(periodicRunner);
+
+ PeriodicRunner::PeriodicJob job(
+ "PeriodicBalancerConfigRefresher",
+ [](Client* client) {
+ auto opCtx = client->makeOperationContext();
+
+ const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration();
+ invariant(balancerConfig);
+
+ Status status = balancerConfig->refreshAndCheck(opCtx.get());
+ if (!status.isOK()) {
+ log() << "Failed to refresh balancer configuration" << causedBy(status);
+ }
+ },
+ Seconds(30));
+ auto balancerConfigRefresher = periodicRunner->makeJob(std::move(job));
+ balancerConfigRefresher->start();
+ return balancerConfigRefresher;
+}
+
+} // namespace
+
+PeriodicBalancerConfigRefresher& PeriodicBalancerConfigRefresher::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+PeriodicBalancerConfigRefresher& PeriodicBalancerConfigRefresher::get(
+ ServiceContext* serviceContext) {
+ return getPeriodicBalancerConfigRefresher(serviceContext);
+}
+
+void PeriodicBalancerConfigRefresher::onShardingInitialization(ServiceContext* serviceContext,
+ bool isPrimary) {
+ _isPrimary = isPrimary;
+ // This function is called on sharding state initialization, so go ahead
+ // and start up the balancer config refresher task if we're a primary.
+ if (isPrimary && !_balancerConfigRefresher) {
+ _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext);
+ }
+}
+void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) {
+ if (!_isPrimary) {
+ _isPrimary = true;
+ // If this is the first time we're stepping up, start a thread to periodically refresh the
+ // balancer configuration.
+ if (!_balancerConfigRefresher) {
+ _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext);
+ } else {
+ // If we're stepping up again after having stepped down, just resume
+ // the existing task.
+ _balancerConfigRefresher->resume();
+ }
+ }
+}
+
+void PeriodicBalancerConfigRefresher::onStepDown() {
+ if (_isPrimary) {
+ _isPrimary = false;
+ invariant(_balancerConfigRefresher);
+ // We don't need to be refreshing the balancer configuration unless we're primary.
+ _balancerConfigRefresher->pause();
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.h b/src/mongo/db/s/periodic_balancer_config_refresher.h
new file mode 100644
index 00000000000..c5a86532264
--- /dev/null
+++ b/src/mongo/db/s/periodic_balancer_config_refresher.h
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/util/periodic_runner.h"
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+
+class PeriodicBalancerConfigRefresher final {
+ MONGO_DISALLOW_COPYING(PeriodicBalancerConfigRefresher);
+
+public:
+ PeriodicBalancerConfigRefresher() = default;
+ ~PeriodicBalancerConfigRefresher() = default;
+
+ PeriodicBalancerConfigRefresher(PeriodicBalancerConfigRefresher&& source) = delete;
+ PeriodicBalancerConfigRefresher& operator=(PeriodicBalancerConfigRefresher&& other) = delete;
+
+ /**
+ * Obtains the service-wide chunk PeriodicBalancerConfigRefresher instance.
+ */
+ static PeriodicBalancerConfigRefresher& get(OperationContext* opCtx);
+ static PeriodicBalancerConfigRefresher& get(ServiceContext* serviceContext);
+
+
+ /**
+ * Sets the mode to either primary or secondary. If it is primary, starts a
+ * periodic task to refresh the balancer configuration. The
+ * PeriodicBalancerConfigRefresher is only active when primary.
+ */
+ void onShardingInitialization(ServiceContext* serviceContext, bool isPrimary);
+
+ /**
+ * Invoked when the shard server primary enters the 'PRIMARY' state to
+ * trigger the start of the periodic refresh task.
+ */
+ void onStepUp(ServiceContext* serviceContext);
+
+ /**
+ * Invoked when this node which is currently serving as a 'PRIMARY' steps down.
+ *
+ * Pauses the periodic refresh until subsequent step up. This method might
+ * be called multiple times in succession, which is what happens as a
+ * result of incomplete transition to primary so it is resilient to that.
+ */
+ void onStepDown();
+
+private:
+ bool _isPrimary{false};
+
+ // Periodic job for refreshing the balancer configuration
+ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _balancerConfigRefresher;
+};
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index d7843fcac26..18c2c10f703 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/chunk_splitter.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/type_shard_identity.h"
@@ -232,7 +233,9 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
repl::MemberState::RS_PRIMARY);
CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
- ChunkSplitter::get(opCtx).setReplicaSetMode(isStandaloneOrPrimary);
+ ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary);
+ PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization(
+ opCtx->getServiceContext(), isStandaloneOrPrimary);
log() << "initialized sharding components for "
<< (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp
index 8f444a5a17c..ebe874c5951 100644
--- a/src/mongo/db/service_context_d_test_fixture.cpp
+++ b/src/mongo/db/service_context_d_test_fixture.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/storage/storage_options.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/mock_periodic_runner_impl.h"
#include "mongo/db/catalog/database_holder.h"
@@ -62,6 +63,11 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine) {
auto logicalClock = std::make_unique<LogicalClock>(serviceContext);
LogicalClock::set(serviceContext, std::move(logicalClock));
+ // Set up a fake no-op PeriodicRunner. No jobs will ever get run, which is
+ // desired behavior for unit tests unrelated to background jobs.
+ auto runner = std::make_unique<MockPeriodicRunnerImpl>();
+ serviceContext->setPeriodicRunner(std::move(runner));
+
unittest::TempDir tempDir("service_context_d_test_fixture");
storageGlobalParams.dbpath = tempDir.path();
diff --git a/src/mongo/util/mock_periodic_runner_impl.h b/src/mongo/util/mock_periodic_runner_impl.h
new file mode 100644
index 00000000000..b1204ef510a
--- /dev/null
+++ b/src/mongo/util/mock_periodic_runner_impl.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/util/periodic_runner.h"
+
+namespace mongo {
+
+/**
+ * A mock implementation of the PeriodicRunner interface that does nothing.
+ */
+class MockPeriodicRunnerImpl final : public PeriodicRunner {
+public:
+ class MockPeriodicJobHandleImpl final : public PeriodicRunner::PeriodicJobHandle {
+ public:
+ ~MockPeriodicJobHandleImpl() = default;
+
+ virtual void start(){};
+ virtual void pause(){};
+ virtual void resume(){};
+ };
+
+ ~MockPeriodicRunnerImpl() = default;
+
+ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicRunner::PeriodicJob job) {
+ return std::make_unique<MockPeriodicJobHandleImpl>();
+ }
+
+ void scheduleJob(PeriodicRunner::PeriodicJob job) {}
+ void startup() {}
+ void shutdown() {}
+};
+
+} // namespace mongo