/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/client/shard_registry.h" #include #include "mongo/bson/bsonobj.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connection_string.h" #include "mongo/client/query_fetcher.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/wc_error_detail.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/map_util.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" namespace mongo { using std::shared_ptr; using std::set; using std::string; using std::unique_ptr; using std::vector; using executor::TaskExecutor; using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs; using repl::OpTime; namespace { const char kCmdResponseWriteConcernField[] = "writeConcernError"; const Seconds kConfigCommandTimeout{30}; const int kOnErrorNumRetries = 3; const BSONObj kReplMetadata(BSON(rpc::kReplSetMetadataFieldName << 1)); const BSONObj kSecondaryOkMetadata{rpc::ServerSelectionMetadata(true, boost::none).toBSON()}; const BSONObj kReplSecondaryOkMetadata{[] { BSONObjBuilder o; o.appendElements(kSecondaryOkMetadata); o.appendElements(kReplMetadata); return o.obj(); }()}; BSONObj appendMaxTimeToCmdObj(long long maxTimeMicros, const BSONObj& cmdObj) { Seconds maxTime = kConfigCommandTimeout; Microseconds remainingTxnMaxTime(maxTimeMicros); bool hasTxnMaxTime(remainingTxnMaxTime != Microseconds::zero()); bool hasUserMaxTime = !cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS].eoo(); if (hasTxnMaxTime) { maxTime = duration_cast(remainingTxnMaxTime); } else if (hasUserMaxTime) { return cmdObj; } BSONObjBuilder updatedCmdBuilder; if (hasTxnMaxTime && hasUserMaxTime) { // Need to remove user provided maxTimeMS. BSONObjIterator cmdObjIter(cmdObj); const char* maxTimeFieldName = LiteParsedQuery::cmdOptionMaxTimeMS; while (cmdObjIter.more()) { BSONElement e = cmdObjIter.next(); if (str::equals(e.fieldName(), maxTimeFieldName)) { continue; } updatedCmdBuilder.append(e); } } else { updatedCmdBuilder.appendElements(cmdObj); } updatedCmdBuilder.append(LiteParsedQuery::cmdOptionMaxTimeMS, durationCount(maxTime)); return updatedCmdBuilder.obj(); } Status checkForWriteConcernError(const BSONObj& obj) { BSONElement wcErrorElem; Status status = bsonExtractTypedField(obj, kCmdResponseWriteConcernField, Object, &wcErrorElem); if (status.isOK()) { BSONObj wcErrObj(wcErrorElem.Obj()); WCErrorDetail wcError; string wcErrorParseMsg; if (!wcError.parseBSON(wcErrObj, &wcErrorParseMsg)) { return Status(ErrorCodes::UnsupportedFormat, str::stream() << "Failed to parse write concern section due to " << wcErrorParseMsg); } else { return Status(ErrorCodes::WriteConcernFailed, wcError.toString()); } } else if (status == ErrorCodes::NoSuchKey) { return Status::OK(); } return status; } } // unnamed namespace const ShardRegistry::ErrorCodesSet ShardRegistry::kNotMasterErrors{ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk}; const ShardRegistry::ErrorCodesSet ShardRegistry::kAllRetriableErrors{ ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary, // If write concern failed to be satisfied on the remote server, this most probably means that // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe // to be retried if idempotency can be guaranteed. ErrorCodes::WriteConcernFailed, ErrorCodes::HostUnreachable, ErrorCodes::HostNotFound, ErrorCodes::NetworkTimeout, ErrorCodes::InterruptedDueToReplStateChange}; ShardRegistry::ShardRegistry(std::unique_ptr targeterFactory, std::unique_ptr executorPool, executor::NetworkInterface* network, std::unique_ptr addShardExecutor, ConnectionString configServerCS) : _targeterFactory(std::move(targeterFactory)), _executorPool(std::move(executorPool)), _network(network), _executorForAddShard(std::move(addShardExecutor)) { updateConfigServerConnectionString(configServerCS); } ShardRegistry::~ShardRegistry() = default; ConnectionString ShardRegistry::getConfigServerConnectionString() const { stdx::lock_guard lk(_mutex); return _configServerCS; } void ShardRegistry::updateConfigServerConnectionString(ConnectionString configServerCS) { stdx::lock_guard lk(_mutex); _updateConfigServerConnectionString_inlock(std::move(configServerCS)); } void ShardRegistry::_updateConfigServerConnectionString_inlock(ConnectionString configServerCS) { log() << "Updating config server connection string to: " << configServerCS.toString(); _configServerCS = std::move(configServerCS); _addConfigShard_inlock(); } void ShardRegistry::startup() { _executorForAddShard->startup(); _executorPool->startup(); } void ShardRegistry::shutdown() { _executorForAddShard->shutdown(); _executorForAddShard->join(); _executorPool->shutdownAndJoin(); } bool ShardRegistry::reload(OperationContext* txn) { stdx::unique_lock lk(_mutex); if (_reloadState == ReloadState::Reloading) { // Another thread is already in the process of reloading so no need to do duplicate work. // There is also an issue if multiple threads are allowed to call getAllShards() // simultaneously because there is no good way to determine which of the threads has the // more recent version of the data. do { _inReloadCV.wait(lk); } while (_reloadState == ReloadState::Reloading); if (_reloadState == ReloadState::Idle) { return false; } // else proceed to reload since an error occured on the last reload attempt. invariant(_reloadState == ReloadState::Failed); } _reloadState = ReloadState::Reloading; lk.unlock(); auto nextReloadState = ReloadState::Failed; auto failGuard = MakeGuard([&] { if (!lk.owns_lock()) { lk.lock(); } _reloadState = nextReloadState; _inReloadCV.notify_all(); }); auto shardsStatus = grid.catalogManager(txn)->getAllShards(txn); if (!shardsStatus.isOK()) { uasserted(shardsStatus.getStatus().code(), str::stream() << "could not get updated shard list from config server due to " << shardsStatus.getStatus().reason()); } auto shards = std::move(shardsStatus.getValue().value); auto reloadOpTime = std::move(shardsStatus.getValue().opTime); LOG(1) << "found " << shards.size() << " shards listed on config server(s) with lastVisibleOpTime: " << reloadOpTime.toBSON(); // Ensure targeter exists for all shards and take shard connection string from the targeter. // Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating // hosts for a given shard. std::vector>> shardsInfo; for (const auto& shardType : shards) { // This validation should ideally go inside the ShardType::validate call. However, doing // it there would prevent us from loading previously faulty shard hosts, which might have // been stored (i.e., the entire getAllShards call would fail). auto shardHostStatus = ConnectionString::parse(shardType.getHost()); if (!shardHostStatus.isOK()) { warning() << "Unable to parse shard host " << shardHostStatus.getStatus().toString(); continue; } auto targeter = _targeterFactory->create(shardHostStatus.getValue()); shardsInfo.push_back(std::make_tuple( shardType.getName(), targeter->connectionString(), std::move(targeter))); } lk.lock(); _lookup.clear(); _rsLookup.clear(); _hostLookup.clear(); _addConfigShard_inlock(); for (auto& shardInfo : shardsInfo) { // Skip the config host even if there is one left over from legacy installations. The // config host is installed manually from the catalog manager data. if (std::get<0>(shardInfo) == "config") { continue; } _addShard_inlock(std::move(std::get<0>(shardInfo)), std::move(std::get<1>(shardInfo)), std::move(std::get<2>(shardInfo))); } nextReloadState = ReloadState::Idle; return true; } void ShardRegistry::rebuildConfigShard() { stdx::lock_guard lk(_mutex); _addConfigShard_inlock(); } shared_ptr ShardRegistry::getShard(OperationContext* txn, const ShardId& shardId) { shared_ptr shard = _findUsingLookUp(shardId); if (shard) { return shard; } // If we can't find the shard, we might just need to reload the cache bool didReload = reload(txn); shard = _findUsingLookUp(shardId); if (shard || didReload) { return shard; } reload(txn); return _findUsingLookUp(shardId); } shared_ptr ShardRegistry::getShardNoReload(const ShardId& shardId) { return _findUsingLookUp(shardId); } shared_ptr ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { stdx::lock_guard lk(_mutex); return mapFindWithDefault(_hostLookup, host); } shared_ptr ShardRegistry::getConfigShard() { shared_ptr shard = _findUsingLookUp("config"); invariant(shard); return shard; } unique_ptr ShardRegistry::createConnection(const ConnectionString& connStr) const { return stdx::make_unique("", connStr, _targeterFactory->create(connStr)); } shared_ptr ShardRegistry::lookupRSName(const string& name) const { stdx::lock_guard lk(_mutex); ShardMap::const_iterator i = _rsLookup.find(name); return (i == _rsLookup.end()) ? nullptr : i->second; } void ShardRegistry::remove(const ShardId& id) { stdx::lock_guard lk(_mutex); set entriesToRemove; for (const auto& i : _lookup) { shared_ptr s = i.second; if (s->getId() == id) { entriesToRemove.insert(i.first); ConnectionString connStr = s->getConnString(); for (const auto& host : connStr.getServers()) { entriesToRemove.insert(host.toString()); _hostLookup.erase(host); } } } for (const auto& entry : entriesToRemove) { _lookup.erase(entry); } for (ShardMap::iterator i = _rsLookup.begin(); i != _rsLookup.end();) { shared_ptr s = i->second; if (s->getId() == id) { _rsLookup.erase(i++); } else { ++i; } } shardConnectionPool.removeHost(id); ReplicaSetMonitor::remove(id); } void ShardRegistry::getAllShardIds(vector* all) const { std::set seen; { stdx::lock_guard lk(_mutex); for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { const shared_ptr& s = i->second; if (s->getId() == "config") { continue; } seen.insert(s->getId()); } } all->assign(seen.begin(), seen.end()); } void ShardRegistry::toBSON(BSONObjBuilder* result) { // Need to copy, then sort by shardId. std::vector> shards; { stdx::lock_guard lk(_mutex); shards.reserve(_lookup.size()); for (auto&& shard : _lookup) { shards.emplace_back(shard.first, shard.second->getConnString().toString()); } } std::sort(std::begin(shards), std::end(shards)); BSONObjBuilder mapBob(result->subobjStart("map")); for (auto&& shard : shards) { mapBob.append(shard.first, shard.second); } } void ShardRegistry::appendConnectionStats(executor::ConnectionPoolStats* stats) const { // Get stats from the pool of task executors, including fixed executor within. _executorPool->appendConnectionStats(stats); // Get stats from the separate executor for addShard. _executorForAddShard->appendConnectionStats(stats); } void ShardRegistry::_addConfigShard_inlock() { _addShard_inlock("config", _configServerCS, _configServerCS.type() == ConnectionString::SYNC ? nullptr : _targeterFactory->create(_configServerCS)); } void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { invariant(newConnString.type() == ConnectionString::SET || newConnString.type() == ConnectionString::CUSTOM); // For dbtests stdx::lock_guard lk(_mutex); ShardMap::const_iterator i = _rsLookup.find(newConnString.getSetName()); if (i == _rsLookup.end()) return; auto shard = i->second; if (shard->isConfig()) { _updateConfigServerConnectionString_inlock(newConnString); } else { _addShard_inlock(shard->getId(), newConnString, _targeterFactory->create(newConnString)); } } void ShardRegistry::_addShard_inlock(const ShardId& shardId, const ConnectionString& connString, std::unique_ptr targeter) { auto originalShard = _findUsingLookUp_inlock(shardId); if (originalShard) { auto oldConnString = originalShard->getConnString(); if (oldConnString.toString() != connString.toString()) { log() << "Updating ShardRegistry connection string for shard " << originalShard->getId() << " from: " << oldConnString.toString() << " to: " << connString.toString(); } for (const auto& host : oldConnString.getServers()) { _lookup.erase(host.toString()); _hostLookup.erase(host); } } shared_ptr shard = std::make_shared(shardId, connString, std::move(targeter)); _lookup[shard->getId()] = shard; if (connString.type() == ConnectionString::SET) { _rsLookup[connString.getSetName()] = shard; } else if (connString.type() == ConnectionString::CUSTOM) { // CUSTOM connection strings (ie "$dummy:10000) become DBDirectClient connections which // always return "localhost" as their resposne to getServerAddress(). This is just for // making dbtest work. _lookup["localhost"] = shard; _hostLookup[HostAndPort{"localhost"}] = shard; } // TODO: The only reason to have the shard host names in the lookup table is for the // setShardVersion call, which resolves the shard id from the shard address. This is // error-prone and will go away eventually when we switch all communications to go through // the remote command runner and all nodes are sharding aware by default. _lookup[connString.toString()] = shard; for (const HostAndPort& hostAndPort : connString.getServers()) { _lookup[hostAndPort.toString()] = shard; _hostLookup[hostAndPort] = shard; } } shared_ptr ShardRegistry::_findUsingLookUp(const ShardId& shardId) { stdx::lock_guard lk(_mutex); return _findUsingLookUp_inlock(shardId); } shared_ptr ShardRegistry::_findUsingLookUp_inlock(const ShardId& shardId) { ShardMap::iterator it = _lookup.find(shardId); if (it != _lookup.end()) { return it->second; } return nullptr; } void ShardRegistry::advanceConfigOpTime(OpTime opTime) { stdx::lock_guard lk(_mutex); if (_configOpTime < opTime) { _configOpTime = opTime; } } OpTime ShardRegistry::getConfigOpTime() { stdx::lock_guard lk(_mutex); return _configOpTime; } StatusWith ShardRegistry::_exhaustiveFindOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, const NamespaceString& nss, const BSONObj& query, const BSONObj& sort, boost::optional limit) { const auto targeter = getConfigShard()->getTargeter(); const auto host = targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); if (!host.isOK()) { return host.getStatus(); } // If for some reason the callback never gets invoked, we will return this status Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); QueryResponse response; auto fetcherCallback = [this, &status, &response]( const Fetcher::QueryResponseStatus& dataStatus, Fetcher::NextAction* nextAction) { // Throw out any accumulated results on error if (!dataStatus.isOK()) { status = dataStatus.getStatus(); response.docs.clear(); return; } auto& data = dataStatus.getValue(); if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) { auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata); if (!replParseStatus.isOK()) { status = replParseStatus.getStatus(); response.docs.clear(); return; } response.opTime = replParseStatus.getValue().getLastOpVisible(); advanceConfigOpTime(response.opTime); } for (const BSONObj& doc : data.documents) { response.docs.push_back(doc.getOwned()); } status = Status::OK(); }; BSONObj readConcernObj; { const repl::ReadConcernArgs readConcern{getConfigOpTime(), repl::ReadConcernLevel::kMajorityReadConcern}; BSONObjBuilder bob; readConcern.appendInfo(&bob); readConcernObj = bob.done().getObjectField(repl::ReadConcernArgs::kReadConcernFieldName).getOwned(); } auto lpq = LiteParsedQuery::makeAsFindCmd(nss, query, BSONObj(), // projection sort, BSONObj(), // hint readConcernObj, boost::none, // skip limit); BSONObjBuilder findCmdBuilder; lpq->asFindCommand(&findCmdBuilder); Seconds maxTime = kConfigCommandTimeout; Microseconds remainingTxnMaxTime(txn->getRemainingMaxTimeMicros()); if (remainingTxnMaxTime != Microseconds::zero()) { maxTime = duration_cast(remainingTxnMaxTime); } findCmdBuilder.append(LiteParsedQuery::cmdOptionMaxTimeMS, durationCount(maxTime)); QueryFetcher fetcher(_executorPool->getFixedExecutor(), host.getValue(), nss, findCmdBuilder.done(), fetcherCallback, readPref.pref == ReadPreference::PrimaryOnly ? kReplMetadata : kReplSecondaryOkMetadata, maxTime); Status scheduleStatus = fetcher.schedule(); if (!scheduleStatus.isOK()) { return scheduleStatus; } fetcher.wait(); updateReplSetMonitor(targeter, host.getValue(), status); if (!status.isOK()) { return status; } return response; } StatusWith ShardRegistry::exhaustiveFindOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, const NamespaceString& nss, const BSONObj& query, const BSONObj& sort, boost::optional limit) { for (int retry = 1; retry <= kOnErrorNumRetries; retry++) { auto result = _exhaustiveFindOnConfig(txn, readPref, nss, query, sort, limit); if (result.isOK()) { return result; } if (kAllRetriableErrors.count(result.getStatus().code()) && retry < kOnErrorNumRetries) { continue; } return result.getStatus(); } MONGO_UNREACHABLE; } StatusWith ShardRegistry::runIdempotentCommandOnShard( OperationContext* txn, const std::shared_ptr& shard, const ReadPreferenceSetting& readPref, const std::string& dbName, const BSONObj& cmdObj) { auto response = _runCommandWithRetries(txn, _executorPool->getFixedExecutor(), shard, readPref, dbName, cmdObj, readPref.pref == ReadPreference::PrimaryOnly ? rpc::makeEmptyMetadata() : kSecondaryOkMetadata, kAllRetriableErrors); if (!response.isOK()) { return response.getStatus(); } return response.getValue().response; } StatusWith ShardRegistry::runIdempotentCommandOnShard( OperationContext* txn, ShardId shardId, const ReadPreferenceSetting& readPref, const std::string& dbName, const BSONObj& cmdObj) { auto shard = getShard(txn, shardId); if (!shard) { return {ErrorCodes::ShardNotFound, str::stream() << "shard " << shardId << " not found"}; } return runIdempotentCommandOnShard(txn, shard, readPref, dbName, cmdObj); } StatusWith ShardRegistry::runIdempotentCommandForAddShard( OperationContext* txn, const std::shared_ptr& shard, const ReadPreferenceSetting& readPref, const std::string& dbName, const BSONObj& cmdObj) { auto status = _runCommandWithRetries(txn, _executorForAddShard.get(), shard, readPref, dbName, cmdObj, readPref.pref == ReadPreference::PrimaryOnly ? rpc::makeEmptyMetadata() : kSecondaryOkMetadata, kAllRetriableErrors); if (!status.isOK()) { return status.getStatus(); } return status.getValue().response; } StatusWith ShardRegistry::runIdempotentCommandOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, const std::string& dbName, const BSONObj& cmdObj) { auto response = _runCommandWithRetries( txn, _executorPool->getFixedExecutor(), getConfigShard(), readPref, dbName, cmdObj, readPref.pref == ReadPreference::PrimaryOnly ? kReplMetadata : kReplSecondaryOkMetadata, kAllRetriableErrors); if (!response.isOK()) { return response.getStatus(); } return response.getValue().response; } StatusWith ShardRegistry::runCommandOnConfigWithRetries( OperationContext* txn, const std::string& dbname, const BSONObj& cmdObj, const ShardRegistry::ErrorCodesSet& errorsToCheck) { auto response = _runCommandWithRetries(txn, _executorPool->getFixedExecutor(), getConfigShard(), ReadPreferenceSetting{ReadPreference::PrimaryOnly}, dbname, cmdObj, kReplMetadata, errorsToCheck); if (!response.isOK()) { return response.getStatus(); } return response.getValue().response; } StatusWith ShardRegistry::_runCommandWithRetries( OperationContext* txn, TaskExecutor* executor, const std::shared_ptr& shard, const ReadPreferenceSetting& readPref, const std::string& dbname, const BSONObj& cmdObj, const BSONObj& metadata, const ShardRegistry::ErrorCodesSet& errorsToCheck) { const bool isConfigShard = shard->isConfig(); for (int retry = 1; retry <= kOnErrorNumRetries; ++retry) { const BSONObj cmdWithMaxTimeMS = (isConfigShard ? appendMaxTimeToCmdObj(txn->getRemainingMaxTimeMicros(), cmdObj) : cmdObj); auto response = _runCommandWithMetadata( txn, executor, shard, readPref, dbname, cmdWithMaxTimeMS, metadata, errorsToCheck); if (response.isOK()) { return response; } if (errorsToCheck.count(response.getStatus().code()) && retry < kOnErrorNumRetries) { LOG(1) << "Command failed with retriable error and will be retried" << causedBy(response.getStatus()); continue; } return response.getStatus(); } MONGO_UNREACHABLE; } StatusWith ShardRegistry::_runCommandWithMetadata( OperationContext* txn, TaskExecutor* executor, const std::shared_ptr& shard, const ReadPreferenceSetting& readPref, const std::string& dbName, const BSONObj& cmdObj, const BSONObj& metadata, const ShardRegistry::ErrorCodesSet& errorsToCheck) { auto targeter = shard->getTargeter(); auto host = targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); if (!host.isOK()) { return host.getStatus(); } executor::RemoteCommandRequest request( host.getValue(), dbName, cmdObj, metadata, kConfigCommandTimeout); StatusWith responseStatus = Status(ErrorCodes::InternalError, "Internal error running command"); auto callStatus = executor->scheduleRemoteCommand(request, [&responseStatus](const RemoteCommandCallbackArgs& args) { responseStatus = args.response; }); if (!callStatus.isOK()) { return callStatus.getStatus(); } // Block until the command is carried out executor->wait(callStatus.getValue()); if (!responseStatus.isOK()) { updateReplSetMonitor(targeter, host.getValue(), responseStatus.getStatus()); return responseStatus.getStatus(); } auto response = std::move(responseStatus.getValue()); Status commandSpecificStatus = getStatusFromCommandResult(response.data); updateReplSetMonitor(targeter, host.getValue(), commandSpecificStatus); CommandResponse cmdResponse; cmdResponse.response = response.data.getOwned(); cmdResponse.metadata = response.metadata.getOwned(); if (response.metadata.hasField(rpc::kReplSetMetadataFieldName)) { auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(response.metadata); if (!replParseStatus.isOK()) { return replParseStatus.getStatus(); } const auto& replMetadata = replParseStatus.getValue(); cmdResponse.visibleOpTime = replMetadata.getLastOpVisible(); if (shard->isConfig()) { advanceConfigOpTime(cmdResponse.visibleOpTime); } } if (errorsToCheck.count(commandSpecificStatus.code())) { return commandSpecificStatus; } Status writeConcernStatus = checkForWriteConcernError(response.data); if (!writeConcernStatus.isOK()) { return writeConcernStatus; } return StatusWith(std::move(cmdResponse)); } void ShardRegistry::updateReplSetMonitor(const std::shared_ptr& targeter, const HostAndPort& remoteHost, const Status& remoteCommandStatus) { if (remoteCommandStatus.isOK()) return; if (ErrorCodes::isNotMasterError(remoteCommandStatus.code()) || (remoteCommandStatus == ErrorCodes::InterruptedDueToReplStateChange)) { targeter->markHostNotMaster(remoteHost); } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) { targeter->markHostUnreachable(remoteHost); } else if (remoteCommandStatus == ErrorCodes::NotMasterOrSecondary) { targeter->markHostUnreachable(remoteHost); } else if (remoteCommandStatus == ErrorCodes::ExceededTimeLimit) { targeter->markHostUnreachable(remoteHost); } } } // namespace mongo