summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevin Pulo <kevin.pulo@mongodb.com>2020-07-02 20:37:03 +1000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-05 09:32:06 +0000
commit403ee63aad4b5d76b2abd5e003eb77cb1dbdbf51 (patch)
tree40025e2e19fec26e7865f954a8a00146e5d72adb
parentbbeb56709be4a9b5a764c7809105ad2e30167fc9 (diff)
downloadmongo-403ee63aad4b5d76b2abd5e003eb77cb1dbdbf51.tar.gz
SERVER-46202 Make ShardRegistryData immutable
-rw-r--r--src/mongo/s/SConscript2
-rw-r--r--src/mongo/s/client/shard_registry.cpp329
-rw-r--r--src/mongo/s/client/shard_registry.h143
-rw-r--r--src/mongo/s/client/shard_registry_data_test.cpp92
4 files changed, 235 insertions, 331 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 14f85e390a1..70400e57ed0 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -258,7 +258,6 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_time_metadata_hook',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
'$BUILD_DIR/mongo/executor/task_executor_pool',
- '$BUILD_DIR/mongo/util/caching',
'client/shard_interface',
'query/cluster_cursor_manager',
'sharding_routing_table',
@@ -585,7 +584,6 @@ env.CppUnitTest(
'chunk_test.cpp',
'chunk_version_test.cpp',
'chunk_writes_tracker_test.cpp',
- 'client/shard_registry_data_test.cpp',
'client/shard_remote_test.cpp',
'cluster_identity_loader_test.cpp',
'cluster_last_error_info_test.cpp',
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 61ee14708a1..e0a5cc13476 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -64,12 +64,6 @@
namespace mongo {
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutor;
@@ -90,18 +84,20 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
std::vector<ShardRemovalHook> shardRemovalHooks)
: _shardFactory(std::move(shardFactory)),
_initConfigServerCS(configServerCS),
- _shardRemovalHooks(std::move(shardRemovalHooks)) {}
+ _shardRemovalHooks(std::move(shardRemovalHooks)) {
+ invariant(_initConfigServerCS.isValid());
+}
ShardRegistry::~ShardRegistry() {
shutdown();
}
void ShardRegistry::shutdown() {
- if (_executor && !_isShutdown) {
+ if (_executor && !_isShutdown.load()) {
LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry");
_executor->shutdown();
_executor->join();
- _isShutdown = true;
+ _isShutdown.store(true);
}
}
@@ -109,17 +105,17 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const {
return getConfigShard()->getConnString();
}
-StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
- const ShardId& shardId) {
+StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
+ const ShardId& shardId) {
// If we know about the shard, return it.
- auto shard = _data.findShard(shardId);
+ auto shard = getShardNoReload(shardId);
if (shard) {
return shard;
}
// If we can't find the shard, attempt to reload the ShardRegistry.
bool didReload = reload(opCtx);
- shard = _data.findShard(shardId);
+ shard = getShardNoReload(shardId);
// If we found the shard, return it.
if (shard) {
@@ -135,7 +131,7 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
// If we did not perform the reload ourselves (because there was a concurrent reload), force a
// reload again to ensure that we have seen data at least as up to date as our first reload.
reload(opCtx);
- shard = _data.findShard(shardId);
+ shard = getShardNoReload(shardId);
if (shard) {
return shard;
@@ -144,35 +140,41 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
}
-shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
+std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
+ stdx::lock_guard<Latch> lk(_mutex);
return _data.findShard(shardId);
}
-shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
+std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
+ stdx::lock_guard<Latch> lk(_mutex);
return _data.findByHostAndPort(host);
}
-shared_ptr<Shard> ShardRegistry::getConfigShard() const {
- auto shard = _data.getConfigShard();
- invariant(shard);
- return shard;
+std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(_configShard);
+ return _configShard;
}
-unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
+std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr);
}
-shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const {
+std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const {
+ stdx::lock_guard<Latch> lk(_mutex);
return _data.findByRSName(name);
}
-void ShardRegistry::getAllShardIdsNoReload(vector<ShardId>* all) const {
+void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const {
std::set<ShardId> seen;
- _data.getAllShardIds(seen);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShardIds(seen);
+ }
all->assign(seen.begin(), seen.end());
}
-void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all) {
+void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) {
getAllShardIdsNoReload(all);
if (all->empty()) {
bool didReload = reload(opCtx);
@@ -188,31 +190,44 @@ void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all
int ShardRegistry::getNumShards() const {
std::set<ShardId> seen;
- _data.getAllShardIds(seen);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShardIds(seen);
+ }
return seen.size();
}
-void ShardRegistry::toBSON(BSONObjBuilder* result) const {
- _data.toBSON(result);
-}
-
void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
invariant(newConnString.type() == ConnectionString::SET ||
newConnString.type() == ConnectionString::CUSTOM); // For dbtests
// to prevent update config shard connection string during init
- stdx::unique_lock<Latch> lock(_reloadMutex);
- _data.rebuildShardIfExists(newConnString, _shardFactory.get());
+ stdx::unique_lock<Latch> lock(_mutex);
+
+ auto shard = _data.findByRSName(newConnString.getSetName());
+ if (!shard) {
+ return;
+ }
+
+ auto [data, updatedShard] =
+ ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get());
+
+ if (updatedShard && updatedShard->isConfig()) {
+ _configShard = updatedShard;
+ }
+
+ _data = data;
}
void ShardRegistry::init() {
- stdx::unique_lock<Latch> reloadLock(_reloadMutex);
- invariant(_initConfigServerCS.isValid());
- auto configShard =
- _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
- _data.addConfigShard(configShard);
- // set to invalid so it cant be started more than once.
- _initConfigServerCS = ConnectionString();
+ invariant(!_isInitialized.load());
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ _configShard =
+ _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
+ _data = ShardRegistryData::createWithConfigShardOnly(_configShard);
+ }
+ _isInitialized.store(true);
}
void ShardRegistry::startup(OperationContext* opCtx) {
@@ -291,8 +306,7 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
}
bool ShardRegistry::isUp() const {
- stdx::unique_lock<Latch> reloadLock(_reloadMutex);
- return _isUp;
+ return _isUp.load();
}
bool ShardRegistry::reload(OperationContext* opCtx) {
@@ -335,17 +349,20 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
_inReloadCV.notify_all();
});
- ShardRegistryData currData(opCtx, _shardFactory.get());
- currData.addConfigShard(_data.getConfigShard());
- _data.swapAndMerge(currData);
+ ShardRegistryData reloadedData =
+ ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard());
- // Remove RSMs that are not in the catalog any more.
- std::set<ShardId> removedShardIds;
- currData.getAllShardIds(removedShardIds);
- _data.shardIdSetDifference(removedShardIds);
+ stdx::unique_lock<Latch> lock(_mutex);
+
+ auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData);
+ _data = std::move(mergedData);
+
+ lock.unlock();
- for (auto& shardId : removedShardIds) {
- auto shard = currData.findByShardId(shardId);
+ // Remove RSMs that are not in the catalog any more.
+ for (auto& pair : removedShards) {
+ auto& shardId = pair.first;
+ auto& shard = pair.second;
invariant(shard);
auto name = shard->getConnString().getSetName();
@@ -359,14 +376,14 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
nextReloadState = ReloadState::Idle;
// first successful reload means that registry is up
- _isUp = true;
+ _isUp.store(true);
return true;
}
void ShardRegistry::clearEntries() {
ShardRegistryData empty;
- empty.addConfigShard(_data.getConfigShard());
- _data.swap(empty);
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data = empty;
}
void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext,
@@ -410,9 +427,38 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex
}
}
+void ShardRegistry::toBSON(BSONObjBuilder* result) const {
+ std::vector<std::shared_ptr<Shard>> shards;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShards(shards);
+ }
+
+ std::sort(std::begin(shards),
+ std::end(shards),
+ [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
+ return lhs->getId() < rhs->getId();
+ });
+
+ BSONObjBuilder mapBob(result->subobjStart("map"));
+ for (auto&& shard : shards) {
+ // Intentionally calling getConnString while not holding _mutex
+ // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote.
+ mapBob.append(shard->getId(), shard->getConnString().toString());
+ }
+}
+
////////////// ShardRegistryData //////////////////
-ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) {
+ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<Shard> configShard) {
+ ShardRegistryData data;
+ data._addShard(configShard, true);
+ return data;
+}
+
+ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx,
+ ShardFactory* shardFactory,
+ std::shared_ptr<Shard> configShard) {
auto const catalogClient = Grid::get(opCtx)->catalogClient();
auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern;
@@ -459,6 +505,8 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar
shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue()));
}
+ ShardRegistryData data;
+ data._addShard(configShard, true);
for (auto& shardInfo : shardsInfo) {
if (std::get<0>(shardInfo) == "config") {
continue;
@@ -467,98 +515,88 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar
auto shard = shardFactory->createShard(std::move(std::get<0>(shardInfo)),
std::move(std::get<1>(shardInfo)));
- _addShard(WithLock::withoutLock(), std::move(shard), false);
+ data._addShard(std::move(shard), false);
}
+ return data;
}
-void ShardRegistryData::swap(ShardRegistryData& other) {
- stdx::lock_guard<Latch> lk(_mutex);
- _shardIdLookup.swap(other._shardIdLookup);
- _rsLookup.swap(other._rsLookup);
- _hostLookup.swap(other._hostLookup);
- _connStringLookup.swap(other._connStringLookup);
- _configShard.swap(other._configShard);
-}
+std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting(
+ const ShardRegistryData& alreadyCachedData, const ShardRegistryData& configServerData) {
+ ShardRegistryData mergedData(configServerData);
-void ShardRegistryData::swapAndMerge(ShardRegistryData& other) {
- stdx::lock_guard<Latch> lk(_mutex);
- _rsLookup.swap(other._rsLookup);
- _configShard.swap(other._configShard);
- _shardIdLookup.swap(other._shardIdLookup);
-
- for (auto it = other._connStringLookup.begin(); it != other._connStringLookup.end(); ++it) {
- auto res = _connStringLookup.find(it->first);
- if (res == _connStringLookup.end()) {
- _connStringLookup[it->first] = it->second;
- }
+ // For connstrings and hosts, prefer values from alreadyCachedData to whatever might have been
+ // fetched from the configsvrs.
+ for (auto it = alreadyCachedData._connStringLookup.begin();
+ it != alreadyCachedData._connStringLookup.end();
+ ++it) {
+ mergedData._connStringLookup[it->first] = it->second;
+ }
+ for (auto it = alreadyCachedData._hostLookup.begin(); it != alreadyCachedData._hostLookup.end();
+ ++it) {
+ mergedData._hostLookup[it->first] = it->second;
}
- for (auto it = other._hostLookup.begin(); it != other._hostLookup.end(); ++it) {
- auto res = _hostLookup.find(it->first);
- if (res == _hostLookup.end()) {
- _hostLookup[it->first] = it->second;
+ // Find the shards that are no longer present.
+ ShardMap removedShards;
+ for (auto i = alreadyCachedData._shardIdLookup.begin();
+ i != alreadyCachedData._shardIdLookup.end();
+ ++i) {
+ invariant(i->second);
+ if (mergedData._shardIdLookup.find(i->second->getId()) == mergedData._shardIdLookup.end()) {
+ removedShards[i->second->getId()] = i->second;
}
}
-}
-shared_ptr<Shard> ShardRegistryData::getConfigShard() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _configShard;
+ return {mergedData, removedShards};
}
-void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) {
- stdx::lock_guard<Latch> lk(_mutex);
- _configShard = shard;
- _addShard(lk, shard, true);
-}
+std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting(
+ const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory) {
+ ShardRegistryData data(existingData);
-shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const {
- stdx::lock_guard<Latch> lk(_mutex);
- auto i = _rsLookup.find(name);
- return (i != _rsLookup.end()) ? i->second : nullptr;
-}
+ auto it = data._rsLookup.find(newConnString.getSetName());
+ if (it == data._rsLookup.end()) {
+ return {data, nullptr};
+ }
+ invariant(it->second);
+ auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString);
+ data._addShard(updatedShard, true);
-shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _findByHostAndPort(lk, hostAndPort);
+ return {data, updatedShard};
}
-shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _findByShardId(lk, shardId);
+std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const {
+ auto i = _rsLookup.find(name);
+ return (i != _rsLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::_findByConnectionString(
- WithLock, const ConnectionString& connectionString) const {
+std::shared_ptr<Shard> ShardRegistryData::_findByConnectionString(
+ const ConnectionString& connectionString) const {
auto i = _connStringLookup.find(connectionString);
return (i != _connStringLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::_findByHostAndPort(WithLock,
- const HostAndPort& hostAndPort) const {
+std::shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const {
auto i = _hostLookup.find(hostAndPort);
return (i != _hostLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const {
+std::shared_ptr<Shard> ShardRegistryData::_findByShardId(const ShardId& shardId) const {
auto i = _shardIdLookup.find(shardId);
return (i != _shardIdLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::findShard(ShardId const& shardId) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _findShard(lk, shardId);
-}
-
-shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shardId) const {
- auto shard = _findByShardId(lk, shardId);
+std::shared_ptr<Shard> ShardRegistryData::findShard(const ShardId& shardId) const {
+ auto shard = _findByShardId(shardId);
if (shard) {
return shard;
}
StatusWith<ConnectionString> swConnString = ConnectionString::parse(shardId.toString());
if (swConnString.isOK()) {
- shard = _findByConnectionString(lk, swConnString.getValue());
+ shard = _findByConnectionString(swConnString.getValue());
if (shard) {
return shard;
}
@@ -566,7 +604,7 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar
StatusWith<HostAndPort> swHostAndPort = HostAndPort::parse(shardId.toString());
if (swHostAndPort.isOK()) {
- shard = _findByHostAndPort(lk, swHostAndPort.getValue());
+ shard = findByHostAndPort(swHostAndPort.getValue());
if (shard) {
return shard;
}
@@ -575,32 +613,14 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar
return nullptr;
}
-void ShardRegistryData::toBSON(BSONObjBuilder* result) const {
- std::vector<std::shared_ptr<Shard>> shards;
- {
- stdx::lock_guard<Latch> lk(_mutex);
- shards.reserve(_shardIdLookup.size());
- for (auto&& shard : _shardIdLookup) {
- shards.emplace_back(shard.second);
- }
- }
-
- std::sort(std::begin(shards),
- std::end(shards),
- [](const std::shared_ptr<Shard>& lhs, const std::shared_ptr<Shard>& rhs) {
- return lhs->getId() < rhs->getId();
- });
-
- BSONObjBuilder mapBob(result->subobjStart("map"));
- for (auto&& shard : shards) {
- // Intentionally calling getConnString while not holding ShardRegistryData::_mutex
- // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote.
- mapBob.append(shard->getId(), shard->getConnString().toString());
+void ShardRegistryData::getAllShards(std::vector<std::shared_ptr<Shard>>& result) const {
+ result.reserve(_shardIdLookup.size());
+ for (auto&& shard : _shardIdLookup) {
+ result.emplace_back(shard.second);
}
}
void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const {
- stdx::lock_guard<Latch> lk(_mutex);
for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) {
const auto& s = i->second;
if (s->getId().toString() == "config") {
@@ -610,50 +630,13 @@ void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const {
}
}
-void ShardRegistryData::shardIdSetDifference(std::set<ShardId>& diff) const {
- stdx::lock_guard<Latch> lk(_mutex);
- for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) {
- invariant(i->second);
- auto res = diff.find(i->second->getId());
- if (res != diff.end()) {
- diff.erase(res);
- }
- }
-}
-
-void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnString,
- ShardFactory* factory) {
- stdx::unique_lock<Latch> updateConnStringLock(_mutex);
- auto it = _rsLookup.find(newConnString.getSetName());
- if (it == _rsLookup.end()) {
- return;
- }
-
- _rebuildShard(updateConnStringLock, newConnString, factory);
-}
-
-
-void ShardRegistryData::_rebuildShard(WithLock lk,
- ConnectionString const& newConnString,
- ShardFactory* factory) {
- auto it = _rsLookup.find(newConnString.getSetName());
- invariant(it->second);
- auto shard = factory->createShard(it->second->getId(), newConnString);
- _addShard(lk, shard, true);
- if (shard->isConfig()) {
- _configShard = shard;
- }
-}
-
-void ShardRegistryData::_addShard(WithLock lk,
- std::shared_ptr<Shard> const& shard,
- bool useOriginalCS) {
+void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginalCS) {
const ShardId shardId = shard->getId();
const ConnectionString connString =
useOriginalCS ? shard->originalConnString() : shard->getConnString();
- auto currentShard = _findShard(lk, shardId);
+ auto currentShard = findShard(shardId);
if (currentShard) {
auto oldConnString = currentShard->originalConnString();
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index c998a897d4f..4f68d1a8c4c 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -55,92 +55,98 @@ class ShardType;
class ShardRegistryData {
public:
+ using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>;
+
/**
- * Reads shards docs from the catalog client and fills in maps.
+ * Creates a basic ShardRegistryData, that only contains the config shard. Needed during
+ * initialization, when the config servers are contacted for the first time (ie. the first time
+ * createFromCatalogClient() is called).
*/
- ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory);
- ShardRegistryData() = default;
- ~ShardRegistryData() = default;
+ static ShardRegistryData createWithConfigShardOnly(std::shared_ptr<Shard> configShard);
- void swap(ShardRegistryData& other);
+ /**
+ * Reads shards docs from the catalog client and fills in maps.
+ */
+ static ShardRegistryData createFromCatalogClient(OperationContext* opCtx,
+ ShardFactory* shardFactory,
+ std::shared_ptr<Shard> configShard);
- /*
- * Swaps _shardIdLookup, _rsLookup, and _configShard with other. Merges _hostLookup and
- * _connStringLookup without overwriting existing entries in either map.
+ /**
+ * Merges alreadyCachedData and configServerData into a new ShardRegistryData.
+ *
+ * The merged data is the same as configServerData, except that for the host and connection
+ * string based lookups, any values from alreadyCachedData will take precedence over those from
+ * configServerData.
+ *
+ * Returns the merged data, as well as the shards that have been removed (ie. that are present
+ * in alreadyCachedData but not configServerData) as a mapping from ShardId to
+ * std::shared_ptr<Shard>.
*
* Called when reloading the shard registry. It is important to merge _hostLookup because
* reloading the shard registry can interleave with updates to the shard registry passed by the
* RSM.
*/
- void swapAndMerge(ShardRegistryData& other);
+ static std::pair<ShardRegistryData, ShardMap> mergeExisting(
+ const ShardRegistryData& alreadyCachedData, const ShardRegistryData& configServerData);
+
+ /**
+ * Create a duplicate of existingData, but additionally updates the shard for newConnString.
+ * Used when notified by the RSM of a new connection string from a shard.
+ */
+ static std::pair<ShardRegistryData, std::shared_ptr<Shard>> createFromExisting(
+ const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory);
/**
- * Returns a shared pointer the shard object with the given shard id.
+ * Returns the shard with the given shard id, connection string, or host and port.
*
* Callers might pass in the connection string or HostAndPort rather than ShardId, so this
* method will first look for the shard by ShardId, then connection string, then HostAndPort
* stopping once it finds the shard.
*/
- std::shared_ptr<Shard> findShard(ShardId const& shardId) const;
+ std::shared_ptr<Shard> findShard(const ShardId& shardId) const;
/**
- * Lookup shard by replica set name. Returns nullptr if the name can't be found.
+ * Returns the shard with the given replica set name, or nullptr if no such shard.
*/
- std::shared_ptr<Shard> findByRSName(const std::string& rsName) const;
+ std::shared_ptr<Shard> findByRSName(const std::string& name) const;
/**
- * Returns a shared pointer to the shard object with the given shard id.
- */
- std::shared_ptr<Shard> findByShardId(const ShardId&) const;
-
- /**
- * Finds the shard that the mongod listening at this HostAndPort is a member of.
+ * Returns the shard which contains a mongod with the given host and port, or nullptr if no such
+ * shard.
*/
std::shared_ptr<Shard> findByHostAndPort(const HostAndPort&) const;
/**
- * Returns config shard.
+ * Returns the set of all known shard ids.
*/
- std::shared_ptr<Shard> getConfigShard() const;
+ void getAllShardIds(std::set<ShardId>& result) const;
/**
- * Adds config shard.
+ * Returns the set of all known shard objects.
*/
- void addConfigShard(std::shared_ptr<Shard>);
-
- void getAllShardIds(std::set<ShardId>& result) const;
+ void getAllShards(std::vector<std::shared_ptr<Shard>>& result) const;
+private:
/**
- * Erases known by this shardIds from the diff argument.
+ * Returns the shard with the given shard id, or nullptr if no such shard.
*/
- void shardIdSetDifference(std::set<ShardId>& diff) const;
- void toBSON(BSONObjBuilder* result) const;
+ std::shared_ptr<Shard> _findByShardId(const ShardId&) const;
+
/**
- * If the shard with same replica set name as in the newConnString already exists then replace
- * it with the shard built for the newConnString.
+ * Returns the shard with the given connection string, or nullptr if no such shard.
*/
- void rebuildShardIfExists(const ConnectionString& newConnString, ShardFactory* factory);
+ std::shared_ptr<Shard> _findByConnectionString(const ConnectionString& connectionString) const;
-private:
/**
- * Creates a shard based on the specified information and puts it into the lookup maps.
- * if useOriginalCS = true it will use the ConnectionSring used for shard creation to update
+ * Puts the given shard object into the lookup maps.
+ *
+ * If useOriginalCS = true it will use the ConnectionSring used for shard creation to update
* lookup maps. Otherwise the current connection string from the Shard's RemoteCommandTargeter
- * will be used.
+ * will be used. Only called during ShardRegistryData construction.
*/
- void _addShard(WithLock, std::shared_ptr<Shard> const&, bool useOriginalCS);
- auto _findByShardId(WithLock, ShardId const&) const -> std::shared_ptr<Shard>;
- auto _findByHostAndPort(WithLock, const HostAndPort& hostAndPort) const
- -> std::shared_ptr<Shard>;
- auto _findByConnectionString(WithLock, const ConnectionString& connectionString) const
- -> std::shared_ptr<Shard>;
- auto _findShard(WithLock lk, ShardId const& shardId) const -> std::shared_ptr<Shard>;
- void _rebuildShard(WithLock, ConnectionString const& newConnString, ShardFactory* factory);
-
- // Protects the lookup maps below.
- mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistryData::_mutex");
-
- using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>;
+ void _addShard(std::shared_ptr<Shard>, bool useOriginalCS);
// Map of shardName -> Shard
ShardMap _shardIdLookup;
@@ -153,9 +159,6 @@ private:
// Map of connection string to Shard
std::map<ConnectionString, std::shared_ptr<Shard>> _connStringLookup;
-
- // store configShard separately to always have a reference
- std::shared_ptr<Shard> _configShard;
};
/**
@@ -193,6 +196,7 @@ public:
std::vector<ShardRemovalHook> shardRemovalHooks = {});
~ShardRegistry();
+
/**
* Starts ReplicaSetMonitor by adding a config shard.
*/
@@ -309,27 +313,45 @@ public:
const ConnectionString& connStr) noexcept;
private:
+ void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
+
/**
* Factory to create shards. Never changed after startup so safe to access outside of _mutex.
*/
const std::unique_ptr<ShardFactory> _shardFactory;
/**
- * Specified in the ShardRegistry c-tor. It's used only in startup() to initialize the config
- * shard
+ * Specified in the ShardRegistry c-tor. It's used only in init() to initialize the config
+ * shard.
*/
- ConnectionString _initConfigServerCS;
+ const ConnectionString _initConfigServerCS;
+
+ AtomicWord<bool> _isInitialized{false};
/**
* A list of callbacks to be called asynchronously when it has been discovered that a shard was
* removed.
*/
- std::vector<ShardRemovalHook> _shardRemovalHooks;
+ const std::vector<ShardRemovalHook> _shardRemovalHooks;
+
+ // Protects the ShardRegistryData lookup maps in _data, and _configShard.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistry::_mutex");
- void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
ShardRegistryData _data;
- // Protects the _reloadState and _initConfigServerCS during startup.
+ // Store a separate reference to the configShard.
+ std::shared_ptr<Shard> _configShard;
+
+ // Executor for reloading.
+ std::unique_ptr<executor::TaskExecutor> _executor{};
+
+ // The ShardRegistry is "up" once there has been a successful refresh.
+ AtomicWord<bool> _isUp{false};
+
+ // Set to true in shutdown call to prevent calling it twice.
+ AtomicWord<bool> _isShutdown{false};
+
+ // Protects the _reloadState during startup and refresh.
mutable Mutex _reloadMutex = MONGO_MAKE_LATCH("ShardRegistry::_reloadMutex");
stdx::condition_variable _inReloadCV;
@@ -340,13 +362,6 @@ private:
};
ReloadState _reloadState{ReloadState::Idle};
- bool _isUp{false};
-
- // Executor for reloading.
- std::unique_ptr<executor::TaskExecutor> _executor{};
-
- // Set to true in shutdown call to prevent calling it twice.
- bool _isShutdown{false};
};
} // namespace mongo
diff --git a/src/mongo/s/client/shard_registry_data_test.cpp b/src/mongo/s/client/shard_registry_data_test.cpp
deleted file mode 100644
index 2fcfed25e63..00000000000
--- a/src/mongo/s/client/shard_registry_data_test.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/bson/json.h"
-#include "mongo/client/remote_command_targeter_factory_mock.h"
-#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/s/client/shard_factory.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/client/shard_remote.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace {
-
-class ShardRegistryDataTest : public mongo::unittest::Test {
-public:
- ShardFactory* shardFactory() {
- return _shardFactory.get();
- }
-
-private:
- void setUp() override {
- auto targeterFactory = std::make_unique<RemoteCommandTargeterFactoryMock>();
- auto targeterFactoryPtr = targeterFactory.get();
-
- ShardFactory::BuilderCallable setBuilder =
- [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) {
- return std::make_unique<ShardRemote>(
- shardId, connStr, targeterFactoryPtr->create(connStr));
- };
-
- ShardFactory::BuilderCallable masterBuilder =
- [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) {
- return std::make_unique<ShardRemote>(
- shardId, connStr, targeterFactoryPtr->create(connStr));
- };
-
- ShardFactory::BuildersMap buildersMap{
- {ConnectionString::SET, std::move(setBuilder)},
- {ConnectionString::MASTER, std::move(masterBuilder)},
- };
-
- _shardFactory =
- std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
- }
-
- void tearDown() override {}
-
- std::unique_ptr<ShardFactory> _shardFactory;
-};
-
-TEST_F(ShardRegistryDataTest, AddConfigShard) {
- ConnectionString configCS("rs/dummy1:1234,dummy2:2345,dummy3:3456", ConnectionString::SET);
- auto configShard = shardFactory()->createShard(ShardRegistry::kConfigServerShardId, configCS);
-
- ShardRegistryData data;
- data.addConfigShard(configShard);
-
- ASSERT_EQUALS(configCS.toString(), data.getConfigShard()->originalConnString().toString());
-}
-
-} // namespace
-} // namespace mongo