diff options
Diffstat (limited to 'src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp')
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp | 702 |
1 files changed, 343 insertions, 359 deletions
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 9cc35e05bcd..15c7913feb6 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -63,465 +63,449 @@ namespace mongo { - using std::set; - using std::string; - using std::unique_ptr; - using std::vector; - using str::stream; +using std::set; +using std::string; +using std::unique_ptr; +using std::vector; +using str::stream; namespace { - const Status notYetImplemented(ErrorCodes::InternalError, "Not yet implemented"); // todo remove +const Status notYetImplemented(ErrorCodes::InternalError, "Not yet implemented"); // todo remove - // Until read committed is supported always write to the primary with majoirty write and read - // from the secondary. That way we ensure that reads will see a consistent data. - const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{}); - const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{}); +// Until read committed is supported always write to the primary with majoirty write and read +// from the secondary. That way we ensure that reads will see a consistent data. +const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{}); +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{}); - const int kNotMasterNumRetries = 3; - const Milliseconds kNotMasterRetryInterval{500}; +const int kNotMasterNumRetries = 3; +const Milliseconds kNotMasterRetryInterval{500}; - void _toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); - } - -} // namespace +void _toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); +} - CatalogManagerReplicaSet::CatalogManagerReplicaSet() = default; +} // namespace - CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default; +CatalogManagerReplicaSet::CatalogManagerReplicaSet() = default; - Status CatalogManagerReplicaSet::init(const ConnectionString& configCS, - std::unique_ptr<DistLockManager> distLockManager) { +CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default; - invariant(configCS.type() == ConnectionString::SET); +Status CatalogManagerReplicaSet::init(const ConnectionString& configCS, + std::unique_ptr<DistLockManager> distLockManager) { + invariant(configCS.type() == ConnectionString::SET); - _configServerConnectionString = configCS; - _distLockManager = std::move(distLockManager); + _configServerConnectionString = configCS; + _distLockManager = std::move(distLockManager); - return Status::OK(); - } - - Status CatalogManagerReplicaSet::startup(bool upgrade) { - return Status::OK(); - } + return Status::OK(); +} - ConnectionString CatalogManagerReplicaSet::connectionString() const { - return _configServerConnectionString; - } +Status CatalogManagerReplicaSet::startup(bool upgrade) { + return Status::OK(); +} - void CatalogManagerReplicaSet::shutDown() { - LOG(1) << "CatalogManagerReplicaSet::shutDown() called."; - { - std::lock_guard<std::mutex> lk(_mutex); - _inShutdown = true; - } +ConnectionString CatalogManagerReplicaSet::connectionString() const { + return _configServerConnectionString; +} - invariant(_distLockManager); - _distLockManager->shutDown(); +void CatalogManagerReplicaSet::shutDown() { + LOG(1) << "CatalogManagerReplicaSet::shutDown() called."; + { + std::lock_guard<std::mutex> lk(_mutex); + _inShutdown = true; } - Status CatalogManagerReplicaSet::enableSharding(const std::string& dbName) { - return notYetImplemented; + invariant(_distLockManager); + _distLockManager->shutDown(); +} + +Status CatalogManagerReplicaSet::enableSharding(const std::string& dbName) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::shardCollection(const string& ns, + const ShardKeyPattern& fieldsAndOrder, + bool unique, + vector<BSONObj>* initPoints, + set<ShardId>* initShardsIds) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::createDatabase(const std::string& dbName) { + return notYetImplemented; +} + +StatusWith<string> CatalogManagerReplicaSet::addShard(const string& name, + const ConnectionString& shardConnectionString, + const long long maxSize) { + return notYetImplemented; +} + +StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationContext* txn, + const std::string& name) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::updateDatabase(const std::string& dbName, const DatabaseType& db) { + fassert(28684, db.validate()); + + return notYetImplemented; +} + +StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string& dbName) { + invariant(nsIsDbOnly(dbName)); + + // The two databases that are hosted on the config server are config and admin + if (dbName == "config" || dbName == "admin") { + DatabaseType dbt; + dbt.setName(dbName); + dbt.setSharded(false); + dbt.setPrimary("config"); + + return dbt; } - Status CatalogManagerReplicaSet::shardCollection(const string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - vector<BSONObj>* initPoints, - set<ShardId>* initShardsIds) { - return notYetImplemented; + const auto configShard = grid.shardRegistry()->getShard("config"); + const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHost.isOK()) { + return readHost.getStatus(); } - Status CatalogManagerReplicaSet::createDatabase(const std::string& dbName) { - return notYetImplemented; - } + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::name(dbName)), + 1); - StatusWith<string> CatalogManagerReplicaSet::addShard( - const string& name, - const ConnectionString& shardConnectionString, - const long long maxSize) { - return notYetImplemented; + if (!findStatus.isOK()) { + return findStatus.getStatus(); } - StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationContext* txn, - const std::string& name) { - return notYetImplemented; + const auto& docs = findStatus.getValue(); + if (docs.empty()) { + return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"}; } - Status CatalogManagerReplicaSet::updateDatabase(const std::string& dbName, - const DatabaseType& db) { - fassert(28684, db.validate()); - - return notYetImplemented; + invariant(docs.size() == 1); + + return DatabaseType::fromBSON(docs.front()); +} + +Status CatalogManagerReplicaSet::updateCollection(const std::string& collNs, + const CollectionType& coll) { + fassert(28683, coll.validate()); + + BatchedCommandResponse response; + Status status = update(CollectionType::ConfigNS, + BSON(CollectionType::fullNs(collNs)), + coll.toBSON(), + true, // upsert + false, // multi + &response); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "collection metadata write failed: " << response.toBSON() + << "; status: " << status.toString()); } - StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string& dbName) { - invariant(nsIsDbOnly(dbName)); - - // The two databases that are hosted on the config server are config and admin - if (dbName == "config" || dbName == "admin") { - DatabaseType dbt; - dbt.setName(dbName); - dbt.setSharded(false); - dbt.setPrimary("config"); + return Status::OK(); +} - return dbt; - } - - const auto configShard = grid.shardRegistry()->getShard("config"); - const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHost.isOK()) { - return readHost.getStatus(); - } - - auto findStatus = grid.shardRegistry()->exhaustiveFind( - readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::name(dbName)), - 1); - - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } +StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::string& collNs) { + auto configShard = grid.shardRegistry()->getShard("config"); - const auto& docs = findStatus.getValue(); - if (docs.empty()) { - return {ErrorCodes::NamespaceNotFound, - stream() << "database " << dbName << " not found"}; - } + auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHostStatus.isOK()) { + return readHostStatus.getStatus(); + } - invariant(docs.size() == 1); + auto statusFind = + grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), + NamespaceString(CollectionType::ConfigNS), + BSON(CollectionType::fullNs(collNs)), + 1); - return DatabaseType::fromBSON(docs.front()); + if (!statusFind.isOK()) { + return statusFind.getStatus(); } - Status CatalogManagerReplicaSet::updateCollection(const std::string& collNs, - const CollectionType& coll) { - fassert(28683, coll.validate()); - - BatchedCommandResponse response; - Status status = update(CollectionType::ConfigNS, - BSON(CollectionType::fullNs(collNs)), - coll.toBSON(), - true, // upsert - false, // multi - &response); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "collection metadata write failed: " - << response.toBSON() << "; status: " << status.toString()); - } - - return Status::OK(); + const auto& retVal = statusFind.getValue(); + if (retVal.empty()) { + return Status(ErrorCodes::NamespaceNotFound, + stream() << "collection " << collNs << " not found"); } - StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::string& collNs) { - auto configShard = grid.shardRegistry()->getShard("config"); + invariant(retVal.size() == 1); - auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHostStatus.isOK()) { - return readHostStatus.getStatus(); - } + return CollectionType::fromBSON(retVal.front()); +} - auto statusFind = grid.shardRegistry()->exhaustiveFind( - readHostStatus.getValue(), - NamespaceString(CollectionType::ConfigNS), - BSON(CollectionType::fullNs(collNs)), - 1); +Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, + std::vector<CollectionType>* collections) { + return notYetImplemented; +} - if (!statusFind.isOK()) { - return statusFind.getStatus(); - } +Status CatalogManagerReplicaSet::dropCollection(const std::string& collectionNs) { + return notYetImplemented; +} - const auto& retVal = statusFind.getValue(); - if (retVal.empty()) { - return Status(ErrorCodes::NamespaceNotFound, - stream() << "collection " << collNs << " not found"); - } +void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) {} - invariant(retVal.size() == 1); +void CatalogManagerReplicaSet::logChange(OperationContext* opCtx, + const string& what, + const string& ns, + const BSONObj& detail) {} - return CollectionType::fromBSON(retVal.front()); +StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(const string& key) { + const auto configShard = grid.shardRegistry()->getShard("config"); + const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHost.isOK()) { + return readHost.getStatus(); } - Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) { - return notYetImplemented; - } + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(SettingsType::ConfigNS), + BSON(SettingsType::key(key)), + 1); - Status CatalogManagerReplicaSet::dropCollection(const std::string& collectionNs) { - return notYetImplemented; + if (!findStatus.isOK()) { + return findStatus.getStatus(); } - void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) { - + const auto& docs = findStatus.getValue(); + if (docs.empty()) { + return {ErrorCodes::NoMatchingDocument, + str::stream() << "can't find settings document with key: " << key}; } - void CatalogManagerReplicaSet::logChange(OperationContext* opCtx, - const string& what, - const string& ns, - const BSONObj& detail) { + BSONObj settingsDoc = docs.front(); + StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); + if (!settingsResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "error while parsing settings document: " << settingsDoc << " : " + << settingsResult.getStatus().toString()}; } - StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(const string& key) { - const auto configShard = grid.shardRegistry()->getShard("config"); - const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHost.isOK()) { - return readHost.getStatus(); - } - - auto findStatus = grid.shardRegistry()->exhaustiveFind( - readHost.getValue(), - NamespaceString(SettingsType::ConfigNS), - BSON(SettingsType::key(key)), - 1); - - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - const auto& docs = findStatus.getValue(); - if (docs.empty()) { - return {ErrorCodes::NoMatchingDocument, - str::stream() << "can't find settings document with key: " << key}; - } - - BSONObj settingsDoc = docs.front(); - StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); - if (!settingsResult.isOK()) { - return {ErrorCodes::FailedToParse, - str::stream() << "error while parsing settings document: " << settingsDoc - << " : " << settingsResult.getStatus().toString()}; - } - - const SettingsType& settings = settingsResult.getValue(); + const SettingsType& settings = settingsResult.getValue(); - Status validationStatus = settings.validate(); - if (!validationStatus.isOK()) { - return validationStatus; - } - - return settingsResult; + Status validationStatus = settings.validate(); + if (!validationStatus.isOK()) { + return validationStatus; } - Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, - vector<string>* dbs) { - return notYetImplemented; + return settingsResult; +} + +Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, + vector<string>* dbs) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::getChunks(const Query& query, + int nToReturn, + vector<ChunkType>* chunks) { + auto configShard = grid.shardRegistry()->getShard("config"); + auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHostStatus.isOK()) { + return readHostStatus.getStatus(); } - Status CatalogManagerReplicaSet::getChunks(const Query& query, - int nToReturn, - vector<ChunkType>* chunks) { + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), + NamespaceString(ChunkType::ConfigNS), + query.obj, + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } - auto configShard = grid.shardRegistry()->getShard("config"); - auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHostStatus.isOK()) { - return readHostStatus.getStatus(); + for (const BSONObj& obj : findStatus.getValue()) { + auto chunkRes = ChunkType::fromBSON(obj); + if (!chunkRes.isOK()) { + chunks->clear(); + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse chunk with id (" + << obj[ChunkType::name()].toString() + << "): " << chunkRes.getStatus().reason()}; } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), - NamespaceString(ChunkType::ConfigNS), - query.obj, - boost::none); // no limit - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } + chunks->push_back(chunkRes.getValue()); + } - for (const BSONObj& obj : findStatus.getValue()) { - auto chunkRes = ChunkType::fromBSON(obj); - if (!chunkRes.isOK()) { - chunks->clear(); - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse chunk with id (" - << obj[ChunkType::name()].toString() << "): " - << chunkRes.getStatus().reason()}; - } + return Status::OK(); +} - chunks->push_back(chunkRes.getValue()); - } +Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collectionNs, + std::vector<TagsType>* tags) { + return notYetImplemented; +} - return Status::OK(); - } +StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(const std::string& collectionNs, + const ChunkType& chunk) { + return notYetImplemented; +} - Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collectionNs, - std::vector<TagsType>* tags) { - return notYetImplemented; +Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) { + const auto configShard = grid.shardRegistry()->getShard("config"); + const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHost.isOK()) { + return readHost.getStatus(); } - StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(const std::string& collectionNs, - const ChunkType& chunk) { - return notYetImplemented; + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSONObj(), // no query filter + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); } - Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) { - const auto configShard = grid.shardRegistry()->getShard("config"); - const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHost.isOK()) { - return readHost.getStatus(); - } - - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSONObj(), // no query filter - boost::none); // no limit - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - for (const BSONObj& doc : findStatus.getValue()) { - auto shardRes = ShardType::fromBSON(doc); - if (!shardRes.isOK()) { - shards->clear(); - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse shard with id (" - << doc[ShardType::name()].toString() << "): " - << shardRes.getStatus().reason()}; - } - - shards->push_back(shardRes.getValue()); + for (const BSONObj& doc : findStatus.getValue()) { + auto shardRes = ShardType::fromBSON(doc); + if (!shardRes.isOK()) { + shards->clear(); + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse shard with id (" + << doc[ShardType::name()].toString() + << "): " << shardRes.getStatus().reason()}; } - return Status::OK(); - } - - bool CatalogManagerReplicaSet::isShardHost(const ConnectionString& connectionString) { - return false; + shards->push_back(shardRes.getValue()); } - bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - auto scopedDistLock = getDistLockManager()->lock("authorizationData", - commandName, - Seconds{5}); - if (!scopedDistLock.isOK()) { - return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); - } - - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); + return Status::OK(); +} - Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; - for (int i = 0; i < kNotMasterNumRetries; ++i) { +bool CatalogManagerReplicaSet::isShardHost(const ConnectionString& connectionString) { + return false; +} - auto target = targeter->findHost(kConfigWriteSelector); - if (!target.isOK()) { - if (ErrorCodes::NotMaster == target.getStatus()) { - notMasterStatus = target.getStatus(); - sleepmillis(kNotMasterRetryInterval.count()); - continue; - } - return Command::appendCommandStatus(*result, target.getStatus()); - } +bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& commandName, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + auto scopedDistLock = getDistLockManager()->lock("authorizationData", commandName, Seconds{5}); + if (!scopedDistLock.isOK()) { + return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); + } - auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); - if (!response.isOK()) { - return Command::appendCommandStatus(*result, response.getStatus()); - } + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - Status commandStatus = Command::getStatusFromCommandResult(response.getValue()); - if (ErrorCodes::NotMaster == commandStatus) { - notMasterStatus = commandStatus; + Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; + for (int i = 0; i < kNotMasterNumRetries; ++i) { + auto target = targeter->findHost(kConfigWriteSelector); + if (!target.isOK()) { + if (ErrorCodes::NotMaster == target.getStatus()) { + notMasterStatus = target.getStatus(); sleepmillis(kNotMasterRetryInterval.count()); continue; } - - result->appendElements(response.getValue()); - - return commandStatus.isOK(); + return Command::appendCommandStatus(*result, target.getStatus()); } - invariant(ErrorCodes::NotMaster == notMasterStatus); - return Command::appendCommandStatus(*result, notMasterStatus); - } - - bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - auto target = targeter->findHost(kConfigReadSelector); - if (!target.isOK()) { - return Command::appendCommandStatus(*result, target.getStatus()); + auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); + if (!response.isOK()) { + return Command::appendCommandStatus(*result, response.getStatus()); } - auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); - if (!resultStatus.isOK()) { - return Command::appendCommandStatus(*result, resultStatus.getStatus()); + Status commandStatus = Command::getStatusFromCommandResult(response.getValue()); + if (ErrorCodes::NotMaster == commandStatus) { + notMasterStatus = commandStatus; + sleepmillis(kNotMasterRetryInterval.count()); + continue; } - result->appendElements(resultStatus.getValue()); + result->appendElements(response.getValue()); - return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); - return false; + return commandStatus.isOK(); } - Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, - const BSONArray& preCondition) { - return notYetImplemented; + invariant(ErrorCodes::NotMaster == notMasterStatus); + return Command::appendCommandStatus(*result, notMasterStatus); +} + +bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); + auto target = targeter->findHost(kConfigReadSelector); + if (!target.isOK()) { + return Command::appendCommandStatus(*result, target.getStatus()); } - DistLockManager* CatalogManagerReplicaSet::getDistLockManager() { - invariant(_distLockManager); - return _distLockManager.get(); + auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); + if (!resultStatus.isOK()) { + return Command::appendCommandStatus(*result, resultStatus.getStatus()); } - void CatalogManagerReplicaSet::writeConfigServerDirect( - const BatchedCommandRequest& batchRequest, - BatchedCommandResponse* batchResponse) { - std::string dbname = batchRequest.getNSS().db().toString(); - invariant (dbname == "config" || dbname == "admin"); - const BSONObj cmdObj = batchRequest.toBSON(); - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - - Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; - for (int i = 0; i < kNotMasterNumRetries; ++i) { - - auto target = targeter->findHost(kConfigWriteSelector); - if (!target.isOK()) { - if (ErrorCodes::NotMaster == target.getStatus()) { - notMasterStatus = target.getStatus(); - sleepmillis(kNotMasterRetryInterval.count()); - continue; - } - _toBatchError(target.getStatus(), batchResponse); - return; - } + result->appendElements(resultStatus.getValue()); - auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), - batchRequest.getNSS().db().toString(), - batchRequest.toBSON()); + return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); + return false; +} - if (!resultStatus.isOK()) { - _toBatchError(resultStatus.getStatus(), batchResponse); - return; - } +Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, + const BSONArray& preCondition) { + return notYetImplemented; +} + +DistLockManager* CatalogManagerReplicaSet::getDistLockManager() { + invariant(_distLockManager); + return _distLockManager.get(); +} - const BSONObj& commandResponse = resultStatus.getValue(); +void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandRequest& batchRequest, + BatchedCommandResponse* batchResponse) { + std::string dbname = batchRequest.getNSS().db().toString(); + invariant(dbname == "config" || dbname == "admin"); + const BSONObj cmdObj = batchRequest.toBSON(); + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - Status commandStatus = getStatusFromCommandResult(commandResponse); - if (commandStatus == ErrorCodes::NotMaster) { - notMasterStatus = commandStatus; + Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; + for (int i = 0; i < kNotMasterNumRetries; ++i) { + auto target = targeter->findHost(kConfigWriteSelector); + if (!target.isOK()) { + if (ErrorCodes::NotMaster == target.getStatus()) { + notMasterStatus = target.getStatus(); sleepmillis(kNotMasterRetryInterval.count()); continue; } + _toBatchError(target.getStatus(), batchResponse); + return; + } - string errmsg; - if (!batchResponse->parseBSON(commandResponse, &errmsg)) { - _toBatchError(Status(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse config server response: " << - errmsg), - batchResponse); - return; - } - return; // The normal case return point. + auto resultStatus = grid.shardRegistry()->runCommand( + target.getValue(), batchRequest.getNSS().db().toString(), batchRequest.toBSON()); + + if (!resultStatus.isOK()) { + _toBatchError(resultStatus.getStatus(), batchResponse); + return; } - invariant(ErrorCodes::NotMaster == notMasterStatus); - _toBatchError(notMasterStatus, batchResponse); + const BSONObj& commandResponse = resultStatus.getValue(); + + Status commandStatus = getStatusFromCommandResult(commandResponse); + if (commandStatus == ErrorCodes::NotMaster) { + notMasterStatus = commandStatus; + sleepmillis(kNotMasterRetryInterval.count()); + continue; + } + + string errmsg; + if (!batchResponse->parseBSON(commandResponse, &errmsg)) { + _toBatchError( + Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse config server response: " << errmsg), + batchResponse); + return; + } + return; // The normal case return point. } -} // namespace mongo + invariant(ErrorCodes::NotMaster == notMasterStatus); + _toBatchError(notMasterStatus, batchResponse); +} + +} // namespace mongo |