summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp8
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.cpp107
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.h11
3 files changed, 84 insertions, 42 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 9d8162067fc..c34fe99d4f7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -779,12 +779,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
const auto configsvrConnStr =
Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString();
- status = ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString(
- opCtx, configsvrConnStr);
- if (!status.isOK()) {
- warning() << "error encountered while trying to update config connection string to "
- << configsvrConnStr << causedBy(status);
- }
+ ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString(opCtx,
+ configsvrConnStr);
CatalogCacheLoader::get(_service).onStepUp();
ChunkSplitter::get(_service).onStepUp();
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index 1902d884cee..7bee0b6a789 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -67,6 +67,10 @@
#include "mongo/util/log.h"
namespace mongo {
+
+// Failpoint for disabling updateShardIdentityConfigString calls on signaled RS nodes.
+MONGO_FAIL_POINT_DEFINE(failUpdateShardIdentityConfigString);
+
namespace {
const auto getInstance = ServiceContext::declareDecoration<ShardingInitializationMongoD>();
@@ -83,41 +87,73 @@ auto makeEgressHooksList(ServiceContext* service) {
/**
* Updates the config server field of the shardIdentity document with the given connection string if
* setName is equal to the config server replica set name.
- *
- * NOTE: This is intended to be used on a new thread that hasn't called Client::initThread.
- *
- * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
- * to replica set membership.
*/
-void updateShardIdentityConfigStringCB(const std::string& setName,
- const std::string& newConnectionString) {
- auto configsvrConnStr =
- Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString();
- if (configsvrConnStr.getSetName() != setName) {
- // Ignore all change notification for other sets that are not the config server.
- return;
- }
+class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener {
+public:
+ ShardingReplicaSetChangeListener(ServiceContext* serviceContext)
+ : _serviceContext(serviceContext) {}
+ ~ShardingReplicaSetChangeListener() final = default;
+
+ void onFoundSet(const Key&) final {}
+
+ // Update the shard identy config string
+ void onConfirmedSet(const State& state) final {
+ auto connStr = state.connStr;
+
+ auto fun = [ serviceContext = _serviceContext, connStr ](auto args) {
+ if (ErrorCodes::isCancelationError(args.status.code())) {
+ return;
+ }
+ uassertStatusOK(args.status);
+
+ LOG(0) << "Updating config server with confirmed set " << connStr;
+ Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr);
+
+ if (MONGO_FAIL_POINT(failUpdateShardIdentityConfigString)) {
+ return;
+ }
+
+ auto configsvrConnStr =
+ Grid::get(serviceContext)->shardRegistry()->getConfigServerConnectionString();
- Client::initThread("updateShardIdentityConfigConnString");
- auto uniqOpCtx = Client::getCurrent()->makeOperationContext();
+ // Only proceed if the notification is for the configsvr
+ if (configsvrConnStr.getSetName() != connStr.getSetName()) {
+ return;
+ }
+
+ ThreadClient tc("updateShardIdentityConfigString", serviceContext);
+ auto opCtx = tc->makeOperationContext();
+
+ ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), connStr);
+ };
- auto status = ShardingInitializationMongoD::updateShardIdentityConfigString(
- uniqOpCtx.get(), uassertStatusOK(ConnectionString::parse(newConnectionString)));
- if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
- warning() << "Error encountered while trying to update config connection string to "
- << newConnectionString << causedBy(redact(status));
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus();
+ if (ErrorCodes::isCancelationError(schedStatus.code())) {
+ LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus;
+ return;
+ }
+ uassertStatusOK(schedStatus);
}
-}
+ void onPossibleSet(const State& state) final {
+ Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
+ }
+ void onDroppedSet(const Key&) final {}
-void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
- const ShardIdentity& shardIdentity,
- StringData distLockProcessId) {
+private:
+ ServiceContext* _serviceContext;
+};
+
+} // namespace
+
+void ShardingInitializationMongoD::initializeShardingEnvironmentOnShardServer(
+ OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) {
initializeGlobalShardingStateForMongoD(
opCtx, shardIdentity.getConfigsvrConnectionString(), distLockProcessId);
- ReplicaSetMonitor::setSynchronousConfigChangeHook(
- &ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
- ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
+ _replicaSetChangeListener =
+ ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>(
+ opCtx->getServiceContext());
// Determine primary/secondary/standalone state in order to properly initialize sharding
// components.
@@ -141,10 +177,10 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
<< (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
}
-} // namespace
-
ShardingInitializationMongoD::ShardingInitializationMongoD()
- : _initFunc(initializeShardingEnvironmentOnShardServer) {}
+ : _initFunc([this](auto... args) {
+ this->initializeShardingEnvironmentOnShardServer(std::forward<decltype(args)>(args)...);
+ }) {}
ShardingInitializationMongoD::~ShardingInitializationMongoD() = default;
@@ -166,6 +202,7 @@ void ShardingInitializationMongoD::shutDown(OperationContext* opCtx) {
grid->getExecutorPool()->shutdownAndJoin();
grid->catalogClient()->shutDown(opCtx);
grid->shardRegistry()->shutdown();
+ _replicaSetChangeListener.reset();
}
bool ShardingInitializationMongoD::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
@@ -306,7 +343,7 @@ void ShardingInitializationMongoD::initializeFromShardIdentity(
}
}
-Status ShardingInitializationMongoD::updateShardIdentityConfigString(
+void ShardingInitializationMongoD::updateShardIdentityConfigString(
OperationContext* opCtx, const ConnectionString& newConnectionString) {
BSONObj updateObj(
ShardIdentityType::createConfigServerUpdateObject(newConnectionString.toString()));
@@ -328,10 +365,12 @@ Status ShardingInitializationMongoD::updateShardIdentityConfigString(
<< newConnectionString;
}
} catch (const DBException& exception) {
- return exception.toStatus();
+ auto status = exception.toStatus();
+ if (!ErrorCodes::isNotMasterError(status.code())) {
+ warning() << "Error encountered while trying to update config connection string to "
+ << newConnectionString.toString() << causedBy(redact(status));
+ }
}
-
- return Status::OK();
}
void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h
index e26302b4bb2..a205d68d1b2 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.h
+++ b/src/mongo/db/s/sharding_initialization_mongod.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/base/string_data.h"
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/stdx/functional.h"
@@ -60,6 +61,10 @@ public:
static ShardingInitializationMongoD* get(OperationContext* opCtx);
static ShardingInitializationMongoD* get(ServiceContext* service);
+ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
+ const ShardIdentity& shardIdentity,
+ StringData distLockProcessId);
+
/**
* If started with --shardsvr, initializes sharding awareness from the shardIdentity document on
* disk, if there is one.
@@ -94,8 +99,8 @@ public:
* Updates the config server field of the shardIdentity document with the given connection
* string.
*/
- static Status updateShardIdentityConfigString(OperationContext* opCtx,
- const ConnectionString& newConnectionString);
+ static void updateShardIdentityConfigString(OperationContext* opCtx,
+ const ConnectionString& newConnectionString);
/**
* For testing only. Mock the initialization method used by initializeFromConfigConnString and
@@ -112,6 +117,8 @@ private:
// Function for initializing the sharding environment components (i.e. everything on the Grid)
ShardingEnvironmentInitFunc _initFunc;
+
+ ReplicaSetChangeListenerHandle _replicaSetChangeListener;
};
/**