diff options
author | Misha Tyulenev <misha@mongodb.com> | 2016-04-14 10:21:09 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2016-04-14 10:21:51 -0400 |
commit | 0d30a89c8ce925b4176389ecc2bb59e09afb24f4 (patch) | |
tree | 271357f952f4aa20deebd6bdf9b006e146c372f8 /src/mongo/s/client/shard_registry.cpp | |
parent | d0717b4c75cd931abdf7d09d1cab3c8dc554bdaf (diff) | |
download | mongo-0d30a89c8ce925b4176389ecc2bb59e09afb24f4.tar.gz |
SERVER-23498 Add ShardFactory
Diffstat (limited to 'src/mongo/s/client/shard_registry.cpp')
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 67 |
1 files changed, 22 insertions, 45 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 2735e9b59c4..b418cc7e59b 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -39,7 +39,6 @@ #include "mongo/client/connection_string.h" #include "mongo/client/query_fetcher.h" #include "mongo/client/remote_command_targeter.h" -#include "mongo/client/remote_command_targeter_factory.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" #include "mongo/db/query/lite_parsed_query.h" @@ -54,6 +53,7 @@ #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/grid.h" +#include "mongo/s/client/shard_factory.h" #include "mongo/s/write_ops/wc_error_detail.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -169,12 +169,12 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kWriteConcernErrors{ ErrorCodes::UnknownReplWriteConcern, ErrorCodes::CannotSatisfyWriteConcern}; -ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, +ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, std::unique_ptr<executor::TaskExecutorPool> executorPool, executor::NetworkInterface* network, std::unique_ptr<executor::TaskExecutor> addShardExecutor, ConnectionString configServerCS) - : _targeterFactory(std::move(targeterFactory)), + : _shardFactory(std::move(shardFactory)), _executorPool(std::move(executorPool)), _network(network), _executorForAddShard(std::move(addShardExecutor)) { @@ -260,8 +260,7 @@ bool ShardRegistry::reload(OperationContext* txn) { // Ensure targeter exists for all shards and take shard connection string from the targeter. // Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating // hosts for a given shard. - std::vector<std::tuple<std::string, ConnectionString, std::unique_ptr<RemoteCommandTargeter>>> - shardsInfo; + std::vector<std::tuple<std::string, ConnectionString>> shardsInfo; for (const auto& shardType : shards) { // This validation should ideally go inside the ShardType::validate call. However, doing // it there would prevent us from loading previously faulty shard hosts, which might have @@ -272,10 +271,7 @@ bool ShardRegistry::reload(OperationContext* txn) { continue; } - auto targeter = _targeterFactory->create(shardHostStatus.getValue()); - - shardsInfo.push_back(std::make_tuple( - shardType.getName(), targeter->connectionString(), std::move(targeter))); + shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue())); } lk.lock(); @@ -293,9 +289,7 @@ bool ShardRegistry::reload(OperationContext* txn) { continue; } - _addShard_inlock(std::move(std::get<0>(shardInfo)), - std::move(std::get<1>(shardInfo)), - std::move(std::get<2>(shardInfo))); + _addShard_inlock(std::move(std::get<0>(shardInfo)), std::move(std::get<1>(shardInfo))); } nextReloadState = ReloadState::Idle; @@ -342,7 +336,7 @@ shared_ptr<Shard> ShardRegistry::getConfigShard() { } unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { - return stdx::make_unique<Shard>("<unnamed>", connStr, _targeterFactory->create(connStr)); + return _shardFactory->createUniqueShard("<unnamed>", connStr, false); } shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const { @@ -429,7 +423,7 @@ void ShardRegistry::appendConnectionStats(executor::ConnectionPoolStats* stats) } void ShardRegistry::_addConfigShard_inlock() { - _addShard_inlock("config", _configServerCS, _targeterFactory->create(_configServerCS)); + _addShard_inlock("config", _configServerCS); } void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { @@ -444,19 +438,17 @@ void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { if (shard->isConfig()) { _updateConfigServerConnectionString_inlock(newConnString); } else { - _addShard_inlock(shard->getId(), newConnString, _targeterFactory->create(newConnString)); + _addShard_inlock(shard->getId(), newConnString); } } -void ShardRegistry::_addShard_inlock(const ShardId& shardId, - const ConnectionString& connString, - std::unique_ptr<RemoteCommandTargeter> targeter) { - auto originalShard = _findUsingLookUp_inlock(shardId); - if (originalShard) { - auto oldConnString = originalShard->getConnString(); +void ShardRegistry::_addShard_inlock(const ShardId& shardId, const ConnectionString& connString) { + auto currentShard = _findUsingLookUp_inlock(shardId); + if (currentShard) { + auto oldConnString = currentShard->originalConnString(); if (oldConnString.toString() != connString.toString()) { - log() << "Updating ShardRegistry connection string for shard " << originalShard->getId() + log() << "Updating ShardRegistry connection string for shard " << currentShard->getId() << " from: " << oldConnString.toString() << " to: " << connString.toString(); } @@ -466,7 +458,9 @@ void ShardRegistry::_addShard_inlock(const ShardId& shardId, } } - shared_ptr<Shard> shard = std::make_shared<Shard>(shardId, connString, std::move(targeter)); + // TODO: the third argument should pass the bool that will instruct factory to create either + // local or remote shard. + auto shard = _shardFactory->createShard(shardId, connString, false); _lookup[shard->getId()] = shard; @@ -513,7 +507,8 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig( const BSONObj& query, const BSONObj& sort, boost::optional<long long> limit) { - const auto targeter = getConfigShard()->getTargeter(); + const auto configShard = getConfigShard(); + const auto targeter = configShard->getTargeter(); const auto host = targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); if (!host.isOK()) { @@ -606,7 +601,7 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig( fetcher.wait(); - updateReplSetMonitor(targeter, host.getValue(), status); + configShard->updateReplSetMonitor(host.getValue(), status); if (!status.isOK()) { if (status.compareCode(ErrorCodes::ExceededTimeLimit)) { @@ -809,7 +804,7 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat executor->wait(callStatus.getValue()); if (!responseStatus.isOK()) { - updateReplSetMonitor(targeter, host.getValue(), responseStatus.getStatus()); + shard->updateReplSetMonitor(host.getValue(), responseStatus.getStatus()); if (responseStatus.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { LOG(0) << "Operation timed out with status " << responseStatus.getStatus(); } @@ -819,7 +814,7 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat auto response = std::move(responseStatus.getValue()); Status commandSpecificStatus = getStatusFromCommandResult(response.data); - updateReplSetMonitor(targeter, host.getValue(), commandSpecificStatus); + shard->updateReplSetMonitor(host.getValue(), commandSpecificStatus); CommandResponse cmdResponse; cmdResponse.response = response.data.getOwned(); @@ -837,22 +832,4 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat return StatusWith<CommandResponse>(std::move(cmdResponse)); } -void ShardRegistry::updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter, - const HostAndPort& remoteHost, - const Status& remoteCommandStatus) { - if (remoteCommandStatus.isOK()) - return; - - if (ErrorCodes::isNotMasterError(remoteCommandStatus.code()) || - (remoteCommandStatus == ErrorCodes::InterruptedDueToReplStateChange)) { - targeter->markHostNotMaster(remoteHost); - } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) { - targeter->markHostUnreachable(remoteHost); - } else if (remoteCommandStatus == ErrorCodes::NotMasterOrSecondary) { - targeter->markHostUnreachable(remoteHost); - } else if (remoteCommandStatus == ErrorCodes::ExceededTimeLimit) { - targeter->markHostUnreachable(remoteHost); - } -} - } // namespace mongo |