summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/shard_registry.cpp
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2016-04-14 10:21:09 -0400
committerMisha Tyulenev <misha@mongodb.com>2016-04-14 10:21:51 -0400
commit0d30a89c8ce925b4176389ecc2bb59e09afb24f4 (patch)
tree271357f952f4aa20deebd6bdf9b006e146c372f8 /src/mongo/s/client/shard_registry.cpp
parentd0717b4c75cd931abdf7d09d1cab3c8dc554bdaf (diff)
downloadmongo-0d30a89c8ce925b4176389ecc2bb59e09afb24f4.tar.gz
SERVER-23498 Add ShardFactory
Diffstat (limited to 'src/mongo/s/client/shard_registry.cpp')
-rw-r--r--src/mongo/s/client/shard_registry.cpp67
1 files changed, 22 insertions, 45 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 2735e9b59c4..b418cc7e59b 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -39,7 +39,6 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/query_fetcher.h"
#include "mongo/client/remote_command_targeter.h"
-#include "mongo/client/remote_command_targeter_factory.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/client.h"
#include "mongo/db/query/lite_parsed_query.h"
@@ -54,6 +53,7 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/grid.h"
+#include "mongo/s/client/shard_factory.h"
#include "mongo/s/write_ops/wc_error_detail.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
@@ -169,12 +169,12 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kWriteConcernErrors{
ErrorCodes::UnknownReplWriteConcern,
ErrorCodes::CannotSatisfyWriteConcern};
-ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory,
+ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
std::unique_ptr<executor::TaskExecutorPool> executorPool,
executor::NetworkInterface* network,
std::unique_ptr<executor::TaskExecutor> addShardExecutor,
ConnectionString configServerCS)
- : _targeterFactory(std::move(targeterFactory)),
+ : _shardFactory(std::move(shardFactory)),
_executorPool(std::move(executorPool)),
_network(network),
_executorForAddShard(std::move(addShardExecutor)) {
@@ -260,8 +260,7 @@ bool ShardRegistry::reload(OperationContext* txn) {
// Ensure targeter exists for all shards and take shard connection string from the targeter.
// Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating
// hosts for a given shard.
- std::vector<std::tuple<std::string, ConnectionString, std::unique_ptr<RemoteCommandTargeter>>>
- shardsInfo;
+ std::vector<std::tuple<std::string, ConnectionString>> shardsInfo;
for (const auto& shardType : shards) {
// This validation should ideally go inside the ShardType::validate call. However, doing
// it there would prevent us from loading previously faulty shard hosts, which might have
@@ -272,10 +271,7 @@ bool ShardRegistry::reload(OperationContext* txn) {
continue;
}
- auto targeter = _targeterFactory->create(shardHostStatus.getValue());
-
- shardsInfo.push_back(std::make_tuple(
- shardType.getName(), targeter->connectionString(), std::move(targeter)));
+ shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue()));
}
lk.lock();
@@ -293,9 +289,7 @@ bool ShardRegistry::reload(OperationContext* txn) {
continue;
}
- _addShard_inlock(std::move(std::get<0>(shardInfo)),
- std::move(std::get<1>(shardInfo)),
- std::move(std::get<2>(shardInfo)));
+ _addShard_inlock(std::move(std::get<0>(shardInfo)), std::move(std::get<1>(shardInfo)));
}
nextReloadState = ReloadState::Idle;
@@ -342,7 +336,7 @@ shared_ptr<Shard> ShardRegistry::getConfigShard() {
}
unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
- return stdx::make_unique<Shard>("<unnamed>", connStr, _targeterFactory->create(connStr));
+ return _shardFactory->createUniqueShard("<unnamed>", connStr, false);
}
shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const {
@@ -429,7 +423,7 @@ void ShardRegistry::appendConnectionStats(executor::ConnectionPoolStats* stats)
}
void ShardRegistry::_addConfigShard_inlock() {
- _addShard_inlock("config", _configServerCS, _targeterFactory->create(_configServerCS));
+ _addShard_inlock("config", _configServerCS);
}
void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
@@ -444,19 +438,17 @@ void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
if (shard->isConfig()) {
_updateConfigServerConnectionString_inlock(newConnString);
} else {
- _addShard_inlock(shard->getId(), newConnString, _targeterFactory->create(newConnString));
+ _addShard_inlock(shard->getId(), newConnString);
}
}
-void ShardRegistry::_addShard_inlock(const ShardId& shardId,
- const ConnectionString& connString,
- std::unique_ptr<RemoteCommandTargeter> targeter) {
- auto originalShard = _findUsingLookUp_inlock(shardId);
- if (originalShard) {
- auto oldConnString = originalShard->getConnString();
+void ShardRegistry::_addShard_inlock(const ShardId& shardId, const ConnectionString& connString) {
+ auto currentShard = _findUsingLookUp_inlock(shardId);
+ if (currentShard) {
+ auto oldConnString = currentShard->originalConnString();
if (oldConnString.toString() != connString.toString()) {
- log() << "Updating ShardRegistry connection string for shard " << originalShard->getId()
+ log() << "Updating ShardRegistry connection string for shard " << currentShard->getId()
<< " from: " << oldConnString.toString() << " to: " << connString.toString();
}
@@ -466,7 +458,9 @@ void ShardRegistry::_addShard_inlock(const ShardId& shardId,
}
}
- shared_ptr<Shard> shard = std::make_shared<Shard>(shardId, connString, std::move(targeter));
+ // TODO: the third argument should pass the bool that will instruct factory to create either
+ // local or remote shard.
+ auto shard = _shardFactory->createShard(shardId, connString, false);
_lookup[shard->getId()] = shard;
@@ -513,7 +507,8 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig(
const BSONObj& query,
const BSONObj& sort,
boost::optional<long long> limit) {
- const auto targeter = getConfigShard()->getTargeter();
+ const auto configShard = getConfigShard();
+ const auto targeter = configShard->getTargeter();
const auto host =
targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
if (!host.isOK()) {
@@ -606,7 +601,7 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig(
fetcher.wait();
- updateReplSetMonitor(targeter, host.getValue(), status);
+ configShard->updateReplSetMonitor(host.getValue(), status);
if (!status.isOK()) {
if (status.compareCode(ErrorCodes::ExceededTimeLimit)) {
@@ -809,7 +804,7 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
executor->wait(callStatus.getValue());
if (!responseStatus.isOK()) {
- updateReplSetMonitor(targeter, host.getValue(), responseStatus.getStatus());
+ shard->updateReplSetMonitor(host.getValue(), responseStatus.getStatus());
if (responseStatus.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
LOG(0) << "Operation timed out with status " << responseStatus.getStatus();
}
@@ -819,7 +814,7 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
auto response = std::move(responseStatus.getValue());
Status commandSpecificStatus = getStatusFromCommandResult(response.data);
- updateReplSetMonitor(targeter, host.getValue(), commandSpecificStatus);
+ shard->updateReplSetMonitor(host.getValue(), commandSpecificStatus);
CommandResponse cmdResponse;
cmdResponse.response = response.data.getOwned();
@@ -837,22 +832,4 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
return StatusWith<CommandResponse>(std::move(cmdResponse));
}
-void ShardRegistry::updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter,
- const HostAndPort& remoteHost,
- const Status& remoteCommandStatus) {
- if (remoteCommandStatus.isOK())
- return;
-
- if (ErrorCodes::isNotMasterError(remoteCommandStatus.code()) ||
- (remoteCommandStatus == ErrorCodes::InterruptedDueToReplStateChange)) {
- targeter->markHostNotMaster(remoteHost);
- } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
- targeter->markHostUnreachable(remoteHost);
- } else if (remoteCommandStatus == ErrorCodes::NotMasterOrSecondary) {
- targeter->markHostUnreachable(remoteHost);
- } else if (remoteCommandStatus == ErrorCodes::ExceededTimeLimit) {
- targeter->markHostUnreachable(remoteHost);
- }
-}
-
} // namespace mongo