summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoanna Huang <joannahuang@Joannas-MacBook-Pro.local>2017-08-01 13:46:41 -0400
committerJoanna Huang <joannahuang@Joannas-MacBook-Pro.local>2017-08-09 10:02:15 -0400
commit373e543253801f53dfc881f46f6346c96f43e70d (patch)
treee7040d4a7ffd7b346b5b2eba1befdb7a1b4d70e2
parent041f96f0d1b8a153f76bd86b3d394961fa09b716 (diff)
downloadmongo-373e543253801f53dfc881f46f6346c96f43e70d.tar.gz
SERVER-30443 Create PeriodicBalancerSettingsRefresher on the ServiceContext
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp3
-rw-r--r--src/mongo/db/s/sharding_state.cpp1
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/periodic_balancer_settings_refresher.cpp132
-rw-r--r--src/mongo/s/periodic_balancer_settings_refresher.h106
-rw-r--r--src/mongo/s/sharding_initialization.cpp11
6 files changed, 253 insertions, 1 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 5bdfcc46ca2..93dd61ad3ef 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -87,6 +87,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
+#include "mongo/s/periodic_balancer_settings_refresher.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
@@ -714,6 +715,7 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
ShardingState::get(_service)->interruptChunkSplitter();
CatalogCacheLoader::get(_service).onStepDown();
+ PeriodicBalancerSettingsRefresher::get(_service)->stop();
}
ShardingState::get(_service)->markCollectionsNotShardedAtStepdown();
@@ -802,6 +804,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
}
CatalogCacheLoader::get(_service).onStepUp();
+ PeriodicBalancerSettingsRefresher::get(_service)->start();
ShardingState::get(_service)->initiateChunkSplitter();
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 5c0f925cb91..b775ca6ae0b 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -343,7 +343,6 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
repl::MemberState::RS_PRIMARY);
CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
-
_chunkSplitter->setReplicaSetMode(isStandaloneOrPrimary);
log() << "initialized sharding components for "
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index f743a4f9acd..4d001a6da9a 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -274,6 +274,7 @@ env.Library(
'config_server_catalog_cache_loader.cpp',
'config_server_client.cpp',
'grid.cpp',
+ 'periodic_balancer_settings_refresher.cpp',
'shard_util.cpp',
'sharding_egress_metadata_hook.cpp',
],
diff --git a/src/mongo/s/periodic_balancer_settings_refresher.cpp b/src/mongo/s/periodic_balancer_settings_refresher.cpp
new file mode 100644
index 00000000000..8c2d4f50c3a
--- /dev/null
+++ b/src/mongo/s/periodic_balancer_settings_refresher.cpp
@@ -0,0 +1,132 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/periodic_balancer_settings_refresher.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+const Milliseconds kRefreshInterval(10 * 1000);
+
+const auto getPeriodicBalancerSettingsRefresher =
+ ServiceContext::declareDecoration<std::unique_ptr<PeriodicBalancerSettingsRefresher>>();
+
+} // namespace
+
+PeriodicBalancerSettingsRefresher::PeriodicBalancerSettingsRefresher(bool isPrimary)
+ : _isPrimary(isPrimary) {
+ _thread = stdx::thread([this] { _periodicRefresh(); });
+}
+
+PeriodicBalancerSettingsRefresher::~PeriodicBalancerSettingsRefresher() {
+ invariant(!_thread.joinable());
+}
+
+void PeriodicBalancerSettingsRefresher::create(ServiceContext* serviceContext, bool isPrimary) {
+ invariant(!getPeriodicBalancerSettingsRefresher(serviceContext));
+ getPeriodicBalancerSettingsRefresher(serviceContext) =
+ stdx::make_unique<PeriodicBalancerSettingsRefresher>(isPrimary);
+
+ // Register a shutdown task to terminate the refresher thread.
+ registerShutdownTask(
+ [serviceContext] { PeriodicBalancerSettingsRefresher::get(serviceContext)->shutdown(); });
+}
+
+PeriodicBalancerSettingsRefresher* PeriodicBalancerSettingsRefresher::get(
+ ServiceContext* serviceContext) {
+ return getPeriodicBalancerSettingsRefresher(serviceContext).get();
+}
+
+void PeriodicBalancerSettingsRefresher::start() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ invariant(!_isPrimary);
+ _isPrimary = true;
+}
+
+void PeriodicBalancerSettingsRefresher::stop() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ if (!_isPrimary) {
+ return;
+ }
+ _isPrimary = false;
+}
+
+void PeriodicBalancerSettingsRefresher::shutdown() {
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ if (_isShutdown) {
+ return;
+ }
+ _isShutdown = true;
+ }
+
+ _thread.join();
+ _thread = {};
+}
+
+void PeriodicBalancerSettingsRefresher::_periodicRefresh() {
+ Client::initThread("Periodic Balancer Settings Refresher");
+ auto opCtx = cc().makeOperationContext();
+
+ while (!_shutDownRequested()) {
+ if (_isPrimary) {
+ auto status =
+ Grid::get(opCtx.get())->getBalancerConfiguration()->refreshAndCheck(opCtx.get());
+ if (!status.isOK()) {
+ log() << "failed to refresh balancer settings" << causedBy(status);
+ }
+ }
+
+ try {
+ MONGO_IDLE_THREAD_BLOCK;
+ opCtx->sleepFor(kRefreshInterval);
+ } catch (DBException e) {
+ log() << "Periodic Balancer Settings Refresher interrupted ::" << e.what();
+ }
+ }
+}
+
+bool PeriodicBalancerSettingsRefresher::_shutDownRequested() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ return _isShutdown;
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/periodic_balancer_settings_refresher.h b/src/mongo/s/periodic_balancer_settings_refresher.h
new file mode 100644
index 00000000000..5b747836496
--- /dev/null
+++ b/src/mongo/s/periodic_balancer_settings_refresher.h
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+
+/**
+ * Periodically refreshes the cache of the balancer configuration on a primary shard.
+ */
+class PeriodicBalancerSettingsRefresher {
+ MONGO_DISALLOW_COPYING(PeriodicBalancerSettingsRefresher);
+
+public:
+ PeriodicBalancerSettingsRefresher(bool isPrimary);
+ ~PeriodicBalancerSettingsRefresher();
+
+ /**
+ * Instantiates an instance of the PeriodicBalancerSettingsRefresher and installs it on the
+ * specified service context.
+ *
+ * This method is not thread-safe and must be called only once when the service is starting.
+ */
+ static void create(ServiceContext* serviceContext, bool isPrimary);
+
+ /**
+ * Retrieves the per-service instance of the PeriodicBalancerSettingsRefresher.
+ */
+ static PeriodicBalancerSettingsRefresher* get(ServiceContext* serviceContext);
+
+ /**
+ * Invoked when the shard server primary enters the 'PRIMARY' state to start the
+ * PeriodicBalancerSettingsRefresher.
+ */
+ void start();
+
+ /**
+ * Invoked when this node which is currently serving as a 'PRIMARY' steps down.
+ *
+ * This method might be called multiple times in succession, which is what happens as a result
+ * of incomplete transition to primary so it is resilient to that.
+ */
+ void stop();
+
+ /**
+ * Signals shutdown and blocks until the refresher thread has stopped.
+ */
+ void shutdown();
+
+private:
+ /**
+ * The main loop that refreshes the cache periodically. This runs in a separate thread.
+ */
+ void _periodicRefresh();
+
+ /**
+ * Use to check whether or not shutdown has been requested for the running thread.
+ */
+ bool _shutDownRequested();
+
+ // The background thread to refresh balancer settings
+ stdx::thread _thread;
+
+ // Protects the state below
+ stdx::mutex _mutex;
+
+ // The PeriodicBalancerSettingsRefresher is only active on a primary node.
+ bool _isPrimary;
+
+ // Used to shut down the background thread
+ bool _isShutdown{false};
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 49931ed353a..60fcd383df4 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -65,6 +65,7 @@
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
+#include "mongo/s/periodic_balancer_settings_refresher.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
@@ -225,6 +226,16 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
replCoord->getMemberState().primary()) {
LogicalTimeValidator::get(opCtx)->enableKeyGenerator(opCtx, true);
+ } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ // Determine primary/secondary/standalone state in order to properly set up the refresher.
+ bool isReplSet =
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+ bool isStandaloneOrPrimary =
+ !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+ repl::MemberState::RS_PRIMARY);
+
+ PeriodicBalancerSettingsRefresher::create(opCtx->getServiceContext(),
+ isStandaloneOrPrimary);
}
return Status::OK();