diff options
Diffstat (limited to 'src/mongo/s/catalog/replset')
8 files changed, 2956 insertions, 3040 deletions
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 9cc35e05bcd..15c7913feb6 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -63,465 +63,449 @@ namespace mongo { - using std::set; - using std::string; - using std::unique_ptr; - using std::vector; - using str::stream; +using std::set; +using std::string; +using std::unique_ptr; +using std::vector; +using str::stream; namespace { - const Status notYetImplemented(ErrorCodes::InternalError, "Not yet implemented"); // todo remove +const Status notYetImplemented(ErrorCodes::InternalError, "Not yet implemented"); // todo remove - // Until read committed is supported always write to the primary with majoirty write and read - // from the secondary. That way we ensure that reads will see a consistent data. - const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{}); - const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{}); +// Until read committed is supported always write to the primary with majoirty write and read +// from the secondary. That way we ensure that reads will see a consistent data. +const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{}); +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{}); - const int kNotMasterNumRetries = 3; - const Milliseconds kNotMasterRetryInterval{500}; +const int kNotMasterNumRetries = 3; +const Milliseconds kNotMasterRetryInterval{500}; - void _toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); - } - -} // namespace +void _toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); +} - CatalogManagerReplicaSet::CatalogManagerReplicaSet() = default; +} // namespace - CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default; +CatalogManagerReplicaSet::CatalogManagerReplicaSet() = default; - Status CatalogManagerReplicaSet::init(const ConnectionString& configCS, - std::unique_ptr<DistLockManager> distLockManager) { +CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default; - invariant(configCS.type() == ConnectionString::SET); +Status CatalogManagerReplicaSet::init(const ConnectionString& configCS, + std::unique_ptr<DistLockManager> distLockManager) { + invariant(configCS.type() == ConnectionString::SET); - _configServerConnectionString = configCS; - _distLockManager = std::move(distLockManager); + _configServerConnectionString = configCS; + _distLockManager = std::move(distLockManager); - return Status::OK(); - } - - Status CatalogManagerReplicaSet::startup(bool upgrade) { - return Status::OK(); - } + return Status::OK(); +} - ConnectionString CatalogManagerReplicaSet::connectionString() const { - return _configServerConnectionString; - } +Status CatalogManagerReplicaSet::startup(bool upgrade) { + return Status::OK(); +} - void CatalogManagerReplicaSet::shutDown() { - LOG(1) << "CatalogManagerReplicaSet::shutDown() called."; - { - std::lock_guard<std::mutex> lk(_mutex); - _inShutdown = true; - } +ConnectionString CatalogManagerReplicaSet::connectionString() const { + return _configServerConnectionString; +} - invariant(_distLockManager); - _distLockManager->shutDown(); +void CatalogManagerReplicaSet::shutDown() { + LOG(1) << "CatalogManagerReplicaSet::shutDown() called."; + { + std::lock_guard<std::mutex> lk(_mutex); + _inShutdown = true; } - Status CatalogManagerReplicaSet::enableSharding(const std::string& dbName) { - return notYetImplemented; + invariant(_distLockManager); + _distLockManager->shutDown(); +} + +Status CatalogManagerReplicaSet::enableSharding(const std::string& dbName) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::shardCollection(const string& ns, + const ShardKeyPattern& fieldsAndOrder, + bool unique, + vector<BSONObj>* initPoints, + set<ShardId>* initShardsIds) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::createDatabase(const std::string& dbName) { + return notYetImplemented; +} + +StatusWith<string> CatalogManagerReplicaSet::addShard(const string& name, + const ConnectionString& shardConnectionString, + const long long maxSize) { + return notYetImplemented; +} + +StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationContext* txn, + const std::string& name) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::updateDatabase(const std::string& dbName, const DatabaseType& db) { + fassert(28684, db.validate()); + + return notYetImplemented; +} + +StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string& dbName) { + invariant(nsIsDbOnly(dbName)); + + // The two databases that are hosted on the config server are config and admin + if (dbName == "config" || dbName == "admin") { + DatabaseType dbt; + dbt.setName(dbName); + dbt.setSharded(false); + dbt.setPrimary("config"); + + return dbt; } - Status CatalogManagerReplicaSet::shardCollection(const string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - vector<BSONObj>* initPoints, - set<ShardId>* initShardsIds) { - return notYetImplemented; + const auto configShard = grid.shardRegistry()->getShard("config"); + const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHost.isOK()) { + return readHost.getStatus(); } - Status CatalogManagerReplicaSet::createDatabase(const std::string& dbName) { - return notYetImplemented; - } + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::name(dbName)), + 1); - StatusWith<string> CatalogManagerReplicaSet::addShard( - const string& name, - const ConnectionString& shardConnectionString, - const long long maxSize) { - return notYetImplemented; + if (!findStatus.isOK()) { + return findStatus.getStatus(); } - StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationContext* txn, - const std::string& name) { - return notYetImplemented; + const auto& docs = findStatus.getValue(); + if (docs.empty()) { + return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"}; } - Status CatalogManagerReplicaSet::updateDatabase(const std::string& dbName, - const DatabaseType& db) { - fassert(28684, db.validate()); - - return notYetImplemented; + invariant(docs.size() == 1); + + return DatabaseType::fromBSON(docs.front()); +} + +Status CatalogManagerReplicaSet::updateCollection(const std::string& collNs, + const CollectionType& coll) { + fassert(28683, coll.validate()); + + BatchedCommandResponse response; + Status status = update(CollectionType::ConfigNS, + BSON(CollectionType::fullNs(collNs)), + coll.toBSON(), + true, // upsert + false, // multi + &response); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "collection metadata write failed: " << response.toBSON() + << "; status: " << status.toString()); } - StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string& dbName) { - invariant(nsIsDbOnly(dbName)); - - // The two databases that are hosted on the config server are config and admin - if (dbName == "config" || dbName == "admin") { - DatabaseType dbt; - dbt.setName(dbName); - dbt.setSharded(false); - dbt.setPrimary("config"); + return Status::OK(); +} - return dbt; - } - - const auto configShard = grid.shardRegistry()->getShard("config"); - const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHost.isOK()) { - return readHost.getStatus(); - } - - auto findStatus = grid.shardRegistry()->exhaustiveFind( - readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::name(dbName)), - 1); - - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } +StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::string& collNs) { + auto configShard = grid.shardRegistry()->getShard("config"); - const auto& docs = findStatus.getValue(); - if (docs.empty()) { - return {ErrorCodes::NamespaceNotFound, - stream() << "database " << dbName << " not found"}; - } + auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHostStatus.isOK()) { + return readHostStatus.getStatus(); + } - invariant(docs.size() == 1); + auto statusFind = + grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), + NamespaceString(CollectionType::ConfigNS), + BSON(CollectionType::fullNs(collNs)), + 1); - return DatabaseType::fromBSON(docs.front()); + if (!statusFind.isOK()) { + return statusFind.getStatus(); } - Status CatalogManagerReplicaSet::updateCollection(const std::string& collNs, - const CollectionType& coll) { - fassert(28683, coll.validate()); - - BatchedCommandResponse response; - Status status = update(CollectionType::ConfigNS, - BSON(CollectionType::fullNs(collNs)), - coll.toBSON(), - true, // upsert - false, // multi - &response); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "collection metadata write failed: " - << response.toBSON() << "; status: " << status.toString()); - } - - return Status::OK(); + const auto& retVal = statusFind.getValue(); + if (retVal.empty()) { + return Status(ErrorCodes::NamespaceNotFound, + stream() << "collection " << collNs << " not found"); } - StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::string& collNs) { - auto configShard = grid.shardRegistry()->getShard("config"); + invariant(retVal.size() == 1); - auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHostStatus.isOK()) { - return readHostStatus.getStatus(); - } + return CollectionType::fromBSON(retVal.front()); +} - auto statusFind = grid.shardRegistry()->exhaustiveFind( - readHostStatus.getValue(), - NamespaceString(CollectionType::ConfigNS), - BSON(CollectionType::fullNs(collNs)), - 1); +Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, + std::vector<CollectionType>* collections) { + return notYetImplemented; +} - if (!statusFind.isOK()) { - return statusFind.getStatus(); - } +Status CatalogManagerReplicaSet::dropCollection(const std::string& collectionNs) { + return notYetImplemented; +} - const auto& retVal = statusFind.getValue(); - if (retVal.empty()) { - return Status(ErrorCodes::NamespaceNotFound, - stream() << "collection " << collNs << " not found"); - } +void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) {} - invariant(retVal.size() == 1); +void CatalogManagerReplicaSet::logChange(OperationContext* opCtx, + const string& what, + const string& ns, + const BSONObj& detail) {} - return CollectionType::fromBSON(retVal.front()); +StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(const string& key) { + const auto configShard = grid.shardRegistry()->getShard("config"); + const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHost.isOK()) { + return readHost.getStatus(); } - Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) { - return notYetImplemented; - } + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(SettingsType::ConfigNS), + BSON(SettingsType::key(key)), + 1); - Status CatalogManagerReplicaSet::dropCollection(const std::string& collectionNs) { - return notYetImplemented; + if (!findStatus.isOK()) { + return findStatus.getStatus(); } - void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) { - + const auto& docs = findStatus.getValue(); + if (docs.empty()) { + return {ErrorCodes::NoMatchingDocument, + str::stream() << "can't find settings document with key: " << key}; } - void CatalogManagerReplicaSet::logChange(OperationContext* opCtx, - const string& what, - const string& ns, - const BSONObj& detail) { + BSONObj settingsDoc = docs.front(); + StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); + if (!settingsResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "error while parsing settings document: " << settingsDoc << " : " + << settingsResult.getStatus().toString()}; } - StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(const string& key) { - const auto configShard = grid.shardRegistry()->getShard("config"); - const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHost.isOK()) { - return readHost.getStatus(); - } - - auto findStatus = grid.shardRegistry()->exhaustiveFind( - readHost.getValue(), - NamespaceString(SettingsType::ConfigNS), - BSON(SettingsType::key(key)), - 1); - - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - const auto& docs = findStatus.getValue(); - if (docs.empty()) { - return {ErrorCodes::NoMatchingDocument, - str::stream() << "can't find settings document with key: " << key}; - } - - BSONObj settingsDoc = docs.front(); - StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); - if (!settingsResult.isOK()) { - return {ErrorCodes::FailedToParse, - str::stream() << "error while parsing settings document: " << settingsDoc - << " : " << settingsResult.getStatus().toString()}; - } - - const SettingsType& settings = settingsResult.getValue(); + const SettingsType& settings = settingsResult.getValue(); - Status validationStatus = settings.validate(); - if (!validationStatus.isOK()) { - return validationStatus; - } - - return settingsResult; + Status validationStatus = settings.validate(); + if (!validationStatus.isOK()) { + return validationStatus; } - Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, - vector<string>* dbs) { - return notYetImplemented; + return settingsResult; +} + +Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, + vector<string>* dbs) { + return notYetImplemented; +} + +Status CatalogManagerReplicaSet::getChunks(const Query& query, + int nToReturn, + vector<ChunkType>* chunks) { + auto configShard = grid.shardRegistry()->getShard("config"); + auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHostStatus.isOK()) { + return readHostStatus.getStatus(); } - Status CatalogManagerReplicaSet::getChunks(const Query& query, - int nToReturn, - vector<ChunkType>* chunks) { + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), + NamespaceString(ChunkType::ConfigNS), + query.obj, + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } - auto configShard = grid.shardRegistry()->getShard("config"); - auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHostStatus.isOK()) { - return readHostStatus.getStatus(); + for (const BSONObj& obj : findStatus.getValue()) { + auto chunkRes = ChunkType::fromBSON(obj); + if (!chunkRes.isOK()) { + chunks->clear(); + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse chunk with id (" + << obj[ChunkType::name()].toString() + << "): " << chunkRes.getStatus().reason()}; } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), - NamespaceString(ChunkType::ConfigNS), - query.obj, - boost::none); // no limit - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } + chunks->push_back(chunkRes.getValue()); + } - for (const BSONObj& obj : findStatus.getValue()) { - auto chunkRes = ChunkType::fromBSON(obj); - if (!chunkRes.isOK()) { - chunks->clear(); - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse chunk with id (" - << obj[ChunkType::name()].toString() << "): " - << chunkRes.getStatus().reason()}; - } + return Status::OK(); +} - chunks->push_back(chunkRes.getValue()); - } +Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collectionNs, + std::vector<TagsType>* tags) { + return notYetImplemented; +} - return Status::OK(); - } +StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(const std::string& collectionNs, + const ChunkType& chunk) { + return notYetImplemented; +} - Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collectionNs, - std::vector<TagsType>* tags) { - return notYetImplemented; +Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) { + const auto configShard = grid.shardRegistry()->getShard("config"); + const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); + if (!readHost.isOK()) { + return readHost.getStatus(); } - StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(const std::string& collectionNs, - const ChunkType& chunk) { - return notYetImplemented; + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSONObj(), // no query filter + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); } - Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) { - const auto configShard = grid.shardRegistry()->getShard("config"); - const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); - if (!readHost.isOK()) { - return readHost.getStatus(); - } - - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSONObj(), // no query filter - boost::none); // no limit - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - for (const BSONObj& doc : findStatus.getValue()) { - auto shardRes = ShardType::fromBSON(doc); - if (!shardRes.isOK()) { - shards->clear(); - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse shard with id (" - << doc[ShardType::name()].toString() << "): " - << shardRes.getStatus().reason()}; - } - - shards->push_back(shardRes.getValue()); + for (const BSONObj& doc : findStatus.getValue()) { + auto shardRes = ShardType::fromBSON(doc); + if (!shardRes.isOK()) { + shards->clear(); + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse shard with id (" + << doc[ShardType::name()].toString() + << "): " << shardRes.getStatus().reason()}; } - return Status::OK(); - } - - bool CatalogManagerReplicaSet::isShardHost(const ConnectionString& connectionString) { - return false; + shards->push_back(shardRes.getValue()); } - bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - auto scopedDistLock = getDistLockManager()->lock("authorizationData", - commandName, - Seconds{5}); - if (!scopedDistLock.isOK()) { - return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); - } - - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); + return Status::OK(); +} - Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; - for (int i = 0; i < kNotMasterNumRetries; ++i) { +bool CatalogManagerReplicaSet::isShardHost(const ConnectionString& connectionString) { + return false; +} - auto target = targeter->findHost(kConfigWriteSelector); - if (!target.isOK()) { - if (ErrorCodes::NotMaster == target.getStatus()) { - notMasterStatus = target.getStatus(); - sleepmillis(kNotMasterRetryInterval.count()); - continue; - } - return Command::appendCommandStatus(*result, target.getStatus()); - } +bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& commandName, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + auto scopedDistLock = getDistLockManager()->lock("authorizationData", commandName, Seconds{5}); + if (!scopedDistLock.isOK()) { + return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); + } - auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); - if (!response.isOK()) { - return Command::appendCommandStatus(*result, response.getStatus()); - } + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - Status commandStatus = Command::getStatusFromCommandResult(response.getValue()); - if (ErrorCodes::NotMaster == commandStatus) { - notMasterStatus = commandStatus; + Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; + for (int i = 0; i < kNotMasterNumRetries; ++i) { + auto target = targeter->findHost(kConfigWriteSelector); + if (!target.isOK()) { + if (ErrorCodes::NotMaster == target.getStatus()) { + notMasterStatus = target.getStatus(); sleepmillis(kNotMasterRetryInterval.count()); continue; } - - result->appendElements(response.getValue()); - - return commandStatus.isOK(); + return Command::appendCommandStatus(*result, target.getStatus()); } - invariant(ErrorCodes::NotMaster == notMasterStatus); - return Command::appendCommandStatus(*result, notMasterStatus); - } - - bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - auto target = targeter->findHost(kConfigReadSelector); - if (!target.isOK()) { - return Command::appendCommandStatus(*result, target.getStatus()); + auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); + if (!response.isOK()) { + return Command::appendCommandStatus(*result, response.getStatus()); } - auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); - if (!resultStatus.isOK()) { - return Command::appendCommandStatus(*result, resultStatus.getStatus()); + Status commandStatus = Command::getStatusFromCommandResult(response.getValue()); + if (ErrorCodes::NotMaster == commandStatus) { + notMasterStatus = commandStatus; + sleepmillis(kNotMasterRetryInterval.count()); + continue; } - result->appendElements(resultStatus.getValue()); + result->appendElements(response.getValue()); - return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); - return false; + return commandStatus.isOK(); } - Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, - const BSONArray& preCondition) { - return notYetImplemented; + invariant(ErrorCodes::NotMaster == notMasterStatus); + return Command::appendCommandStatus(*result, notMasterStatus); +} + +bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); + auto target = targeter->findHost(kConfigReadSelector); + if (!target.isOK()) { + return Command::appendCommandStatus(*result, target.getStatus()); } - DistLockManager* CatalogManagerReplicaSet::getDistLockManager() { - invariant(_distLockManager); - return _distLockManager.get(); + auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); + if (!resultStatus.isOK()) { + return Command::appendCommandStatus(*result, resultStatus.getStatus()); } - void CatalogManagerReplicaSet::writeConfigServerDirect( - const BatchedCommandRequest& batchRequest, - BatchedCommandResponse* batchResponse) { - std::string dbname = batchRequest.getNSS().db().toString(); - invariant (dbname == "config" || dbname == "admin"); - const BSONObj cmdObj = batchRequest.toBSON(); - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - - Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; - for (int i = 0; i < kNotMasterNumRetries; ++i) { - - auto target = targeter->findHost(kConfigWriteSelector); - if (!target.isOK()) { - if (ErrorCodes::NotMaster == target.getStatus()) { - notMasterStatus = target.getStatus(); - sleepmillis(kNotMasterRetryInterval.count()); - continue; - } - _toBatchError(target.getStatus(), batchResponse); - return; - } + result->appendElements(resultStatus.getValue()); - auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), - batchRequest.getNSS().db().toString(), - batchRequest.toBSON()); + return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); + return false; +} - if (!resultStatus.isOK()) { - _toBatchError(resultStatus.getStatus(), batchResponse); - return; - } +Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, + const BSONArray& preCondition) { + return notYetImplemented; +} + +DistLockManager* CatalogManagerReplicaSet::getDistLockManager() { + invariant(_distLockManager); + return _distLockManager.get(); +} - const BSONObj& commandResponse = resultStatus.getValue(); +void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandRequest& batchRequest, + BatchedCommandResponse* batchResponse) { + std::string dbname = batchRequest.getNSS().db().toString(); + invariant(dbname == "config" || dbname == "admin"); + const BSONObj cmdObj = batchRequest.toBSON(); + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - Status commandStatus = getStatusFromCommandResult(commandResponse); - if (commandStatus == ErrorCodes::NotMaster) { - notMasterStatus = commandStatus; + Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; + for (int i = 0; i < kNotMasterNumRetries; ++i) { + auto target = targeter->findHost(kConfigWriteSelector); + if (!target.isOK()) { + if (ErrorCodes::NotMaster == target.getStatus()) { + notMasterStatus = target.getStatus(); sleepmillis(kNotMasterRetryInterval.count()); continue; } + _toBatchError(target.getStatus(), batchResponse); + return; + } - string errmsg; - if (!batchResponse->parseBSON(commandResponse, &errmsg)) { - _toBatchError(Status(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse config server response: " << - errmsg), - batchResponse); - return; - } - return; // The normal case return point. + auto resultStatus = grid.shardRegistry()->runCommand( + target.getValue(), batchRequest.getNSS().db().toString(), batchRequest.toBSON()); + + if (!resultStatus.isOK()) { + _toBatchError(resultStatus.getStatus(), batchResponse); + return; } - invariant(ErrorCodes::NotMaster == notMasterStatus); - _toBatchError(notMasterStatus, batchResponse); + const BSONObj& commandResponse = resultStatus.getValue(); + + Status commandStatus = getStatusFromCommandResult(commandResponse); + if (commandStatus == ErrorCodes::NotMaster) { + notMasterStatus = commandStatus; + sleepmillis(kNotMasterRetryInterval.count()); + continue; + } + + string errmsg; + if (!batchResponse->parseBSON(commandResponse, &errmsg)) { + _toBatchError( + Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse config server response: " << errmsg), + batchResponse); + return; + } + return; // The normal case return point. } -} // namespace mongo + invariant(ErrorCodes::NotMaster == notMasterStatus); + _toBatchError(notMasterStatus, batchResponse); +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 9b85640d1fb..c4b93093c26 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -39,115 +39,111 @@ namespace mongo { +/** + * Implements the catalog manager for talking to replica set config servers. + */ +class CatalogManagerReplicaSet final : public CatalogManager { +public: + CatalogManagerReplicaSet(); + virtual ~CatalogManagerReplicaSet(); + /** - * Implements the catalog manager for talking to replica set config servers. + * Initializes the catalog manager. + * Can only be called once for the lifetime of the catalog manager. + * TODO(spencer): Take pointer to ShardRegistry rather than getting it from the global + * "grid" object. */ - class CatalogManagerReplicaSet final : public CatalogManager { - public: - CatalogManagerReplicaSet(); - virtual ~CatalogManagerReplicaSet(); - - /** - * Initializes the catalog manager. - * Can only be called once for the lifetime of the catalog manager. - * TODO(spencer): Take pointer to ShardRegistry rather than getting it from the global - * "grid" object. - */ - Status init(const ConnectionString& configCS, - std::unique_ptr<DistLockManager> distLockManager); - - Status startup(bool upgrade) override; + Status init(const ConnectionString& configCS, std::unique_ptr<DistLockManager> distLockManager); - ConnectionString connectionString() const override; + Status startup(bool upgrade) override; - void shutDown() override; + ConnectionString connectionString() const override; - Status enableSharding(const std::string& dbName) override; + void shutDown() override; - Status shardCollection(const std::string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - std::vector<BSONObj>* initPoints, - std::set<ShardId>* initShardsIds = nullptr) override; + Status enableSharding(const std::string& dbName) override; - StatusWith<std::string> addShard(const std::string& name, - const ConnectionString& shardConnectionString, - const long long maxSize) override; + Status shardCollection(const std::string& ns, + const ShardKeyPattern& fieldsAndOrder, + bool unique, + std::vector<BSONObj>* initPoints, + std::set<ShardId>* initShardsIds = nullptr) override; - StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, - const std::string& name) override; + StatusWith<std::string> addShard(const std::string& name, + const ConnectionString& shardConnectionString, + const long long maxSize) override; - Status createDatabase(const std::string& dbName) override; + StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, + const std::string& name) override; - Status updateDatabase(const std::string& dbName, const DatabaseType& db) override; + Status createDatabase(const std::string& dbName) override; - StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; + Status updateDatabase(const std::string& dbName, const DatabaseType& db) override; - Status updateCollection(const std::string& collNs, const CollectionType& coll) override; + StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; - StatusWith<CollectionType> getCollection(const std::string& collNs) override; + Status updateCollection(const std::string& collNs, const CollectionType& coll) override; - Status getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) override; + StatusWith<CollectionType> getCollection(const std::string& collNs) override; - Status dropCollection(const std::string& collectionNs) override; + Status getCollections(const std::string* dbName, + std::vector<CollectionType>* collections) override; - Status getDatabasesForShard(const std::string& shardName, - std::vector<std::string>* dbs) override; + Status dropCollection(const std::string& collectionNs) override; - Status getChunks(const Query& query, - int nToReturn, - std::vector<ChunkType>* chunks) override; + Status getDatabasesForShard(const std::string& shardName, + std::vector<std::string>* dbs) override; - Status getTagsForCollection(const std::string& collectionNs, - std::vector<TagsType>* tags) override; + Status getChunks(const Query& query, int nToReturn, std::vector<ChunkType>* chunks) override; - StatusWith<std::string> getTagForChunk(const std::string& collectionNs, - const ChunkType& chunk) override; + Status getTagsForCollection(const std::string& collectionNs, + std::vector<TagsType>* tags) override; - Status getAllShards(std::vector<ShardType>* shards) override; + StatusWith<std::string> getTagForChunk(const std::string& collectionNs, + const ChunkType& chunk) override; - bool isShardHost(const ConnectionString& shardConnectionString) override; + Status getAllShards(std::vector<ShardType>* shards) override; - bool runUserManagementWriteCommand(const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; + bool isShardHost(const ConnectionString& shardConnectionString) override; - bool runUserManagementReadCommand(const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; + bool runUserManagementWriteCommand(const std::string& commandName, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; - Status applyChunkOpsDeprecated(const BSONArray& updateOps, - const BSONArray& preCondition) override; + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; - void logAction(const ActionLogType& actionLog) override; + Status applyChunkOpsDeprecated(const BSONArray& updateOps, + const BSONArray& preCondition) override; - void logChange(OperationContext* txn, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; + void logAction(const ActionLogType& actionLog) override; - StatusWith<SettingsType> getGlobalSettings(const std::string& key) override; + void logChange(OperationContext* txn, + const std::string& what, + const std::string& ns, + const BSONObj& detail) override; - void writeConfigServerDirect(const BatchedCommandRequest& request, - BatchedCommandResponse* response) override; + StatusWith<SettingsType> getGlobalSettings(const std::string& key) override; - DistLockManager* getDistLockManager() override; + void writeConfigServerDirect(const BatchedCommandRequest& request, + BatchedCommandResponse* response) override; - private: + DistLockManager* getDistLockManager() override; - // Config server connection string - ConnectionString _configServerConnectionString; +private: + // Config server connection string + ConnectionString _configServerConnectionString; - // Distribted lock manager singleton. - std::unique_ptr<DistLockManager> _distLockManager; + // Distribted lock manager singleton. + std::unique_ptr<DistLockManager> _distLockManager; - // protects _inShutdown - std::mutex _mutex; + // protects _inShutdown + std::mutex _mutex; - // True if shutDown() has been called. False, otherwise. - bool _inShutdown = false; - }; + // True if shutDown() has been called. False, otherwise. + bool _inShutdown = false; +}; -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp index 7ba386e12f0..2d551a855f5 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp @@ -56,357 +56,473 @@ namespace mongo { namespace { - using executor::NetworkInterfaceMock; - using executor::TaskExecutor; - using std::async; - using std::string; - using std::vector; - using stdx::chrono::milliseconds; - using unittest::assertGet; - - static const std::chrono::seconds kFutureTimeout{5}; - - TEST_F(CatalogManagerReplSetTestFixture, GetCollectionExisting) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - CollectionType expectedColl; - expectedColl.setNs(NamespaceString("TestDB.TestNS")); - expectedColl.setKeyPattern(BSON("KeyName" << 1)); - expectedColl.setUpdatedAt(Date_t()); - expectedColl.setEpoch(OID::gen()); - - auto future = async(std::launch::async, [this, &expectedColl] { - return assertGet(catalogManager()->getCollection(expectedColl.getNs())); - }); - - onFindCommand([&expectedColl](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), CollectionType::ConfigNS); - - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - - // Ensure the query is correct - ASSERT_EQ(query->ns(), CollectionType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSON(CollectionType::fullNs(expectedColl.getNs()))); - - return vector<BSONObj>{ expectedColl.toBSON() }; - }); - - // Now wait for the getCollection call to return - const auto& actualColl = future.get(); - ASSERT_EQ(expectedColl.toBSON(), actualColl.toBSON()); - } - - TEST_F(CatalogManagerReplSetTestFixture, GetCollectionNotExisting) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - auto future = async(std::launch::async, [this] { - auto status = catalogManager()->getCollection("NonExistent"); - ASSERT_EQUALS(status.getStatus(), ErrorCodes::NamespaceNotFound); - }); - - onFindCommand([](const RemoteCommandRequest& request) { - return vector<BSONObj>{ }; - }); - - // Now wait for the getCollection call to return - future.get(); - } - - TEST_F(CatalogManagerReplSetTestFixture, GetDatabaseExisting) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - DatabaseType expectedDb; - expectedDb.setName("bigdata"); - expectedDb.setPrimary("shard0000"); - expectedDb.setSharded(true); - - auto future = async(std::launch::async, [this, &expectedDb] { - return assertGet(catalogManager()->getDatabase(expectedDb.getName())); - }); - - onFindCommand([&expectedDb](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS); - - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - - ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name(expectedDb.getName()))); +using executor::NetworkInterfaceMock; +using executor::TaskExecutor; +using std::async; +using std::string; +using std::vector; +using stdx::chrono::milliseconds; +using unittest::assertGet; + +static const std::chrono::seconds kFutureTimeout{5}; + +TEST_F(CatalogManagerReplSetTestFixture, GetCollectionExisting) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + CollectionType expectedColl; + expectedColl.setNs(NamespaceString("TestDB.TestNS")); + expectedColl.setKeyPattern(BSON("KeyName" << 1)); + expectedColl.setUpdatedAt(Date_t()); + expectedColl.setEpoch(OID::gen()); + + auto future = async(std::launch::async, + [this, &expectedColl] { + return assertGet(catalogManager()->getCollection(expectedColl.getNs())); + }); + + onFindCommand([&expectedColl](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), CollectionType::ConfigNS); + + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + + // Ensure the query is correct + ASSERT_EQ(query->ns(), CollectionType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSON(CollectionType::fullNs(expectedColl.getNs()))); + + return vector<BSONObj>{expectedColl.toBSON()}; + }); + + // Now wait for the getCollection call to return + const auto& actualColl = future.get(); + ASSERT_EQ(expectedColl.toBSON(), actualColl.toBSON()); +} + +TEST_F(CatalogManagerReplSetTestFixture, GetCollectionNotExisting) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + auto future = async(std::launch::async, + [this] { + auto status = catalogManager()->getCollection("NonExistent"); + ASSERT_EQUALS(status.getStatus(), ErrorCodes::NamespaceNotFound); + }); + + onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; }); + + // Now wait for the getCollection call to return + future.get(); +} + +TEST_F(CatalogManagerReplSetTestFixture, GetDatabaseExisting) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + DatabaseType expectedDb; + expectedDb.setName("bigdata"); + expectedDb.setPrimary("shard0000"); + expectedDb.setSharded(true); + + auto future = async(std::launch::async, + [this, &expectedDb] { + return assertGet(catalogManager()->getDatabase(expectedDb.getName())); + }); + + onFindCommand([&expectedDb](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS); + + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + + ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name(expectedDb.getName()))); + + return vector<BSONObj>{expectedDb.toBSON()}; + }); + + const auto& actualDb = future.get(); + ASSERT_EQ(expectedDb.toBSON(), actualDb.toBSON()); +} + +TEST_F(CatalogManagerReplSetTestFixture, GetDatabaseNotExisting) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + auto future = async(std::launch::async, + [this] { + auto dbResult = catalogManager()->getDatabase("NonExistent"); + ASSERT_EQ(dbResult.getStatus(), ErrorCodes::NamespaceNotFound); + }); + + onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; }); + + future.get(); +} + +TEST_F(CatalogManagerReplSetTestFixture, UpdateCollection) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + CollectionType collection; + collection.setNs(NamespaceString("db.coll")); + collection.setUpdatedAt(network()->now()); + collection.setUnique(true); + collection.setEpoch(OID::gen()); + collection.setKeyPattern(KeyPattern(BSON("_id" << 1))); + + auto future = async(std::launch::async, + [this, collection] { + auto status = catalogManager()->updateCollection( + collection.getNs().toString(), collection); + ASSERT_OK(status); + }); + + onCommand([collection](const RemoteCommandRequest& request) { + ASSERT_EQUALS("config", request.dbname); + + BatchedUpdateRequest actualBatchedUpdate; + std::string errmsg; + ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.cmdObj, &errmsg)); + ASSERT_EQUALS(CollectionType::ConfigNS, actualBatchedUpdate.getCollName()); + auto updates = actualBatchedUpdate.getUpdates(); + ASSERT_EQUALS(1U, updates.size()); + auto update = updates.front(); + + ASSERT_TRUE(update->getUpsert()); + ASSERT_FALSE(update->getMulti()); + ASSERT_EQUALS(update->getQuery(), + BSON(CollectionType::fullNs(collection.getNs().toString()))); + ASSERT_EQUALS(update->getUpdateExpr(), collection.toBSON()); + + BatchedCommandResponse response; + response.setOk(true); + response.setNModified(1); + + return response.toBSON(); + }); + + // Now wait for the updateCollection call to return + future.wait_for(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTestFixture, UpdateCollectionNotMaster) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + CollectionType collection; + collection.setNs(NamespaceString("db.coll")); + collection.setUpdatedAt(network()->now()); + collection.setUnique(true); + collection.setEpoch(OID::gen()); + collection.setKeyPattern(KeyPattern(BSON("_id" << 1))); + + auto future = async(std::launch::async, + [this, collection] { + auto status = catalogManager()->updateCollection( + collection.getNs().toString(), collection); + ASSERT_EQUALS(ErrorCodes::NotMaster, status); + }); + + for (int i = 0; i < 3; ++i) { + onCommand([](const RemoteCommandRequest& request) { + BatchedCommandResponse response; + response.setOk(false); + response.setErrCode(ErrorCodes::NotMaster); + response.setErrMessage("not master"); - return vector<BSONObj>{ expectedDb.toBSON() }; + return response.toBSON(); }); - - const auto& actualDb = future.get(); - ASSERT_EQ(expectedDb.toBSON(), actualDb.toBSON()); } - TEST_F(CatalogManagerReplSetTestFixture, GetDatabaseNotExisting) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - auto future = async(std::launch::async, [this] { - auto dbResult = catalogManager()->getDatabase("NonExistent"); - ASSERT_EQ(dbResult.getStatus(), ErrorCodes::NamespaceNotFound); - }); - - onFindCommand([](const RemoteCommandRequest& request) { - return vector<BSONObj>{ }; - }); - - future.get(); + // Now wait for the updateCollection call to return + future.wait_for(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTestFixture, UpdateCollectionNotMasterRetrySuccess) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + HostAndPort host1("TestHost1"); + HostAndPort host2("TestHost2"); + targeter->setFindHostReturnValue(host1); + + CollectionType collection; + collection.setNs(NamespaceString("db.coll")); + collection.setUpdatedAt(network()->now()); + collection.setUnique(true); + collection.setEpoch(OID::gen()); + collection.setKeyPattern(KeyPattern(BSON("_id" << 1))); + + auto future = async(std::launch::async, + [this, collection] { + auto status = catalogManager()->updateCollection( + collection.getNs().toString(), collection); + ASSERT_OK(status); + }); + + onCommand([host1, host2, targeter](const RemoteCommandRequest& request) { + ASSERT_EQUALS(host1, request.target); + + BatchedCommandResponse response; + response.setOk(false); + response.setErrCode(ErrorCodes::NotMaster); + response.setErrMessage("not master"); + + // Ensure that when the catalog manager tries to retarget after getting the + // NotMaster response, it will get back a new target. + targeter->setFindHostReturnValue(host2); + return response.toBSON(); + }); + + onCommand([host2, collection](const RemoteCommandRequest& request) { + ASSERT_EQUALS(host2, request.target); + + BatchedUpdateRequest actualBatchedUpdate; + std::string errmsg; + ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.cmdObj, &errmsg)); + ASSERT_EQUALS(CollectionType::ConfigNS, actualBatchedUpdate.getCollName()); + auto updates = actualBatchedUpdate.getUpdates(); + ASSERT_EQUALS(1U, updates.size()); + auto update = updates.front(); + + ASSERT_TRUE(update->getUpsert()); + ASSERT_FALSE(update->getMulti()); + ASSERT_EQUALS(update->getQuery(), + BSON(CollectionType::fullNs(collection.getNs().toString()))); + ASSERT_EQUALS(update->getUpdateExpr(), collection.toBSON()); + + BatchedCommandResponse response; + response.setOk(true); + response.setNModified(1); + + return response.toBSON(); + }); + + // Now wait for the updateCollection call to return + future.wait_for(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTestFixture, GetAllShardsValid) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + ShardType s1; + s1.setName("shard0000"); + s1.setHost("ShardHost"); + s1.setDraining(false); + s1.setMaxSizeMB(50); + s1.setTags({"tag1", "tag2", "tag3"}); + + ShardType s2; + s2.setName("shard0001"); + s2.setHost("ShardHost"); + + ShardType s3; + s3.setName("shard0002"); + s3.setHost("ShardHost"); + s3.setMaxSizeMB(65); + + const vector<ShardType> expectedShardsList = {s1, s2, s3}; + + auto future = async(std::launch::async, + [this] { + vector<ShardType> shards; + ASSERT_OK(catalogManager()->getAllShards(&shards)); + return shards; + }); + + onFindCommand([&s1, &s2, &s3](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), ShardType::ConfigNS); + + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + + ASSERT_EQ(query->ns(), ShardType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSONObj()); + + return vector<BSONObj>{s1.toBSON(), s2.toBSON(), s3.toBSON()}; + }); + + const vector<ShardType> actualShardsList = future.get(); + ASSERT_EQ(actualShardsList.size(), expectedShardsList.size()); + + for (size_t i = 0; i < actualShardsList.size(); ++i) { + ASSERT_EQ(actualShardsList[i].toBSON(), expectedShardsList[i].toBSON()); } +} - TEST_F(CatalogManagerReplSetTestFixture, UpdateCollection) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - CollectionType collection; - collection.setNs(NamespaceString("db.coll")); - collection.setUpdatedAt(network()->now()); - collection.setUnique(true); - collection.setEpoch(OID::gen()); - collection.setKeyPattern(KeyPattern(BSON("_id" << 1))); - - auto future = async(std::launch::async, [this, collection] { - auto status = catalogManager()->updateCollection(collection.getNs().toString(), - collection); - ASSERT_OK(status); - }); - - onCommand([collection](const RemoteCommandRequest& request) { - ASSERT_EQUALS("config", request.dbname); - - BatchedUpdateRequest actualBatchedUpdate; - std::string errmsg; - ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.cmdObj, &errmsg)); - ASSERT_EQUALS(CollectionType::ConfigNS, actualBatchedUpdate.getCollName()); - auto updates = actualBatchedUpdate.getUpdates(); - ASSERT_EQUALS(1U, updates.size()); - auto update = updates.front(); +TEST_F(CatalogManagerReplSetTestFixture, GetAllShardsWithInvalidShard) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - ASSERT_TRUE(update->getUpsert()); - ASSERT_FALSE(update->getMulti()); - ASSERT_EQUALS(update->getQuery(), - BSON(CollectionType::fullNs(collection.getNs().toString()))); - ASSERT_EQUALS(update->getUpdateExpr(), collection.toBSON()); + auto future = async(std::launch::async, + [this] { + vector<ShardType> shards; + Status status = catalogManager()->getAllShards(&shards); - BatchedCommandResponse response; - response.setOk(true); - response.setNModified(1); + ASSERT_NOT_OK(status); + ASSERT_EQ(0U, shards.size()); + }); - return response.toBSON(); - }); + onFindCommand([](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), ShardType::ConfigNS); - // Now wait for the updateCollection call to return - future.wait_for(kFutureTimeout); - } + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - TEST_F(CatalogManagerReplSetTestFixture, UpdateCollectionNotMaster) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - CollectionType collection; - collection.setNs(NamespaceString("db.coll")); - collection.setUpdatedAt(network()->now()); - collection.setUnique(true); - collection.setEpoch(OID::gen()); - collection.setKeyPattern(KeyPattern(BSON("_id" << 1))); - - auto future = async(std::launch::async, [this, collection] { - auto status = catalogManager()->updateCollection(collection.getNs().toString(), - collection); - ASSERT_EQUALS(ErrorCodes::NotMaster, status); - }); + ASSERT_EQ(query->ns(), ShardType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSONObj()); - for (int i = 0; i < 3; ++i) { - onCommand([](const RemoteCommandRequest& request) { - BatchedCommandResponse response; - response.setOk(false); - response.setErrCode(ErrorCodes::NotMaster); - response.setErrMessage("not master"); + // valid ShardType + ShardType s1; + s1.setName("shard0001"); + s1.setHost("ShardHost"); - return response.toBSON(); - }); - } + return vector<BSONObj>{ + s1.toBSON(), + BSONObj() // empty document is invalid + }; + }); - // Now wait for the updateCollection call to return - future.wait_for(kFutureTimeout); - } + future.get(); +} - TEST_F(CatalogManagerReplSetTestFixture, UpdateCollectionNotMasterRetrySuccess) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - HostAndPort host1("TestHost1"); - HostAndPort host2("TestHost2"); - targeter->setFindHostReturnValue(host1); - - CollectionType collection; - collection.setNs(NamespaceString("db.coll")); - collection.setUpdatedAt(network()->now()); - collection.setUnique(true); - collection.setEpoch(OID::gen()); - collection.setKeyPattern(KeyPattern(BSON("_id" << 1))); - - auto future = async(std::launch::async, [this, collection] { - auto status = catalogManager()->updateCollection(collection.getNs().toString(), - collection); - ASSERT_OK(status); - }); +TEST_F(CatalogManagerReplSetTestFixture, GetChunksForNS) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - onCommand([host1, host2, targeter](const RemoteCommandRequest& request) { - ASSERT_EQUALS(host1, request.target); + OID oid = OID::gen(); - BatchedCommandResponse response; - response.setOk(false); - response.setErrCode(ErrorCodes::NotMaster); - response.setErrMessage("not master"); + ChunkType chunkA; + chunkA.setName("chunk0000"); + chunkA.setNS("TestDB.TestColl"); + chunkA.setMin(BSON("a" << 1)); + chunkA.setMax(BSON("a" << 100)); + chunkA.setVersion({1, 2, oid}); + chunkA.setShard("shard0000"); - // Ensure that when the catalog manager tries to retarget after getting the - // NotMaster response, it will get back a new target. - targeter->setFindHostReturnValue(host2); - return response.toBSON(); - }); + ChunkType chunkB; + chunkB.setName("chunk0001"); + chunkB.setNS("TestDB.TestColl"); + chunkB.setMin(BSON("a" << 100)); + chunkB.setMax(BSON("a" << 200)); + chunkB.setVersion({3, 4, oid}); + chunkB.setShard("shard0001"); - onCommand([host2, collection](const RemoteCommandRequest& request) { - ASSERT_EQUALS(host2, request.target); + ChunkVersion queryChunkVersion({1, 2, oid}); - BatchedUpdateRequest actualBatchedUpdate; - std::string errmsg; - ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.cmdObj, &errmsg)); - ASSERT_EQUALS(CollectionType::ConfigNS, actualBatchedUpdate.getCollName()); - auto updates = actualBatchedUpdate.getUpdates(); - ASSERT_EQUALS(1U, updates.size()); - auto update = updates.front(); + const Query chunksQuery( + BSON(ChunkType::ns("TestDB.TestColl") + << ChunkType::DEPRECATED_lastmod() + << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong())))); - ASSERT_TRUE(update->getUpsert()); - ASSERT_FALSE(update->getMulti()); - ASSERT_EQUALS(update->getQuery(), - BSON(CollectionType::fullNs(collection.getNs().toString()))); - ASSERT_EQUALS(update->getUpdateExpr(), collection.toBSON()); + auto future = async(std::launch::async, + [this, &chunksQuery] { + vector<ChunkType> chunks; - BatchedCommandResponse response; - response.setOk(true); - response.setNModified(1); + ASSERT_OK(catalogManager()->getChunks(chunksQuery, 0, &chunks)); + ASSERT_EQ(2U, chunks.size()); - return response.toBSON(); - }); + return chunks; + }); - // Now wait for the updateCollection call to return - future.wait_for(kFutureTimeout); - } + onFindCommand([&chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), ChunkType::ConfigNS); - TEST_F(CatalogManagerReplSetTestFixture, GetAllShardsValid) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - ShardType s1; - s1.setName("shard0000"); - s1.setHost("ShardHost"); - s1.setDraining(false); - s1.setMaxSizeMB(50); - s1.setTags({ "tag1", "tag2", "tag3" }); + ASSERT_EQ(query->ns(), ChunkType::ConfigNS); + ASSERT_EQ(query->getFilter(), chunksQuery.getFilter()); - ShardType s2; - s2.setName("shard0001"); - s2.setHost("ShardHost"); + return vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()}; + }); - ShardType s3; - s3.setName("shard0002"); - s3.setHost("ShardHost"); - s3.setMaxSizeMB(65); + const auto& chunks = future.get(); + ASSERT_EQ(chunkA.toBSON(), chunks[0].toBSON()); + ASSERT_EQ(chunkB.toBSON(), chunks[1].toBSON()); +} - const vector<ShardType> expectedShardsList = { s1, s2, s3 }; +TEST_F(CatalogManagerReplSetTestFixture, GetChunksForNSNoChunks) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - auto future = async(std::launch::async, [this] { - vector<ShardType> shards; - ASSERT_OK(catalogManager()->getAllShards(&shards)); - return shards; - }); + ChunkVersion queryChunkVersion({1, 2, OID::gen()}); - onFindCommand([&s1, &s2, &s3](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), ShardType::ConfigNS); + const Query chunksQuery( + BSON(ChunkType::ns("TestDB.TestColl") + << ChunkType::DEPRECATED_lastmod() + << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong())))); - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + auto future = async(std::launch::async, + [this, &chunksQuery] { + vector<ChunkType> chunks; - ASSERT_EQ(query->ns(), ShardType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSONObj()); + ASSERT_OK(catalogManager()->getChunks(chunksQuery, 0, &chunks)); + ASSERT_EQ(0U, chunks.size()); + }); - return vector<BSONObj>{ s1.toBSON(), s2.toBSON(), s3.toBSON() }; - }); + onFindCommand([&chunksQuery](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), ChunkType::ConfigNS); - const vector<ShardType> actualShardsList = future.get(); - ASSERT_EQ(actualShardsList.size(), expectedShardsList.size()); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - for (size_t i = 0; i < actualShardsList.size(); ++i) { - ASSERT_EQ(actualShardsList[i].toBSON(), expectedShardsList[i].toBSON()); - } - } + ASSERT_EQ(query->ns(), ChunkType::ConfigNS); + ASSERT_EQ(query->getFilter(), chunksQuery.getFilter()); - TEST_F(CatalogManagerReplSetTestFixture, GetAllShardsWithInvalidShard) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + return vector<BSONObj>{}; + }); - auto future = async(std::launch::async, [this] { - vector<ShardType> shards; - Status status = catalogManager()->getAllShards(&shards); + future.get(); +} - ASSERT_NOT_OK(status); - ASSERT_EQ(0U, shards.size()); - }); +TEST_F(CatalogManagerReplSetTestFixture, GetChunksForNSInvalidChunk) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - onFindCommand([](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), ShardType::ConfigNS); + ChunkVersion queryChunkVersion({1, 2, OID::gen()}); - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + const Query chunksQuery( + BSON(ChunkType::ns("TestDB.TestColl") + << ChunkType::DEPRECATED_lastmod() + << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong())))); - ASSERT_EQ(query->ns(), ShardType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSONObj()); + auto future = async(std::launch::async, + [this, &chunksQuery] { + vector<ChunkType> chunks; + Status status = catalogManager()->getChunks(chunksQuery, 0, &chunks); - // valid ShardType - ShardType s1; - s1.setName("shard0001"); - s1.setHost("ShardHost"); + ASSERT_EQUALS(ErrorCodes::FailedToParse, status); + ASSERT_EQ(0U, chunks.size()); + }); - return vector<BSONObj> { - s1.toBSON(), - BSONObj() // empty document is invalid - }; - }); + onFindCommand([&chunksQuery](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), ChunkType::ConfigNS); - future.get(); - } - - TEST_F(CatalogManagerReplSetTestFixture, GetChunksForNS) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - OID oid = OID::gen(); + ASSERT_EQ(query->ns(), ChunkType::ConfigNS); + ASSERT_EQ(query->getFilter(), chunksQuery.getFilter()); ChunkType chunkA; chunkA.setName("chunk0000"); chunkA.setNS("TestDB.TestColl"); chunkA.setMin(BSON("a" << 1)); chunkA.setMax(BSON("a" << 100)); - chunkA.setVersion({ 1, 2, oid }); + chunkA.setVersion({1, 2, OID::gen()}); chunkA.setShard("shard0000"); ChunkType chunkB; @@ -414,450 +530,355 @@ namespace { chunkB.setNS("TestDB.TestColl"); chunkB.setMin(BSON("a" << 100)); chunkB.setMax(BSON("a" << 200)); - chunkB.setVersion({ 3, 4, oid }); - chunkB.setShard("shard0001"); - - ChunkVersion queryChunkVersion({ 1, 2, oid }); - - const Query chunksQuery(BSON(ChunkType::ns("TestDB.TestColl") << - ChunkType::DEPRECATED_lastmod() << - BSON("$gte" << static_cast<long long>( - queryChunkVersion.toLong())))); - - auto future = async(std::launch::async, [this, &chunksQuery] { - vector<ChunkType> chunks; - - ASSERT_OK(catalogManager()->getChunks(chunksQuery, 0, &chunks)); - ASSERT_EQ(2U, chunks.size()); - - return chunks; - }); - - onFindCommand([&chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), ChunkType::ConfigNS); - - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - - ASSERT_EQ(query->ns(), ChunkType::ConfigNS); - ASSERT_EQ(query->getFilter(), chunksQuery.getFilter()); - - return vector<BSONObj>{ chunkA.toBSON(), chunkB.toBSON() }; - }); - - const auto& chunks = future.get(); - ASSERT_EQ(chunkA.toBSON(), chunks[0].toBSON()); - ASSERT_EQ(chunkB.toBSON(), chunks[1].toBSON()); - } - - TEST_F(CatalogManagerReplSetTestFixture, GetChunksForNSNoChunks) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - ChunkVersion queryChunkVersion({ 1, 2, OID::gen() }); - - const Query chunksQuery(BSON(ChunkType::ns("TestDB.TestColl") << - ChunkType::DEPRECATED_lastmod() << - BSON("$gte" << static_cast<long long>( - queryChunkVersion.toLong())))); - - auto future = async(std::launch::async, [this, &chunksQuery] { - vector<ChunkType> chunks; - - ASSERT_OK(catalogManager()->getChunks(chunksQuery, 0, &chunks)); - ASSERT_EQ(0U, chunks.size()); - }); - - onFindCommand([&chunksQuery](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), ChunkType::ConfigNS); - - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - - ASSERT_EQ(query->ns(), ChunkType::ConfigNS); - ASSERT_EQ(query->getFilter(), chunksQuery.getFilter()); - - return vector<BSONObj>{ }; - }); - - future.get(); - } - - TEST_F(CatalogManagerReplSetTestFixture, GetChunksForNSInvalidChunk) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - ChunkVersion queryChunkVersion({ 1, 2, OID::gen() }); - - const Query chunksQuery(BSON(ChunkType::ns("TestDB.TestColl") << - ChunkType::DEPRECATED_lastmod() << - BSON("$gte" << static_cast<long long>( - queryChunkVersion.toLong())))); - - auto future = async(std::launch::async, [this, &chunksQuery] { - vector<ChunkType> chunks; - Status status = catalogManager()->getChunks(chunksQuery, 0, &chunks); - - ASSERT_EQUALS(ErrorCodes::FailedToParse, status); - ASSERT_EQ(0U, chunks.size()); - }); - - onFindCommand([&chunksQuery](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), ChunkType::ConfigNS); - - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - - ASSERT_EQ(query->ns(), ChunkType::ConfigNS); - ASSERT_EQ(query->getFilter(), chunksQuery.getFilter()); - - ChunkType chunkA; - chunkA.setName("chunk0000"); - chunkA.setNS("TestDB.TestColl"); - chunkA.setMin(BSON("a" << 1)); - chunkA.setMax(BSON("a" << 100)); - chunkA.setVersion({ 1, 2, OID::gen() }); - chunkA.setShard("shard0000"); - - ChunkType chunkB; - chunkB.setName("chunk0001"); - chunkB.setNS("TestDB.TestColl"); - chunkB.setMin(BSON("a" << 100)); - chunkB.setMax(BSON("a" << 200)); - chunkB.setVersion({ 3, 4, OID::gen() }); - // Missing shard id - - return vector<BSONObj>{ chunkA.toBSON(), chunkB.toBSON() }; - }); - - future.get(); - } - - TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementReadCommand) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - auto future = async(std::launch::async, [this] { - BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runUserManagementReadCommand("test", - BSON("usersInfo" << 1), - &responseBuilder); - ASSERT_TRUE(ok); - - BSONObj response = responseBuilder.obj(); - ASSERT_TRUE(response["ok"].trueValue()); - auto users = response["users"].Array(); - ASSERT_EQUALS(0U, users.size()); - }); - - onCommand([](const RemoteCommandRequest& request) { - ASSERT_EQUALS("test", request.dbname); - ASSERT_EQUALS(BSON("usersInfo" << 1), request.cmdObj); - - return BSON("ok" << 1 << "users" << BSONArrayBuilder().arr()); - }); - - // Now wait for the runUserManagementReadCommand call to return - future.wait_for(kFutureTimeout); - } - - TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementReadCommandUnsatisfiedReadPref) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(Status(ErrorCodes::FailedToSatisfyReadPreference, - "no nodes up")); - - BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runUserManagementReadCommand("test", - BSON("usersInfo" << 1), - &responseBuilder); - ASSERT_FALSE(ok); - - Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); - ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference, commandStatus); - } - - TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandDistLockHeld) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - distLock()->expectLock( - [](StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { + chunkB.setVersion({3, 4, OID::gen()}); + // Missing shard id + + return vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()}; + }); + + future.get(); +} + +TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementReadCommand) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + auto future = async(std::launch::async, + [this] { + BSONObjBuilder responseBuilder; + bool ok = catalogManager()->runUserManagementReadCommand( + "test", BSON("usersInfo" << 1), &responseBuilder); + ASSERT_TRUE(ok); + + BSONObj response = responseBuilder.obj(); + ASSERT_TRUE(response["ok"].trueValue()); + auto users = response["users"].Array(); + ASSERT_EQUALS(0U, users.size()); + }); + + onCommand([](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + ASSERT_EQUALS(BSON("usersInfo" << 1), request.cmdObj); + + return BSON("ok" << 1 << "users" << BSONArrayBuilder().arr()); + }); + + // Now wait for the runUserManagementReadCommand call to return + future.wait_for(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementReadCommandUnsatisfiedReadPref) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue( + Status(ErrorCodes::FailedToSatisfyReadPreference, "no nodes up")); + + BSONObjBuilder responseBuilder; + bool ok = catalogManager()->runUserManagementReadCommand( + "test", BSON("usersInfo" << 1), &responseBuilder); + ASSERT_FALSE(ok); + + Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); + ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference, commandStatus); +} + +TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandDistLockHeld) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + distLock()->expectLock( + [](StringData name, + StringData whyMessage, + milliseconds waitFor, + milliseconds lockTryInterval) { + ASSERT_EQUALS("authorizationData", name); + ASSERT_EQUALS("dropUser", whyMessage); + }, + Status(ErrorCodes::LockBusy, "lock already held")); + + BSONObjBuilder responseBuilder; + bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", + "test", + BSON("dropUser" + << "test"), + &responseBuilder); + ASSERT_FALSE(ok); + BSONObj response = responseBuilder.obj(); + ASSERT_EQUALS(ErrorCodes::LockBusy, Command::getStatusFromCommandResult(response)); +} + +TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandSuccess) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + distLock()->expectLock( + [](StringData name, + StringData whyMessage, + milliseconds waitFor, + milliseconds lockTryInterval) { ASSERT_EQUALS("authorizationData", name); ASSERT_EQUALS("dropUser", whyMessage); - }, Status(ErrorCodes::LockBusy, "lock already held")); + }, + Status::OK()); + + auto future = + async(std::launch::async, + [this] { + BSONObjBuilder responseBuilder; + bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", + "test", + BSON("dropUser" + << "test"), + &responseBuilder); + ASSERT_FALSE(ok); + + Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); + ASSERT_EQUALS(ErrorCodes::UserNotFound, commandStatus); + }); + + onCommand([](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + ASSERT_EQUALS(BSON("dropUser" + << "test"), + request.cmdObj); BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", - "test", - BSON("dropUser" << "test"), - &responseBuilder); - ASSERT_FALSE(ok); - BSONObj response = responseBuilder.obj(); - ASSERT_EQUALS(ErrorCodes::LockBusy, Command::getStatusFromCommandResult(response)); - } - - TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandSuccess) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - distLock()->expectLock( - [](StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { + Command::appendCommandStatus(responseBuilder, + Status(ErrorCodes::UserNotFound, "User test@test not found")); + return responseBuilder.obj(); + }); + + // Now wait for the runUserManagementWriteCommand call to return + future.wait_for(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandNotMaster) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + distLock()->expectLock( + [](StringData name, + StringData whyMessage, + milliseconds waitFor, + milliseconds lockTryInterval) { ASSERT_EQUALS("authorizationData", name); ASSERT_EQUALS("dropUser", whyMessage); - }, Status::OK()); - - auto future = async(std::launch::async, [this] { - BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", - "test", - BSON("dropUser" << "test"), - &responseBuilder); - ASSERT_FALSE(ok); - - Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); - ASSERT_EQUALS(ErrorCodes::UserNotFound, commandStatus); - }); - + }, + Status::OK()); + + auto future = + async(std::launch::async, + [this] { + BSONObjBuilder responseBuilder; + bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", + "test", + BSON("dropUser" + << "test"), + &responseBuilder); + ASSERT_FALSE(ok); + + Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); + ASSERT_EQUALS(ErrorCodes::NotMaster, commandStatus); + }); + + for (int i = 0; i < 3; ++i) { onCommand([](const RemoteCommandRequest& request) { - ASSERT_EQUALS("test", request.dbname); - ASSERT_EQUALS(BSON("dropUser" << "test"), request.cmdObj); - BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, - Status(ErrorCodes::UserNotFound, - "User test@test not found")); + Status(ErrorCodes::NotMaster, "not master")); return responseBuilder.obj(); }); - - // Now wait for the runUserManagementWriteCommand call to return - future.wait_for(kFutureTimeout); } - TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandNotMaster) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - - distLock()->expectLock( - [](StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { + // Now wait for the runUserManagementWriteCommand call to return + future.wait_for(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandNotMasterRetrySuccess) { + HostAndPort host1("TestHost1"); + HostAndPort host2("TestHost2"); + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(host1); + + distLock()->expectLock( + [](StringData name, + StringData whyMessage, + milliseconds waitFor, + milliseconds lockTryInterval) { ASSERT_EQUALS("authorizationData", name); ASSERT_EQUALS("dropUser", whyMessage); - }, Status::OK()); - - auto future = async(std::launch::async, [this] { - BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", - "test", - BSON("dropUser" << "test"), - &responseBuilder); - ASSERT_FALSE(ok); - - Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); - ASSERT_EQUALS(ErrorCodes::NotMaster, commandStatus); - }); - - for (int i = 0; i < 3; ++i) { - onCommand([](const RemoteCommandRequest& request) { - BSONObjBuilder responseBuilder; - Command::appendCommandStatus(responseBuilder, - Status(ErrorCodes::NotMaster, "not master")); - return responseBuilder.obj(); - }); - } - - // Now wait for the runUserManagementWriteCommand call to return - future.wait_for(kFutureTimeout); - } + }, + Status::OK()); + + auto future = + async(std::launch::async, + [this] { + BSONObjBuilder responseBuilder; + bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", + "test", + BSON("dropUser" + << "test"), + &responseBuilder); + ASSERT_TRUE(ok); + + Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); + ASSERT_OK(commandStatus); + }); + + onCommand([targeter, host1, host2](const RemoteCommandRequest& request) { + ASSERT_EQUALS(host1, request.target); - TEST_F(CatalogManagerReplSetTestFixture, RunUserManagementWriteCommandNotMasterRetrySuccess) { - HostAndPort host1("TestHost1"); - HostAndPort host2("TestHost2"); - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(host1); - - distLock()->expectLock( - [](StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { - ASSERT_EQUALS("authorizationData", name); - ASSERT_EQUALS("dropUser", whyMessage); - }, Status::OK()); - - auto future = async(std::launch::async, [this] { - BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runUserManagementWriteCommand("dropUser", - "test", - BSON("dropUser" << "test"), - &responseBuilder); - ASSERT_TRUE(ok); - - Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); - ASSERT_OK(commandStatus); - }); - - onCommand([targeter, host1, host2](const RemoteCommandRequest& request) { - ASSERT_EQUALS(host1, request.target); - - BSONObjBuilder responseBuilder; - Command::appendCommandStatus(responseBuilder, - Status(ErrorCodes::NotMaster, "not master")); - - // Ensure that when the catalog manager tries to retarget after getting the - // NotMaster response, it will get back a new target. - targeter->setFindHostReturnValue(host2); - return responseBuilder.obj(); - }); - - onCommand([host2](const RemoteCommandRequest& request) { - ASSERT_EQUALS(host2, request.target); - ASSERT_EQUALS("test", request.dbname); - ASSERT_EQUALS(BSON("dropUser" << "test"), request.cmdObj); - - return BSON("ok" << 1); - }); + BSONObjBuilder responseBuilder; + Command::appendCommandStatus(responseBuilder, Status(ErrorCodes::NotMaster, "not master")); + + // Ensure that when the catalog manager tries to retarget after getting the + // NotMaster response, it will get back a new target. + targeter->setFindHostReturnValue(host2); + return responseBuilder.obj(); + }); - // Now wait for the runUserManagementWriteCommand call to return - future.wait_for(kFutureTimeout); - } + onCommand([host2](const RemoteCommandRequest& request) { + ASSERT_EQUALS(host2, request.target); + ASSERT_EQUALS("test", request.dbname); + ASSERT_EQUALS(BSON("dropUser" + << "test"), + request.cmdObj); - TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsBalancerDoc) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + return BSON("ok" << 1); + }); - // sample balancer doc - SettingsType st1; - st1.setKey(SettingsType::BalancerDocKey); - st1.setBalancerStopped(true); + // Now wait for the runUserManagementWriteCommand call to return + future.wait_for(kFutureTimeout); +} - auto future = async(std::launch::async, [this] { - return assertGet(catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey)); - }); +TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsBalancerDoc) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); + + // sample balancer doc + SettingsType st1; + st1.setKey(SettingsType::BalancerDocKey); + st1.setBalancerStopped(true); + + auto future = async(std::launch::async, + [this] { + return assertGet( + catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey)); + }); - onFindCommand([st1](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); + onFindCommand([st1](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - ASSERT_EQ(query->ns(), SettingsType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::BalancerDocKey))); + ASSERT_EQ(query->ns(), SettingsType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::BalancerDocKey))); - return vector<BSONObj>{ st1.toBSON() }; - }); + return vector<BSONObj>{st1.toBSON()}; + }); - const auto& actualBalSettings = future.get(); - ASSERT_EQ(actualBalSettings.toBSON(), st1.toBSON()); - } + const auto& actualBalSettings = future.get(); + ASSERT_EQ(actualBalSettings.toBSON(), st1.toBSON()); +} - TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsChunkSizeDoc) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); +TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsChunkSizeDoc) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - // sample chunk size doc - SettingsType st1; - st1.setKey(SettingsType::ChunkSizeDocKey); - st1.setChunkSizeMB(80); + // sample chunk size doc + SettingsType st1; + st1.setKey(SettingsType::ChunkSizeDocKey); + st1.setChunkSizeMB(80); - auto future = async(std::launch::async, [this] { - return assertGet(catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey)); - }); + auto future = async(std::launch::async, + [this] { + return assertGet( + catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey)); + }); - onFindCommand([st1](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); + onFindCommand([st1](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - ASSERT_EQ(query->ns(), SettingsType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey))); + ASSERT_EQ(query->ns(), SettingsType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey))); - return vector<BSONObj>{ st1.toBSON() }; - }); + return vector<BSONObj>{st1.toBSON()}; + }); - const auto& actualBalSettings = future.get(); - ASSERT_EQ(actualBalSettings.toBSON(), st1.toBSON()); - } + const auto& actualBalSettings = future.get(); + ASSERT_EQ(actualBalSettings.toBSON(), st1.toBSON()); +} - TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsInvalidDoc) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); +TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsInvalidDoc) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - auto future = async(std::launch::async, [this] { - const auto balSettings = catalogManager()->getGlobalSettings("invalidKey"); + auto future = async(std::launch::async, + [this] { + const auto balSettings = + catalogManager()->getGlobalSettings("invalidKey"); - ASSERT_EQ(balSettings.getStatus(), ErrorCodes::FailedToParse); - }); + ASSERT_EQ(balSettings.getStatus(), ErrorCodes::FailedToParse); + }); - onFindCommand([](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); + onFindCommand([](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - ASSERT_EQ(query->ns(), SettingsType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSON(SettingsType::key("invalidKey"))); + ASSERT_EQ(query->ns(), SettingsType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSON(SettingsType::key("invalidKey"))); - return vector<BSONObj> { - BSON("invalidKey" << "some value") // invalid settings document -- key is required - }; - }); + return vector<BSONObj>{ + BSON("invalidKey" + << "some value") // invalid settings document -- key is required + }; + }); - future.get(); - } + future.get(); +} - TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsNonExistent) { - RemoteCommandTargeterMock* targeter = - RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); - targeter->setFindHostReturnValue(HostAndPort("TestHost1")); +TEST_F(CatalogManagerReplSetTestFixture, GetGlobalSettingsNonExistent) { + RemoteCommandTargeterMock* targeter = + RemoteCommandTargeterMock::get(shardRegistry()->getShard("config")->getTargeter()); + targeter->setFindHostReturnValue(HostAndPort("TestHost1")); - auto future = async(std::launch::async, [this] { - const auto chunkSizeSettings = catalogManager()->getGlobalSettings( - SettingsType::ChunkSizeDocKey); + auto future = + async(std::launch::async, + [this] { + const auto chunkSizeSettings = + catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey); - ASSERT_EQ(chunkSizeSettings.getStatus(), ErrorCodes::NoMatchingDocument); - }); + ASSERT_EQ(chunkSizeSettings.getStatus(), ErrorCodes::NoMatchingDocument); + }); - onFindCommand([](const RemoteCommandRequest& request) { - const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); - ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); + onFindCommand([](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), SettingsType::ConfigNS); - auto query = assertGet( - LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); - ASSERT_EQ(query->ns(), SettingsType::ConfigNS); - ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey))); + ASSERT_EQ(query->ns(), SettingsType::ConfigNS); + ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey))); - return vector<BSONObj> { }; - }); + return vector<BSONObj>{}; + }); - future.get(); - } + future.get(); +} -} // namespace -} // namespace mongo +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp index 3f0d78f451b..795c1d329db 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp @@ -48,133 +48,127 @@ namespace mongo { - using executor::NetworkInterfaceMock; - using std::vector; +using executor::NetworkInterfaceMock; +using std::vector; - CatalogManagerReplSetTestFixture::CatalogManagerReplSetTestFixture() = default; +CatalogManagerReplSetTestFixture::CatalogManagerReplSetTestFixture() = default; - CatalogManagerReplSetTestFixture::~CatalogManagerReplSetTestFixture() = default; +CatalogManagerReplSetTestFixture::~CatalogManagerReplSetTestFixture() = default; - void CatalogManagerReplSetTestFixture::setUp() { - std::unique_ptr<NetworkInterfaceMock> network( - stdx::make_unique<executor::NetworkInterfaceMock>()); +void CatalogManagerReplSetTestFixture::setUp() { + std::unique_ptr<NetworkInterfaceMock> network( + stdx::make_unique<executor::NetworkInterfaceMock>()); - _mockNetwork = network.get(); + _mockNetwork = network.get(); - std::unique_ptr<repl::ReplicationExecutor> executor( - stdx::make_unique<repl::ReplicationExecutor>(network.release(), - nullptr, - 0)); + std::unique_ptr<repl::ReplicationExecutor> executor( + stdx::make_unique<repl::ReplicationExecutor>(network.release(), nullptr, 0)); - // The executor thread might run after the executor unique_ptr above has been moved to the - // ShardRegistry, so make sure we get the underlying pointer before that. - _executorThread = std::thread(std::bind([](repl::ReplicationExecutor* executorPtr) { - executorPtr->run(); - }, - executor.get())); + // The executor thread might run after the executor unique_ptr above has been moved to the + // ShardRegistry, so make sure we get the underlying pointer before that. + _executorThread = std::thread(std::bind( + [](repl::ReplicationExecutor* executorPtr) { executorPtr->run(); }, executor.get())); - std::unique_ptr<CatalogManagerReplicaSet> cm( - stdx::make_unique<CatalogManagerReplicaSet>()); + std::unique_ptr<CatalogManagerReplicaSet> cm(stdx::make_unique<CatalogManagerReplicaSet>()); - ASSERT_OK(cm->init(ConnectionString::forReplicaSet("CatalogManagerReplSetTest", - { HostAndPort{ "TestHost1" }, - HostAndPort{ "TestHost2" } }), - stdx::make_unique<DistLockManagerMock>())); + ASSERT_OK(cm->init( + ConnectionString::forReplicaSet("CatalogManagerReplSetTest", + {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}}), + stdx::make_unique<DistLockManagerMock>())); - std::unique_ptr<ShardRegistry> shardRegistry( - stdx::make_unique<ShardRegistry>(stdx::make_unique<RemoteCommandTargeterFactoryMock>(), - stdx::make_unique<RemoteCommandRunnerMock>(), - std::move(executor), - cm.get())); + std::unique_ptr<ShardRegistry> shardRegistry( + stdx::make_unique<ShardRegistry>(stdx::make_unique<RemoteCommandTargeterFactoryMock>(), + stdx::make_unique<RemoteCommandRunnerMock>(), + std::move(executor), + cm.get())); - // For now initialize the global grid object. All sharding objects will be accessible - // from there until we get rid of it. - grid.init(std::move(cm), std::move(shardRegistry)); - } + // For now initialize the global grid object. All sharding objects will be accessible + // from there until we get rid of it. + grid.init(std::move(cm), std::move(shardRegistry)); +} - void CatalogManagerReplSetTestFixture::tearDown() { - // Stop the executor and wait for the executor thread to complete. This means that there - // will be no more calls into the executor and it can be safely deleted. - shardRegistry()->getExecutor()->shutdown(); - _executorThread.join(); +void CatalogManagerReplSetTestFixture::tearDown() { + // Stop the executor and wait for the executor thread to complete. This means that there + // will be no more calls into the executor and it can be safely deleted. + shardRegistry()->getExecutor()->shutdown(); + _executorThread.join(); - // This call will delete the shard registry, which will terminate the executor - grid.clearForUnitTests(); - } + // This call will delete the shard registry, which will terminate the executor + grid.clearForUnitTests(); +} - CatalogManagerReplicaSet* CatalogManagerReplSetTestFixture::catalogManager() const { - auto cm = dynamic_cast<CatalogManagerReplicaSet*>(grid.catalogManager()); - invariant(cm); +CatalogManagerReplicaSet* CatalogManagerReplSetTestFixture::catalogManager() const { + auto cm = dynamic_cast<CatalogManagerReplicaSet*>(grid.catalogManager()); + invariant(cm); - return cm; - } + return cm; +} - ShardRegistry* CatalogManagerReplSetTestFixture::shardRegistry() const { - return grid.shardRegistry(); - } +ShardRegistry* CatalogManagerReplSetTestFixture::shardRegistry() const { + return grid.shardRegistry(); +} - RemoteCommandRunnerMock* CatalogManagerReplSetTestFixture::commandRunner() const { - return RemoteCommandRunnerMock::get(shardRegistry()->getCommandRunner()); - } +RemoteCommandRunnerMock* CatalogManagerReplSetTestFixture::commandRunner() const { + return RemoteCommandRunnerMock::get(shardRegistry()->getCommandRunner()); +} - executor::NetworkInterfaceMock* CatalogManagerReplSetTestFixture::network() const { - return _mockNetwork; - } +executor::NetworkInterfaceMock* CatalogManagerReplSetTestFixture::network() const { + return _mockNetwork; +} - DistLockManagerMock* CatalogManagerReplSetTestFixture::distLock() const { - auto distLock = dynamic_cast<DistLockManagerMock*>(catalogManager()->getDistLockManager()); - invariant(distLock); +DistLockManagerMock* CatalogManagerReplSetTestFixture::distLock() const { + auto distLock = dynamic_cast<DistLockManagerMock*>(catalogManager()->getDistLockManager()); + invariant(distLock); - return distLock; - } + return distLock; +} - void CatalogManagerReplSetTestFixture::onCommand(OnCommandFunction func) { - network()->enterNetwork(); +void CatalogManagerReplSetTestFixture::onCommand(OnCommandFunction func) { + network()->enterNetwork(); - const NetworkInterfaceMock::NetworkOperationIterator noi = - network()->getNextReadyRequest(); - const RemoteCommandRequest& request = noi->getRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = network()->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); - const auto& resultStatus = func(request); + const auto& resultStatus = func(request); - BSONObjBuilder result; + BSONObjBuilder result; - if (resultStatus.isOK()) { - result.appendElements(resultStatus.getValue()); - } + if (resultStatus.isOK()) { + result.appendElements(resultStatus.getValue()); + } - Command::appendCommandStatus(result, resultStatus.getStatus()); + Command::appendCommandStatus(result, resultStatus.getStatus()); - const RemoteCommandResponse response(result.obj(), Milliseconds(1)); + const RemoteCommandResponse response(result.obj(), Milliseconds(1)); - network()->scheduleResponse(noi, network()->now(), response); + network()->scheduleResponse(noi, network()->now(), response); - network()->runReadyNetworkOperations(); + network()->runReadyNetworkOperations(); - network()->exitNetwork(); - } + network()->exitNetwork(); +} - void CatalogManagerReplSetTestFixture::onFindCommand(OnFindCommandFunction func) { - onCommand([&func](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { +void CatalogManagerReplSetTestFixture::onFindCommand(OnFindCommandFunction func) { + onCommand([&func](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { - const auto& resultStatus = func(request); + const auto& resultStatus = func(request); - if (!resultStatus.isOK()) { - return resultStatus.getStatus(); - } + if (!resultStatus.isOK()) { + return resultStatus.getStatus(); + } - BSONArrayBuilder arr; - for (const auto& obj : resultStatus.getValue()) { - arr.append(obj); - } + BSONArrayBuilder arr; + for (const auto& obj : resultStatus.getValue()) { + arr.append(obj); + } - const NamespaceString nss = NamespaceString(request.dbname, - request.cmdObj.firstElement().String()); - BSONObjBuilder result; - appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &result); + const NamespaceString nss = + NamespaceString(request.dbname, request.cmdObj.firstElement().String()); + BSONObjBuilder result; + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &result); - return result.obj(); - }); - } + return result.obj(); + }); +} -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h index 05a5dcf4e1f..9331f44ad4f 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h @@ -36,74 +36,74 @@ namespace mongo { - class BSONObj; - class CatalogManagerReplicaSet; - class DistLockManagerMock; - struct RemoteCommandRequest; - class RemoteCommandRunnerMock; - class ShardRegistry; - template<typename T> class StatusWith; +class BSONObj; +class CatalogManagerReplicaSet; +class DistLockManagerMock; +struct RemoteCommandRequest; +class RemoteCommandRunnerMock; +class ShardRegistry; +template <typename T> +class StatusWith; namespace executor { - class NetworkInterfaceMock; +class NetworkInterfaceMock; -} // namespace executor +} // namespace executor +/** + * Sets up the mocked out objects for testing the replica-set backed catalog manager. + */ +class CatalogManagerReplSetTestFixture : public mongo::unittest::Test { +public: + CatalogManagerReplSetTestFixture(); + ~CatalogManagerReplSetTestFixture(); + +protected: /** - * Sets up the mocked out objects for testing the replica-set backed catalog manager. + * Shortcut function to be used for generating mock responses to network requests. + * + * @param dbName Name of the database for which this request came. + * @param cmdObj Contents of the request. + * + * Return the BSON object representing the response(s) or an error, which will be passed + * back on the network. */ - class CatalogManagerReplSetTestFixture : public mongo::unittest::Test { - public: - CatalogManagerReplSetTestFixture(); - ~CatalogManagerReplSetTestFixture(); + using OnCommandFunction = std::function<StatusWith<BSONObj>(const RemoteCommandRequest&)>; - protected: - /** - * Shortcut function to be used for generating mock responses to network requests. - * - * @param dbName Name of the database for which this request came. - * @param cmdObj Contents of the request. - * - * Return the BSON object representing the response(s) or an error, which will be passed - * back on the network. - */ - using OnCommandFunction = - std::function<StatusWith<BSONObj>(const RemoteCommandRequest&)>; + using OnFindCommandFunction = + std::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>; - using OnFindCommandFunction = - std::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>; + CatalogManagerReplicaSet* catalogManager() const; - CatalogManagerReplicaSet* catalogManager() const; + ShardRegistry* shardRegistry() const; - ShardRegistry* shardRegistry() const; + RemoteCommandRunnerMock* commandRunner() const; - RemoteCommandRunnerMock* commandRunner() const; + executor::NetworkInterfaceMock* network() const; - executor::NetworkInterfaceMock* network() const; + DistLockManagerMock* distLock() const; - DistLockManagerMock* distLock() const; - - /** - * Blocking methods, which receive one message from the network and respond using the - * responses returned from the input function. This is a syntactic sugar for simple, - * single request + response or find tests. - */ - void onCommand(OnCommandFunction func); - void onFindCommand(OnFindCommandFunction func); + /** + * Blocking methods, which receive one message from the network and respond using the + * responses returned from the input function. This is a syntactic sugar for simple, + * single request + response or find tests. + */ + void onCommand(OnCommandFunction func); + void onFindCommand(OnFindCommandFunction func); - private: - void setUp() override; +private: + void setUp() override; - void tearDown() override; + void tearDown() override; - // Mocked out network under the task executor. This pointer is owned by the executor on - // the ShardRegistry, so it must not be accessed once the executor has been shut down. - executor::NetworkInterfaceMock* _mockNetwork; + // Mocked out network under the task executor. This pointer is owned by the executor on + // the ShardRegistry, so it must not be accessed once the executor has been shut down. + executor::NetworkInterfaceMock* _mockNetwork; - // Thread used to execute the task executor's loop. This thread will be busy until the - // shutdown is called on the shard registry's task executor. - std::thread _executorThread; - }; + // Thread used to execute the task executor's loop. This thread will be busy until the + // shutdown is called on the shard registry's task executor. + std::thread _executorThread; +}; -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp index 5d38ea337cd..ef329ded1eb 100644 --- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp +++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp @@ -48,342 +48,321 @@ namespace mongo { - using std::string; - using std::unique_ptr; - using stdx::chrono::milliseconds; - using std::chrono::duration_cast; - - ReplSetDistLockManager::ReplSetDistLockManager(ServiceContext* globalContext, - StringData processID, - unique_ptr<DistLockCatalog> catalog, - milliseconds pingInterval, - milliseconds lockExpiration): - _serviceContext(globalContext), - _processID(processID.toString()), - _catalog(std::move(catalog)), - _pingInterval(pingInterval), - _lockExpiration(lockExpiration) { +using std::string; +using std::unique_ptr; +using stdx::chrono::milliseconds; +using std::chrono::duration_cast; + +ReplSetDistLockManager::ReplSetDistLockManager(ServiceContext* globalContext, + StringData processID, + unique_ptr<DistLockCatalog> catalog, + milliseconds pingInterval, + milliseconds lockExpiration) + : _serviceContext(globalContext), + _processID(processID.toString()), + _catalog(std::move(catalog)), + _pingInterval(pingInterval), + _lockExpiration(lockExpiration) {} + +ReplSetDistLockManager::~ReplSetDistLockManager() = default; + +void ReplSetDistLockManager::startUp() { + _execThread = stdx::make_unique<stdx::thread>(&ReplSetDistLockManager::doTask, this); +} + +void ReplSetDistLockManager::shutDown() { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _isShutDown = true; + _shutDownCV.notify_all(); } - ReplSetDistLockManager::~ReplSetDistLockManager() = default; + // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read + // _execThread since it is modified only at statrUp(). + if (_execThread && _execThread->joinable()) { + _execThread->join(); + _execThread.reset(); + } - void ReplSetDistLockManager::startUp() { - _execThread = stdx::make_unique<stdx::thread>(&ReplSetDistLockManager::doTask, this); + auto status = _catalog->stopPing(_processID); + if (!status.isOK()) { + warning() << "error encountered while cleaning up distributed ping entry for " << _processID + << causedBy(status); } +} - void ReplSetDistLockManager::shutDown() { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _isShutDown = true; - _shutDownCV.notify_all(); - } +bool ReplSetDistLockManager::isShutDown() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _isShutDown; +} - // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read - // _execThread since it is modified only at statrUp(). - if (_execThread && _execThread->joinable()) { - _execThread->join(); - _execThread.reset(); - } +void ReplSetDistLockManager::doTask() { + LOG(0) << "creating distributed lock ping thread for process " << _processID + << " (sleeping for " << duration_cast<milliseconds>(_pingInterval).count() << " ms)"; + + Timer elapsedSincelastPing(_serviceContext->getTickSource()); + while (!isShutDown()) { + auto pingStatus = _catalog->ping(_processID, Date_t::now()); - auto status = _catalog->stopPing(_processID); - if (!status.isOK()) { - warning() << "error encountered while cleaning up distributed ping entry for " - << _processID << causedBy(status); + if (!pingStatus.isOK()) { + warning() << "pinging failed for distributed lock pinger" << causedBy(pingStatus); } - } - bool ReplSetDistLockManager::isShutDown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _isShutDown; - } + const milliseconds elapsed(elapsedSincelastPing.millis()); + if (elapsed > 10 * _pingInterval) { + warning() << "Lock pinger for proc: " << _processID << " was inactive for " << elapsed + << " ms"; + } + elapsedSincelastPing.reset(); - void ReplSetDistLockManager::doTask() { - LOG(0) << "creating distributed lock ping thread for process " << _processID - << " (sleeping for " - << duration_cast<milliseconds>(_pingInterval).count() << " ms)"; + std::deque<DistLockHandle> toUnlockBatch; + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + toUnlockBatch.swap(_unlockList); + } - Timer elapsedSincelastPing(_serviceContext->getTickSource()); - while (!isShutDown()) { - auto pingStatus = _catalog->ping(_processID, Date_t::now()); + for (const auto& toUnlock : toUnlockBatch) { + auto unlockStatus = _catalog->unlock(toUnlock); - if (!pingStatus.isOK()) { - warning() << "pinging failed for distributed lock pinger" << causedBy(pingStatus); + if (!unlockStatus.isOK()) { + warning() << "Failed to unlock lock with " << LocksType::lockID() << ": " + << toUnlock << causedBy(unlockStatus); + queueUnlock(toUnlock); + } else { + LOG(0) << "distributed lock with " << LocksType::lockID() << ": " << toUnlock + << "' unlocked."; } - const milliseconds elapsed(elapsedSincelastPing.millis()); - if (elapsed > 10 * _pingInterval) { - warning() << "Lock pinger for proc: " << _processID - << " was inactive for " << elapsed << " ms"; + if (isShutDown()) { + return; } - elapsedSincelastPing.reset(); + } - std::deque<DistLockHandle> toUnlockBatch; - { - stdx::unique_lock<stdx::mutex> lk(_mutex); - toUnlockBatch.swap(_unlockList); - } + stdx::unique_lock<stdx::mutex> lk(_mutex); + _shutDownCV.wait_for(lk, _pingInterval); + } +} - for (const auto& toUnlock : toUnlockBatch) { - auto unlockStatus = _catalog->unlock(toUnlock); +StatusWith<bool> ReplSetDistLockManager::canOvertakeLock(LocksType lockDoc) { + const auto& processID = lockDoc.getProcess(); + auto pingStatus = _catalog->getPing(processID); + if (!pingStatus.isOK()) { + return pingStatus.getStatus(); + } - if (!unlockStatus.isOK()) { - warning() << "Failed to unlock lock with " << LocksType::lockID() - << ": " << toUnlock << causedBy(unlockStatus); - queueUnlock(toUnlock); - } - else { - LOG(0) << "distributed lock with " << LocksType::lockID() - << ": " << toUnlock << "' unlocked."; - } + const auto& pingDoc = pingStatus.getValue(); + string errMsg; + if (!pingDoc.isValid(&errMsg)) { + return {ErrorCodes::UnsupportedFormat, + str::stream() << "invalid ping document for " << processID << ": " << errMsg}; + } - if (isShutDown()) { - return; - } - } + Timer timer(_serviceContext->getTickSource()); + auto serverInfoStatus = _catalog->getServerInfo(); + if (!serverInfoStatus.isOK()) { + return serverInfoStatus.getStatus(); + } - stdx::unique_lock<stdx::mutex> lk(_mutex); - _shutDownCV.wait_for(lk, _pingInterval); - } + // Be conservative when determining that lock expiration has elapsed by + // taking into account the roundtrip delay of trying to get the local + // time from the config server. + milliseconds delay(timer.millis() / 2); // Assuming symmetrical delay. + + Date_t pingValue = pingDoc.getPing(); + const auto& serverInfo = serverInfoStatus.getValue(); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto pingIter = _pingHistory.find(lockDoc.getName()); + + if (pingIter == _pingHistory.end()) { + // We haven't seen this lock before so we don't have any point of reference + // to compare and determine the elapsed time. Save the current ping info + // for this lock. + _pingHistory.emplace(std::piecewise_construct, + std::forward_as_tuple(lockDoc.getName()), + std::forward_as_tuple(processID, + pingValue, + serverInfo.serverTime, + lockDoc.getLockID(), + serverInfo.electionId)); + return false; } - StatusWith<bool> ReplSetDistLockManager::canOvertakeLock(LocksType lockDoc) { - const auto& processID = lockDoc.getProcess(); - auto pingStatus = _catalog->getPing(processID); - if (!pingStatus.isOK()) { - return pingStatus.getStatus(); - } + auto configServerLocalTime = serverInfo.serverTime - delay; - const auto& pingDoc = pingStatus.getValue(); - string errMsg; - if (!pingDoc.isValid(&errMsg)) { - return {ErrorCodes::UnsupportedFormat, - str::stream() << "invalid ping document for " << processID - << ": " << errMsg}; - } + auto* pingInfo = &pingIter->second; - Timer timer(_serviceContext->getTickSource()); - auto serverInfoStatus = _catalog->getServerInfo(); - if (!serverInfoStatus.isOK()) { - return serverInfoStatus.getStatus(); - } + LOG(1) << "checking last ping for lock '" << lockDoc.getName() << "' against last seen process " + << pingInfo->processId << " and ping " << pingInfo->lastPing; - // Be conservative when determining that lock expiration has elapsed by - // taking into account the roundtrip delay of trying to get the local - // time from the config server. - milliseconds delay(timer.millis() / 2); // Assuming symmetrical delay. + if (pingInfo->lastPing != pingValue || // ping is active - Date_t pingValue = pingDoc.getPing(); - const auto& serverInfo = serverInfoStatus.getValue(); + // Owner of this lock is now different from last time so we can't + // use the ping data. + pingInfo->lockSessionId != lockDoc.getLockID() || - stdx::lock_guard<stdx::mutex> lk(_mutex); - auto pingIter = _pingHistory.find(lockDoc.getName()); - - if (pingIter == _pingHistory.end()) { - // We haven't seen this lock before so we don't have any point of reference - // to compare and determine the elapsed time. Save the current ping info - // for this lock. - _pingHistory.emplace(std::piecewise_construct, - std::forward_as_tuple(lockDoc.getName()), - std::forward_as_tuple(processID, - pingValue, - serverInfo.serverTime, - lockDoc.getLockID(), - serverInfo.electionId)); - return false; - } + // Primary changed, we can't trust that clocks are synchronized so + // treat as if this is a new entry. + pingInfo->electionId != serverInfo.electionId) { + pingInfo->lastPing = pingValue; + pingInfo->electionId = serverInfo.electionId; + pingInfo->configLocalTime = configServerLocalTime; + pingInfo->lockSessionId = lockDoc.getLockID(); + return false; + } + + if (configServerLocalTime < pingInfo->configLocalTime) { + warning() << "config server local time went backwards, from last seen: " + << pingInfo->configLocalTime << " to " << configServerLocalTime; + return false; + } - auto configServerLocalTime = serverInfo.serverTime - delay; + milliseconds elapsedSinceLastPing(configServerLocalTime - pingInfo->configLocalTime); + if (elapsedSinceLastPing >= _lockExpiration) { + LOG(0) << "forcing lock '" << lockDoc.getName() << "' because elapsed time " + << duration_cast<milliseconds>(elapsedSinceLastPing).count() + << " ms >= takeover time " << duration_cast<milliseconds>(_lockExpiration).count() + << " ms"; + return true; + } - auto* pingInfo = &pingIter->second; + LOG(1) << "could not force lock '" << lockDoc.getName() << "' because elapsed time " + << duration_cast<milliseconds>(elapsedSinceLastPing).count() << " ms < takeover time " + << duration_cast<milliseconds>(_lockExpiration).count() << " ms"; + return false; +} - LOG(1) << "checking last ping for lock '" << lockDoc.getName() - << "' against last seen process " << pingInfo->processId - << " and ping " << pingInfo->lastPing; +StatusWith<DistLockManager::ScopedDistLock> ReplSetDistLockManager::lock( + StringData name, StringData whyMessage, milliseconds waitFor, milliseconds lockTryInterval) { + Timer timer(_serviceContext->getTickSource()); + Timer msgTimer(_serviceContext->getTickSource()); - if (pingInfo->lastPing != pingValue || // ping is active + while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) { + OID lockSessionID = OID::gen(); + string who = str::stream() << _processID << ":" << getThreadName(); - // Owner of this lock is now different from last time so we can't - // use the ping data. - pingInfo->lockSessionId != lockDoc.getLockID() || + LOG(1) << "trying to acquire new distributed lock for " << name + << " ( lock timeout : " << duration_cast<milliseconds>(_lockExpiration).count() + << " ms, ping interval : " << duration_cast<milliseconds>(_pingInterval).count() + << " ms, process : " << _processID << " )" + << " with lockSessionID: " << lockSessionID << ", why: " << whyMessage; - // Primary changed, we can't trust that clocks are synchronized so - // treat as if this is a new entry. - pingInfo->electionId != serverInfo.electionId) { - pingInfo->lastPing = pingValue; - pingInfo->electionId = serverInfo.electionId; - pingInfo->configLocalTime = configServerLocalTime; - pingInfo->lockSessionId = lockDoc.getLockID(); - return false; - } + auto lockResult = + _catalog->grabLock(name, lockSessionID, who, _processID, Date_t::now(), whyMessage); + + auto status = lockResult.getStatus(); - if (configServerLocalTime < pingInfo->configLocalTime) { - warning() << "config server local time went backwards, from last seen: " - << pingInfo->configLocalTime - << " to " << configServerLocalTime; - return false; + if (status.isOK()) { + // Lock is acquired since findAndModify was able to successfully modify + // the lock document. + LOG(0) << "distributed lock '" << name << "' acquired, ts : " << lockSessionID; + return ScopedDistLock(lockSessionID, this); } - milliseconds elapsedSinceLastPing(configServerLocalTime - pingInfo->configLocalTime); - if (elapsedSinceLastPing >= _lockExpiration) { - LOG(0) << "forcing lock '" << lockDoc.getName() - << "' because elapsed time " - << duration_cast<milliseconds>(elapsedSinceLastPing).count() - << " ms >= takeover time " - << duration_cast<milliseconds>(_lockExpiration).count() << " ms"; - return true; + if (status != ErrorCodes::LockStateChangeFailed) { + // An error occurred but the write might have actually been applied on the + // other side. Schedule an unlock to clean it up just in case. + queueUnlock(lockSessionID); + return status; } - LOG(1) << "could not force lock '" << lockDoc.getName() - << "' because elapsed time " - << duration_cast<milliseconds>(elapsedSinceLastPing).count() - << " ms < takeover time " - << duration_cast<milliseconds>(_lockExpiration).count() << " ms"; - return false; - } + // Get info from current lock and check if we can overtake it. + auto getLockStatusResult = _catalog->getLockByName(name); + const auto& getLockStatus = getLockStatusResult.getStatus(); - StatusWith<DistLockManager::ScopedDistLock> ReplSetDistLockManager::lock( - StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { - Timer timer(_serviceContext->getTickSource()); - Timer msgTimer(_serviceContext->getTickSource()); - - while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) { - OID lockSessionID = OID::gen(); - string who = str::stream() << _processID << ":" << getThreadName(); - - LOG(1) << "trying to acquire new distributed lock for " << name - << " ( lock timeout : " - << duration_cast<milliseconds>(_lockExpiration).count() - << " ms, ping interval : " - << duration_cast<milliseconds>(_pingInterval).count() - << " ms, process : " << _processID << " )" - << " with lockSessionID: " << lockSessionID - << ", why: " << whyMessage; - - auto lockResult = _catalog->grabLock(name, - lockSessionID, - who, - _processID, - Date_t::now(), - whyMessage); - - auto status = lockResult.getStatus(); - - if (status.isOK()) { - // Lock is acquired since findAndModify was able to successfully modify - // the lock document. - LOG(0) << "distributed lock '" << name << "' acquired, ts : " << lockSessionID; - return ScopedDistLock(lockSessionID, this); - } + if (!getLockStatusResult.isOK() && getLockStatus != ErrorCodes::LockNotFound) { + return getLockStatus; + } + + // Note: Only attempt to overtake locks that actually exists. If lock was not + // found, use the normal grab lock path to acquire it. + if (getLockStatusResult.isOK()) { + auto currentLock = getLockStatusResult.getValue(); + auto canOvertakeResult = canOvertakeLock(currentLock); - if (status != ErrorCodes::LockStateChangeFailed) { - // An error occurred but the write might have actually been applied on the - // other side. Schedule an unlock to clean it up just in case. - queueUnlock(lockSessionID); - return status; + if (!canOvertakeResult.isOK()) { + return canOvertakeResult.getStatus(); } - // Get info from current lock and check if we can overtake it. - auto getLockStatusResult = _catalog->getLockByName(name); - const auto& getLockStatus = getLockStatusResult.getStatus(); + if (canOvertakeResult.getValue()) { + auto overtakeResult = _catalog->overtakeLock(name, + lockSessionID, + currentLock.getLockID(), + who, + _processID, + Date_t::now(), + whyMessage); - if (!getLockStatusResult.isOK() && getLockStatus != ErrorCodes::LockNotFound) { - return getLockStatus; - } + const auto& overtakeStatus = overtakeResult.getStatus(); - // Note: Only attempt to overtake locks that actually exists. If lock was not - // found, use the normal grab lock path to acquire it. - if (getLockStatusResult.isOK()) { - auto currentLock = getLockStatusResult.getValue(); - auto canOvertakeResult = canOvertakeLock(currentLock); + if (overtakeResult.isOK()) { + // Lock is acquired since findAndModify was able to successfully modify + // the lock document. - if (!canOvertakeResult.isOK()) { - return canOvertakeResult.getStatus(); + LOG(0) << "lock '" << name << "' successfully forced"; + LOG(0) << "distributed lock '" << name << "' acquired, ts : " << lockSessionID; + return ScopedDistLock(lockSessionID, this); } - if (canOvertakeResult.getValue()) { - auto overtakeResult = _catalog->overtakeLock(name, - lockSessionID, - currentLock.getLockID(), - who, - _processID, - Date_t::now(), - whyMessage); - - const auto& overtakeStatus = overtakeResult.getStatus(); - - if (overtakeResult.isOK()) { - // Lock is acquired since findAndModify was able to successfully modify - // the lock document. - - LOG(0) << "lock '" << name << "' successfully forced"; - LOG(0) << "distributed lock '" << name - << "' acquired, ts : " << lockSessionID; - return ScopedDistLock(lockSessionID, this); - } - - if (overtakeStatus != ErrorCodes::LockStateChangeFailed) { - // An error occurred but the write might have actually been applied on the - // other side. Schedule an unlock to clean it up just in case. - queueUnlock(lockSessionID); - return overtakeStatus; - } + if (overtakeStatus != ErrorCodes::LockStateChangeFailed) { + // An error occurred but the write might have actually been applied on the + // other side. Schedule an unlock to clean it up just in case. + queueUnlock(lockSessionID); + return overtakeStatus; } } + } - LOG(1) << "distributed lock '" << name << "' was not acquired."; - - if (waitFor == milliseconds::zero()) { - break; - } + LOG(1) << "distributed lock '" << name << "' was not acquired."; - // Periodically message for debugging reasons - if (msgTimer.seconds() > 10) { - LOG(0) << "waited " << timer.seconds() << "s for distributed lock " << name - << " for " << whyMessage; + if (waitFor == milliseconds::zero()) { + break; + } - msgTimer.reset(); - } + // Periodically message for debugging reasons + if (msgTimer.seconds() > 10) { + LOG(0) << "waited " << timer.seconds() << "s for distributed lock " << name << " for " + << whyMessage; - milliseconds timeRemaining = - std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis())); - sleepFor(std::min(lockTryInterval, timeRemaining)); + msgTimer.reset(); } - return {ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name}; + milliseconds timeRemaining = + std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis())); + sleepFor(std::min(lockTryInterval, timeRemaining)); } - void ReplSetDistLockManager::unlock(const DistLockHandle& lockSessionID) { - auto unlockStatus = _catalog->unlock(lockSessionID); - - if (!unlockStatus.isOK()) { - queueUnlock(lockSessionID); - } - else { - LOG(0) << "distributed lock with " << LocksType::lockID() - << ": " << lockSessionID << "' unlocked."; - } - } + return {ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name}; +} - Status ReplSetDistLockManager::checkStatus(const DistLockHandle& lockHandle) { - auto lockStatus = _catalog->getLockByTS(lockHandle); +void ReplSetDistLockManager::unlock(const DistLockHandle& lockSessionID) { + auto unlockStatus = _catalog->unlock(lockSessionID); - if (!lockStatus.isOK()) { - return lockStatus.getStatus(); - } + if (!unlockStatus.isOK()) { + queueUnlock(lockSessionID); + } else { + LOG(0) << "distributed lock with " << LocksType::lockID() << ": " << lockSessionID + << "' unlocked."; + } +} - auto lockDoc = lockStatus.getValue(); - if (!lockDoc.isValid(nullptr)) { - return {ErrorCodes::LockNotFound, "lock owner changed"}; - } +Status ReplSetDistLockManager::checkStatus(const DistLockHandle& lockHandle) { + auto lockStatus = _catalog->getLockByTS(lockHandle); - return Status::OK(); + if (!lockStatus.isOK()) { + return lockStatus.getStatus(); } - void ReplSetDistLockManager::queueUnlock(const DistLockHandle& lockSessionID) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _unlockList.push_back(lockSessionID); + auto lockDoc = lockStatus.getValue(); + if (!lockDoc.isValid(nullptr)) { + return {ErrorCodes::LockNotFound, "lock owner changed"}; } + + return Status::OK(); +} + +void ReplSetDistLockManager::queueUnlock(const DistLockHandle& lockSessionID) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _unlockList.push_back(lockSessionID); +} } diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h index 4e042e2fa41..1779925b8ec 100644 --- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h +++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h @@ -43,88 +43,86 @@ namespace mongo { - class ServiceContext; - - class ReplSetDistLockManager final : public DistLockManager { - public: - ReplSetDistLockManager(ServiceContext* globalContext, - StringData processID, - std::unique_ptr<DistLockCatalog> catalog, - stdx::chrono::milliseconds pingInterval, - stdx::chrono::milliseconds lockExpiration); - - virtual ~ReplSetDistLockManager(); - - virtual void startUp() override; - virtual void shutDown() override; - - virtual StatusWith<DistLockManager::ScopedDistLock> lock( - StringData name, - StringData whyMessage, - stdx::chrono::milliseconds waitFor, - stdx::chrono::milliseconds lockTryInterval) override; - - protected: - - virtual void unlock(const DistLockHandle& lockSessionID) override; - - virtual Status checkStatus(const DistLockHandle& lockSessionID) override; - - private: - - /** - * Queue a lock to be unlocked asynchronously with retry until it doesn't error. - */ - void queueUnlock(const DistLockHandle& lockSessionID); - - /** - * Periodically pings and checks if there are locks queued that needs unlocking. - */ - void doTask(); - - /** - * Returns true if shutDown was called. - */ - bool isShutDown(); - - /** - * Returns true if the current process that owns the lock has no fresh pings since - * the lock expiration threshold. - */ - StatusWith<bool> canOvertakeLock(const LocksType lockDoc); - - // - // All member variables are labeled with one of the following codes indicating the - // synchronization rules for accessing them. - // - // (F) Self synchronizing. - // (M) Must hold _mutex for access. - // (I) Immutable, no synchronization needed. - // (S) Can only be called inside startUp/shutDown. - // - - ServiceContext* const _serviceContext; // (F) - - const std::string _processID; // (I) - const std::unique_ptr<DistLockCatalog> _catalog; // (I) - const stdx::chrono::milliseconds _pingInterval; // (I) - const stdx::chrono::milliseconds _lockExpiration; // (I) - - stdx::mutex _mutex; - std::unique_ptr<stdx::thread> _execThread; // (S) - - // Contains the list of locks queued for unlocking. Cases when unlock operation can - // be queued include: - // 1. First attempt on unlocking resulted in an error. - // 2. Attempting to grab or overtake a lock resulted in an error where we are uncertain - // whether the modification was actually applied or not, and call unlock to make - // sure that it was cleaned up. - std::deque<DistLockHandle> _unlockList; // (M) - - bool _isShutDown = false; // (M) - stdx::condition_variable _shutDownCV; // (M) - - // Map of lockName to last ping information. - std::unordered_map<std::string, DistLockPingInfo> _pingHistory; // (M) - }; +class ServiceContext; + +class ReplSetDistLockManager final : public DistLockManager { +public: + ReplSetDistLockManager(ServiceContext* globalContext, + StringData processID, + std::unique_ptr<DistLockCatalog> catalog, + stdx::chrono::milliseconds pingInterval, + stdx::chrono::milliseconds lockExpiration); + + virtual ~ReplSetDistLockManager(); + + virtual void startUp() override; + virtual void shutDown() override; + + virtual StatusWith<DistLockManager::ScopedDistLock> lock( + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor, + stdx::chrono::milliseconds lockTryInterval) override; + +protected: + virtual void unlock(const DistLockHandle& lockSessionID) override; + + virtual Status checkStatus(const DistLockHandle& lockSessionID) override; + +private: + /** + * Queue a lock to be unlocked asynchronously with retry until it doesn't error. + */ + void queueUnlock(const DistLockHandle& lockSessionID); + + /** + * Periodically pings and checks if there are locks queued that needs unlocking. + */ + void doTask(); + + /** + * Returns true if shutDown was called. + */ + bool isShutDown(); + + /** + * Returns true if the current process that owns the lock has no fresh pings since + * the lock expiration threshold. + */ + StatusWith<bool> canOvertakeLock(const LocksType lockDoc); + + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (F) Self synchronizing. + // (M) Must hold _mutex for access. + // (I) Immutable, no synchronization needed. + // (S) Can only be called inside startUp/shutDown. + // + + ServiceContext* const _serviceContext; // (F) + + const std::string _processID; // (I) + const std::unique_ptr<DistLockCatalog> _catalog; // (I) + const stdx::chrono::milliseconds _pingInterval; // (I) + const stdx::chrono::milliseconds _lockExpiration; // (I) + + stdx::mutex _mutex; + std::unique_ptr<stdx::thread> _execThread; // (S) + + // Contains the list of locks queued for unlocking. Cases when unlock operation can + // be queued include: + // 1. First attempt on unlocking resulted in an error. + // 2. Attempting to grab or overtake a lock resulted in an error where we are uncertain + // whether the modification was actually applied or not, and call unlock to make + // sure that it was cleaned up. + std::deque<DistLockHandle> _unlockList; // (M) + + bool _isShutDown = false; // (M) + stdx::condition_variable _shutDownCV; // (M) + + // Map of lockName to last ping information. + std::unordered_map<std::string, DistLockPingInfo> _pingHistory; // (M) +}; } diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp index 50e05ece2cc..52395a65e62 100644 --- a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp +++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp @@ -61,138 +61,132 @@ namespace mongo { namespace { - using std::map; - using std::string; - using std::vector; +using std::map; +using std::string; +using std::vector; - const Seconds kUnlockTimeout(30); - const Milliseconds kPingInterval(2); - const Seconds kLockExpiration(10); +const Seconds kUnlockTimeout(30); +const Milliseconds kPingInterval(2); +const Seconds kLockExpiration(10); + +/** + * Basic fixture for ReplSetDistLockManager that starts it up before the test begins + * and shuts it down when a test finishes. + */ +class ReplSetDistLockManagerFixture : public mongo::unittest::Test { +public: + ReplSetDistLockManagerFixture() + : _dummyDoNotUse(stdx::make_unique<DistLockCatalogMock>()), + _mockCatalog(_dummyDoNotUse.get()), + _processID("test"), + _mgr(&_context, _processID, std::move(_dummyDoNotUse), kPingInterval, kLockExpiration) {} /** - * Basic fixture for ReplSetDistLockManager that starts it up before the test begins - * and shuts it down when a test finishes. + * Returns the lock manager instance that is being tested. */ - class ReplSetDistLockManagerFixture: public mongo::unittest::Test { - public: - ReplSetDistLockManagerFixture(): - _dummyDoNotUse(stdx::make_unique<DistLockCatalogMock>()), - _mockCatalog(_dummyDoNotUse.get()), - _processID("test"), - _mgr(&_context, - _processID, - std::move(_dummyDoNotUse), - kPingInterval, - kLockExpiration) { - } - - /** - * Returns the lock manager instance that is being tested. - */ - ReplSetDistLockManager* getMgr() { - return &_mgr; - } + ReplSetDistLockManager* getMgr() { + return &_mgr; + } - /** - * Returns the mocked catalog used by the lock manager being tested. - */ - DistLockCatalogMock* getMockCatalog() { - return _mockCatalog; - } + /** + * Returns the mocked catalog used by the lock manager being tested. + */ + DistLockCatalogMock* getMockCatalog() { + return _mockCatalog; + } - /** - * Get the process id that was initialiezd with the lock manager being tested. - */ - string getProcessID() const { - return _processID; - } + /** + * Get the process id that was initialiezd with the lock manager being tested. + */ + string getProcessID() const { + return _processID; + } - protected: - void setUp() override { - _context.setTickSource(stdx::make_unique<SystemTickSource>()); - _mgr.startUp(); - } +protected: + void setUp() override { + _context.setTickSource(stdx::make_unique<SystemTickSource>()); + _mgr.startUp(); + } - void tearDown() override { - // Don't care about what shutDown passes to stopPing here. - _mockCatalog->expectStopPing([](StringData){}, Status::OK()); - _mgr.shutDown(); - } + void tearDown() override { + // Don't care about what shutDown passes to stopPing here. + _mockCatalog->expectStopPing([](StringData) {}, Status::OK()); + _mgr.shutDown(); + } - TickSourceMock _tickSource; - std::unique_ptr<DistLockCatalogMock> _dummyDoNotUse; // dummy placeholder - DistLockCatalogMock* _mockCatalog; - string _processID; - ServiceContextNoop _context; - ReplSetDistLockManager _mgr; - }; + TickSourceMock _tickSource; + std::unique_ptr<DistLockCatalogMock> _dummyDoNotUse; // dummy placeholder + DistLockCatalogMock* _mockCatalog; + string _processID; + ServiceContextNoop _context; + ReplSetDistLockManager _mgr; +}; - class RSDistLockMgrWithMockTickSource: public ReplSetDistLockManagerFixture { - public: - /** - * Returns the mock tick source. - */ - TickSourceMock* getMockTickSource() { - return dynamic_cast<TickSourceMock*>(_context.getTickSource()); - } +class RSDistLockMgrWithMockTickSource : public ReplSetDistLockManagerFixture { +public: + /** + * Returns the mock tick source. + */ + TickSourceMock* getMockTickSource() { + return dynamic_cast<TickSourceMock*>(_context.getTickSource()); + } - protected: - void setUp() override { - _context.setTickSource(stdx::make_unique<TickSourceMock>()); - _mgr.startUp(); - } - }; +protected: + void setUp() override { + _context.setTickSource(stdx::make_unique<TickSourceMock>()); + _mgr.startUp(); + } +}; - std::string mapToString(const std::map<OID, int>& map) { - StringBuilder str; +std::string mapToString(const std::map<OID, int>& map) { + StringBuilder str; - for (const auto& entry : map) { - str << "(" << entry.first.toString() << ": " << entry.second << ")"; - } + for (const auto& entry : map) { + str << "(" << entry.first.toString() << ": " << entry.second << ")"; + } - return str.str(); - }; + return str.str(); +}; - std::string vectorToString(const std::vector<OID>& list) { - StringBuilder str; +std::string vectorToString(const std::vector<OID>& list) { + StringBuilder str; - for (const auto& entry : list) { - str << "(" << entry.toString() << ")"; - } + for (const auto& entry : list) { + str << "(" << entry.toString() << ")"; + } - return str.str(); - }; + return str.str(); +}; - /** - * Test scenario: - * 1. Grab lock. - * 2. Unlock (on destructor of ScopedDistLock). - * 3. Check lock id used in lock and unlock are the same. - */ - TEST_F(ReplSetDistLockManagerFixture, BasicLockLifeCycle) { - string lockName("test"); - Date_t now(Date_t::now()); - string whyMsg("because"); - - LocksType retLockDoc; - retLockDoc.setName(lockName); - retLockDoc.setState(LocksType::LOCKED); - retLockDoc.setProcess(getProcessID()); - retLockDoc.setWho("me"); - retLockDoc.setWhy(whyMsg); - // Will be different from the actual lock session id. For testing only. - retLockDoc.setLockID(OID::gen()); - - OID lockSessionIDPassed; - - getMockCatalog()->expectGrabLock( - [this, &lockName, &now, &whyMsg, &lockSessionIDPassed]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { +/** + * Test scenario: + * 1. Grab lock. + * 2. Unlock (on destructor of ScopedDistLock). + * 3. Check lock id used in lock and unlock are the same. + */ +TEST_F(ReplSetDistLockManagerFixture, BasicLockLifeCycle) { + string lockName("test"); + Date_t now(Date_t::now()); + string whyMsg("because"); + + LocksType retLockDoc; + retLockDoc.setName(lockName); + retLockDoc.setState(LocksType::LOCKED); + retLockDoc.setProcess(getProcessID()); + retLockDoc.setWho("me"); + retLockDoc.setWhy(whyMsg); + // Will be different from the actual lock session id. For testing only. + retLockDoc.setLockID(OID::gen()); + + OID lockSessionIDPassed; + + getMockCatalog()->expectGrabLock( + [this, &lockName, &now, &whyMsg, &lockSessionIDPassed](StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS(lockName, lockID); ASSERT_TRUE(lockSessionID.isSet()); ASSERT_EQUALS(getProcessID(), processId); @@ -200,72 +194,73 @@ namespace { ASSERT_EQUALS(whyMsg, why); lockSessionIDPassed = lockSessionID; - getMockCatalog()->expectNoGrabLock(); // Call only once. - }, retLockDoc); + getMockCatalog()->expectNoGrabLock(); // Call only once. + }, + retLockDoc); - int unlockCallCount = 0; - OID unlockSessionIDPassed; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - { - auto lockStatus = getMgr()->lock(lockName, - whyMsg, - DistLockManager::kDefaultSingleLockAttemptTimeout, - DistLockManager::kDefaultLockRetryInterval); - ASSERT_OK(lockStatus.getStatus()); + { + auto lockStatus = getMgr()->lock(lockName, + whyMsg, + DistLockManager::kDefaultSingleLockAttemptTimeout, + DistLockManager::kDefaultLockRetryInterval); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock( - [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { unlockCallCount++; unlockSessionIDPassed = lockSessionID; - }, Status::OK()); - } - - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lockSessionIDPassed, unlockSessionIDPassed); + }, + Status::OK()); } - /** - * Test scenario: - * 1. Grab lock fails up to 3 times. - * 2. Check that each attempt uses a unique lock session id. - * 3. Unlock (on destructor of ScopedDistLock). - * 4. Check lock id used in lock and unlock are the same. - */ - TEST_F(RSDistLockMgrWithMockTickSource, LockSuccessAfterRetry) { - string lockName("test"); - string me("me"); - OID lastTS; - Date_t lastTime(Date_t::now()); - string whyMsg("because"); - - int retryAttempt = 0; - const int kMaxRetryAttempt = 3; - - LocksType goodLockDoc; - goodLockDoc.setName(lockName); - goodLockDoc.setState(LocksType::LOCKED); - goodLockDoc.setProcess(getProcessID()); - goodLockDoc.setWho("me"); - goodLockDoc.setWhy(whyMsg); - goodLockDoc.setLockID(OID::gen()); - - getMockCatalog()->expectGrabLock( - [this, - &lockName, - &lastTS, - &me, - &lastTime, - &whyMsg, - &retryAttempt, - &kMaxRetryAttempt, - &goodLockDoc]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lockSessionIDPassed, unlockSessionIDPassed); +} + +/** + * Test scenario: + * 1. Grab lock fails up to 3 times. + * 2. Check that each attempt uses a unique lock session id. + * 3. Unlock (on destructor of ScopedDistLock). + * 4. Check lock id used in lock and unlock are the same. + */ +TEST_F(RSDistLockMgrWithMockTickSource, LockSuccessAfterRetry) { + string lockName("test"); + string me("me"); + OID lastTS; + Date_t lastTime(Date_t::now()); + string whyMsg("because"); + + int retryAttempt = 0; + const int kMaxRetryAttempt = 3; + + LocksType goodLockDoc; + goodLockDoc.setName(lockName); + goodLockDoc.setState(LocksType::LOCKED); + goodLockDoc.setProcess(getProcessID()); + goodLockDoc.setWho("me"); + goodLockDoc.setWhy(whyMsg); + goodLockDoc.setLockID(OID::gen()); + + getMockCatalog()->expectGrabLock( + [this, + &lockName, + &lastTS, + &me, + &lastTime, + &whyMsg, + &retryAttempt, + &kMaxRetryAttempt, + &goodLockDoc](StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS(lockName, lockID); // Every attempt should have a unique sesssion ID. ASSERT_NOT_EQUALS(lastTS, lockSessionID); @@ -279,119 +274,107 @@ namespace { getMockTickSource()->advance(Milliseconds(1)); if (++retryAttempt >= kMaxRetryAttempt) { - getMockCatalog()->expectGrabLock([this, - &lockName, - &lastTS, - &me, - &lastTime, - &whyMsg]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { - ASSERT_EQUALS(lockName, lockID); - // Every attempt should have a unique sesssion ID. - ASSERT_NOT_EQUALS(lastTS, lockSessionID); - lastTS = lockSessionID; - ASSERT_TRUE(lockSessionID.isSet()); - ASSERT_EQUALS(getProcessID(), processId); - ASSERT_GREATER_THAN_OR_EQUALS(time, lastTime); - ASSERT_EQUALS(whyMsg, why); - - getMockCatalog()->expectNoGrabLock(); - - getMockCatalog()->expectGetLockByName([](StringData name) { - FAIL("should not attempt to overtake lock after successful lock"); - }, LocksType()); - }, goodLockDoc); + getMockCatalog()->expectGrabLock( + [this, &lockName, &lastTS, &me, &lastTime, &whyMsg](StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { + ASSERT_EQUALS(lockName, lockID); + // Every attempt should have a unique sesssion ID. + ASSERT_NOT_EQUALS(lastTS, lockSessionID); + lastTS = lockSessionID; + ASSERT_TRUE(lockSessionID.isSet()); + ASSERT_EQUALS(getProcessID(), processId); + ASSERT_GREATER_THAN_OR_EQUALS(time, lastTime); + ASSERT_EQUALS(whyMsg, why); + + getMockCatalog()->expectNoGrabLock(); + + getMockCatalog()->expectGetLockByName([](StringData name) { + FAIL("should not attempt to overtake lock after successful lock"); + }, LocksType()); + }, + goodLockDoc); } - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - // - // Setup mock for lock overtaking. - // + // + // Setup mock for lock overtaking. + // - LocksType currentLockDoc; - currentLockDoc.setName("test"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID::gen()); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); + LocksType currentLockDoc; + currentLockDoc.setName("test"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID::gen()); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("test", name); - }, currentLockDoc); + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("test", name); }, + currentLockDoc); - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); - // Config server time is fixed, so overtaking will never succeed. - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t(), OID())); + // Config server time is fixed, so overtaking will never succeed. + getMockCatalog()->expectGetServerInfo([]() {}, DistLockCatalog::ServerInfo(Date_t(), OID())); - // - // Try grabbing lock. - // + // + // Try grabbing lock. + // - int unlockCallCount = 0; - OID unlockSessionIDPassed; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - { - auto lockStatus = getMgr()->lock(lockName, whyMsg, Milliseconds(10), Milliseconds(1)); - ASSERT_OK(lockStatus.getStatus()); + { + auto lockStatus = getMgr()->lock(lockName, whyMsg, Milliseconds(10), Milliseconds(1)); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock( - [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { unlockCallCount++; unlockSessionIDPassed = lockSessionID; - }, Status::OK()); - } - - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lastTS, unlockSessionIDPassed); + }, + Status::OK()); } - /** - * Test scenario: - * 1. Grab lock fails up to 3 times. - * 2. Check that each attempt uses a unique lock session id. - * 3. Grab lock errors out on the fourth try. - * 4. Make sure that unlock is called to cleanup the last lock attempted that error out. - */ - TEST_F(RSDistLockMgrWithMockTickSource, LockFailsAfterRetry) { - string lockName("test"); - string me("me"); - OID lastTS; - Date_t lastTime(Date_t::now()); - string whyMsg("because"); - - int retryAttempt = 0; - const int kMaxRetryAttempt = 3; - - getMockCatalog()->expectGrabLock( - [this, - &lockName, - &lastTS, - &me, - &lastTime, - &whyMsg, - &retryAttempt, - &kMaxRetryAttempt]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lastTS, unlockSessionIDPassed); +} + +/** + * Test scenario: + * 1. Grab lock fails up to 3 times. + * 2. Check that each attempt uses a unique lock session id. + * 3. Grab lock errors out on the fourth try. + * 4. Make sure that unlock is called to cleanup the last lock attempted that error out. + */ +TEST_F(RSDistLockMgrWithMockTickSource, LockFailsAfterRetry) { + string lockName("test"); + string me("me"); + OID lastTS; + Date_t lastTime(Date_t::now()); + string whyMsg("because"); + + int retryAttempt = 0; + const int kMaxRetryAttempt = 3; + + getMockCatalog()->expectGrabLock( + [this, &lockName, &lastTS, &me, &lastTime, &whyMsg, &retryAttempt, &kMaxRetryAttempt]( + StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS(lockName, lockID); // Every attempt should have a unique sesssion ID. ASSERT_NOT_EQUALS(lastTS, lockSessionID); @@ -405,124 +388,116 @@ namespace { getMockTickSource()->advance(Milliseconds(1)); if (++retryAttempt >= kMaxRetryAttempt) { - getMockCatalog()->expectGrabLock([this, - &lockName, - &lastTS, - &me, - &lastTime, - &whyMsg]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { - ASSERT_EQUALS(lockName, lockID); - // Every attempt should have a unique sesssion ID. - ASSERT_NOT_EQUALS(lastTS, lockSessionID); - lastTS = lockSessionID; - ASSERT_TRUE(lockSessionID.isSet()); - ASSERT_EQUALS(getProcessID(), processId); - ASSERT_GREATER_THAN_OR_EQUALS(time, lastTime); - ASSERT_EQUALS(whyMsg, why); - - getMockCatalog()->expectNoGrabLock(); - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + getMockCatalog()->expectGrabLock( + [this, &lockName, &lastTS, &me, &lastTime, &whyMsg](StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { + ASSERT_EQUALS(lockName, lockID); + // Every attempt should have a unique sesssion ID. + ASSERT_NOT_EQUALS(lastTS, lockSessionID); + lastTS = lockSessionID; + ASSERT_TRUE(lockSessionID.isSet()); + ASSERT_EQUALS(getProcessID(), processId); + ASSERT_GREATER_THAN_OR_EQUALS(time, lastTime); + ASSERT_EQUALS(whyMsg, why); + + getMockCatalog()->expectNoGrabLock(); + }, + {ErrorCodes::NetworkTimeout, "bad test network"}); } - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - // Make mock return lock not found to skip lock overtaking. - getMockCatalog()->expectGetLockByName([](StringData) {}, - {ErrorCodes::LockNotFound, "not found!"}); + // Make mock return lock not found to skip lock overtaking. + getMockCatalog()->expectGetLockByName([](StringData) {}, + {ErrorCodes::LockNotFound, "not found!"}); - stdx::mutex unlockMutex; - stdx::condition_variable unlockCV; - OID unlockSessionIDPassed; - int unlockCallCount = 0; + stdx::mutex unlockMutex; + stdx::condition_variable unlockCV; + OID unlockSessionIDPassed; + int unlockCallCount = 0; - getMockCatalog()->expectUnLock( - [&unlockMutex, &unlockCV, &unlockCallCount, &unlockSessionIDPassed]( - const OID& lockSessionID) { + getMockCatalog()->expectUnLock( + [&unlockMutex, &unlockCV, &unlockCallCount, &unlockSessionIDPassed]( + const OID& lockSessionID) { stdx::unique_lock<stdx::mutex> lk(unlockMutex); unlockCallCount++; unlockSessionIDPassed = lockSessionID; unlockCV.notify_all(); - }, Status::OK()); + }, + Status::OK()); - { - auto lockStatus = getMgr()->lock(lockName, whyMsg, Milliseconds(10), Milliseconds(1)); - ASSERT_NOT_OK(lockStatus.getStatus()); - } + { + auto lockStatus = getMgr()->lock(lockName, whyMsg, Milliseconds(10), Milliseconds(1)); + ASSERT_NOT_OK(lockStatus.getStatus()); + } - bool didTimeout = false; - { - stdx::unique_lock<stdx::mutex> lk(unlockMutex); - if (unlockCallCount == 0) { - didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; - } + bool didTimeout = false; + { + stdx::unique_lock<stdx::mutex> lk(unlockMutex); + if (unlockCallCount == 0) { + didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; } - - // Join the background thread before trying to call asserts. Shutdown calls - // stopPing and we don't care in this test. - getMockCatalog()->expectStopPing([](StringData){}, Status::OK()); - getMgr()->shutDown(); - - // No assert until shutDown has been called to make sure that the background thread - // won't be trying to access the local variables that were captured by lamdas that - // may have gone out of scope when the assert unwinds the stack. - // No need to grab unlockMutex since there is only one thread running at this point. - - ASSERT_FALSE(didTimeout); - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lastTS, unlockSessionIDPassed); } - TEST_F(ReplSetDistLockManagerFixture, LockBusyNoRetry) { - getMockCatalog()->expectGrabLock([this]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { - getMockCatalog()->expectNoGrabLock(); // Call only once. - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + // Join the background thread before trying to call asserts. Shutdown calls + // stopPing and we don't care in this test. + getMockCatalog()->expectStopPing([](StringData) {}, Status::OK()); + getMgr()->shutDown(); + + // No assert until shutDown has been called to make sure that the background thread + // won't be trying to access the local variables that were captured by lamdas that + // may have gone out of scope when the assert unwinds the stack. + // No need to grab unlockMutex since there is only one thread running at this point. + + ASSERT_FALSE(didTimeout); + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lastTS, unlockSessionIDPassed); +} + +TEST_F(ReplSetDistLockManagerFixture, LockBusyNoRetry) { + getMockCatalog()->expectGrabLock( + [this](StringData, const OID&, StringData, StringData, Date_t, StringData) { + getMockCatalog()->expectNoGrabLock(); // Call only once. + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + // Make mock return lock not found to skip lock overtaking. + getMockCatalog()->expectGetLockByName([](StringData) {}, + {ErrorCodes::LockNotFound, "not found!"}); + + auto status = getMgr()->lock("", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); +} - // Make mock return lock not found to skip lock overtaking. - getMockCatalog()->expectGetLockByName([](StringData) {}, - {ErrorCodes::LockNotFound, "not found!"}); - - auto status = getMgr()->lock("", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } - - /** - * Test scenario: - * 1. Attempt to grab lock. - * 2. Check that each attempt uses a unique lock session id. - * 3. Times out trying. - * 4. Checks result is error. - * 5. Implicitly check that unlock is not called (default setting of mock catalog). - */ - TEST_F(RSDistLockMgrWithMockTickSource, LockRetryTimeout) { - string lockName("test"); - string me("me"); - OID lastTS; - Date_t lastTime(Date_t::now()); - string whyMsg("because"); - - int retryAttempt = 0; - - getMockCatalog()->expectGrabLock( - [this, - &lockName, - &lastTS, - &me, - &lastTime, - &whyMsg, - &retryAttempt]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { +/** + * Test scenario: + * 1. Attempt to grab lock. + * 2. Check that each attempt uses a unique lock session id. + * 3. Times out trying. + * 4. Checks result is error. + * 5. Implicitly check that unlock is not called (default setting of mock catalog). + */ +TEST_F(RSDistLockMgrWithMockTickSource, LockRetryTimeout) { + string lockName("test"); + string me("me"); + OID lastTS; + Date_t lastTime(Date_t::now()); + string whyMsg("because"); + + int retryAttempt = 0; + + getMockCatalog()->expectGrabLock( + [this, &lockName, &lastTS, &me, &lastTime, &whyMsg, &retryAttempt](StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS(lockName, lockID); // Every attempt should have a unique sesssion ID. ASSERT_NOT_EQUALS(lastTS, lockSessionID); @@ -535,47 +510,41 @@ namespace { retryAttempt++; getMockTickSource()->advance(Milliseconds(1)); - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - // Make mock return lock not found to skip lock overtaking. - getMockCatalog()->expectGetLockByName([](StringData) {}, - {ErrorCodes::LockNotFound, "not found!"}); + // Make mock return lock not found to skip lock overtaking. + getMockCatalog()->expectGetLockByName([](StringData) {}, + {ErrorCodes::LockNotFound, "not found!"}); - auto lockStatus = getMgr()->lock(lockName, - whyMsg, - Milliseconds(5), - Milliseconds(1)).getStatus(); - ASSERT_NOT_OK(lockStatus); + auto lockStatus = + getMgr()->lock(lockName, whyMsg, Milliseconds(5), Milliseconds(1)).getStatus(); + ASSERT_NOT_OK(lockStatus); - ASSERT_EQUALS(ErrorCodes::LockBusy, lockStatus.code()); - ASSERT_GREATER_THAN(retryAttempt, 1); - } + ASSERT_EQUALS(ErrorCodes::LockBusy, lockStatus.code()); + ASSERT_GREATER_THAN(retryAttempt, 1); +} - /** - * Test scenario: - * 1. Set mock to error on grab lock. - * 2. Grab lock attempted. - * 3. Wait for unlock to be called. - * 4. Check that lockSessionID used on all unlock is the same as the one used to grab lock. - */ - TEST_F(ReplSetDistLockManagerFixture, MustUnlockOnLockError) { - string lockName("test"); - string me("me"); - OID lastTS; - string whyMsg("because"); - - getMockCatalog()->expectGrabLock( - [this, - &lockName, - &lastTS, - &me, - &whyMsg]( - StringData lockID, - const OID& lockSessionID, - StringData who, - StringData processId, - Date_t time, - StringData why) { +/** + * Test scenario: + * 1. Set mock to error on grab lock. + * 2. Grab lock attempted. + * 3. Wait for unlock to be called. + * 4. Check that lockSessionID used on all unlock is the same as the one used to grab lock. + */ +TEST_F(ReplSetDistLockManagerFixture, MustUnlockOnLockError) { + string lockName("test"); + string me("me"); + OID lastTS; + string whyMsg("because"); + + getMockCatalog()->expectGrabLock( + [this, &lockName, &lastTS, &me, &whyMsg](StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS(lockName, lockID); // Every attempt should have a unique sesssion ID. ASSERT_TRUE(lockSessionID.isSet()); @@ -584,69 +553,69 @@ namespace { lastTS = lockSessionID; getMockCatalog()->expectNoGrabLock(); - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + }, + {ErrorCodes::NetworkTimeout, "bad test network"}); - stdx::mutex unlockMutex; - stdx::condition_variable unlockCV; - int unlockCallCount = 0; - OID unlockSessionIDPassed; + stdx::mutex unlockMutex; + stdx::condition_variable unlockCV; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - getMockCatalog()->expectUnLock( - [&unlockMutex, &unlockCV, &unlockCallCount, &unlockSessionIDPassed]( - const OID& lockSessionID) { + getMockCatalog()->expectUnLock( + [&unlockMutex, &unlockCV, &unlockCallCount, &unlockSessionIDPassed]( + const OID& lockSessionID) { stdx::unique_lock<stdx::mutex> lk(unlockMutex); unlockCallCount++; unlockSessionIDPassed = lockSessionID; unlockCV.notify_all(); - }, Status::OK()); - - auto lockStatus = getMgr()->lock(lockName, - whyMsg, - Milliseconds(10), - Milliseconds(1)).getStatus(); - ASSERT_NOT_OK(lockStatus); - ASSERT_EQUALS(ErrorCodes::NetworkTimeout, lockStatus.code()); - - bool didTimeout = false; - { - stdx::unique_lock<stdx::mutex> lk(unlockMutex); - if (unlockCallCount == 0) { - didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; - } + }, + Status::OK()); + + auto lockStatus = + getMgr()->lock(lockName, whyMsg, Milliseconds(10), Milliseconds(1)).getStatus(); + ASSERT_NOT_OK(lockStatus); + ASSERT_EQUALS(ErrorCodes::NetworkTimeout, lockStatus.code()); + + bool didTimeout = false; + { + stdx::unique_lock<stdx::mutex> lk(unlockMutex); + if (unlockCallCount == 0) { + didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; } + } - // Join the background thread before trying to call asserts. Shutdown calls - // stopPing and we don't care in this test. - getMockCatalog()->expectStopPing([](StringData){}, Status::OK()); - getMgr()->shutDown(); + // Join the background thread before trying to call asserts. Shutdown calls + // stopPing and we don't care in this test. + getMockCatalog()->expectStopPing([](StringData) {}, Status::OK()); + getMgr()->shutDown(); - // No assert until shutDown has been called to make sure that the background thread - // won't be trying to access the local variables that were captured by lamdas that - // may have gone out of scope when the assert unwinds the stack. - // No need to grab unlockMutex since there is only one thread running at this point. + // No assert until shutDown has been called to make sure that the background thread + // won't be trying to access the local variables that were captured by lamdas that + // may have gone out of scope when the assert unwinds the stack. + // No need to grab unlockMutex since there is only one thread running at this point. - ASSERT_FALSE(didTimeout); - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lastTS, unlockSessionIDPassed); - } + ASSERT_FALSE(didTimeout); + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lastTS, unlockSessionIDPassed); +} - /** - * Test scenario: - * 1. Ping thread started during setUp of fixture. - * 2. Wait until ping was called at least 3 times. - * 3. Check that correct process is being pinged. - * 4. Check that ping values are unique (based on the assumption that the system - * clock supports 2ms granularity). - */ - TEST_F(ReplSetDistLockManagerFixture, LockPinging) { - stdx::mutex testMutex; - stdx::condition_variable ping3TimesCV; - vector<Date_t> pingValues; - vector<string> processIDList; - - getMockCatalog()->expectPing( - [&testMutex, &ping3TimesCV, &processIDList, &pingValues]( - StringData processIDArg, Date_t ping) { +/** + * Test scenario: + * 1. Ping thread started during setUp of fixture. + * 2. Wait until ping was called at least 3 times. + * 3. Check that correct process is being pinged. + * 4. Check that ping values are unique (based on the assumption that the system + * clock supports 2ms granularity). + */ +TEST_F(ReplSetDistLockManagerFixture, LockPinging) { + stdx::mutex testMutex; + stdx::condition_variable ping3TimesCV; + vector<Date_t> pingValues; + vector<string> processIDList; + + getMockCatalog()->expectPing( + [&testMutex, &ping3TimesCV, &processIDList, &pingValues](StringData processIDArg, + Date_t ping) { stdx::lock_guard<stdx::mutex> lk(testMutex); processIDList.push_back(processIDArg.toString()); pingValues.push_back(ping); @@ -654,1152 +623,1127 @@ namespace { if (processIDList.size() >= 3) { ping3TimesCV.notify_all(); } - }, Status::OK()); - - bool didTimeout = false; - { - stdx::unique_lock<stdx::mutex> lk(testMutex); - if (processIDList.size() < 3) { - didTimeout = ping3TimesCV.wait_for(lk, Milliseconds(50)) == - stdx::cv_status::timeout; - } + }, + Status::OK()); + + bool didTimeout = false; + { + stdx::unique_lock<stdx::mutex> lk(testMutex); + if (processIDList.size() < 3) { + didTimeout = ping3TimesCV.wait_for(lk, Milliseconds(50)) == stdx::cv_status::timeout; } + } - // Join the background thread before trying to call asserts. Shutdown calls - // stopPing and we don't care in this test. - getMockCatalog()->expectStopPing([](StringData){}, Status::OK()); - getMgr()->shutDown(); + // Join the background thread before trying to call asserts. Shutdown calls + // stopPing and we don't care in this test. + getMockCatalog()->expectStopPing([](StringData) {}, Status::OK()); + getMgr()->shutDown(); - // No assert until shutDown has been called to make sure that the background thread - // won't be trying to access the local variables that were captured by lamdas that - // may have gone out of scope when the assert unwinds the stack. - // No need to grab testMutex since there is only one thread running at this point. + // No assert until shutDown has been called to make sure that the background thread + // won't be trying to access the local variables that were captured by lamdas that + // may have gone out of scope when the assert unwinds the stack. + // No need to grab testMutex since there is only one thread running at this point. - ASSERT_FALSE(didTimeout); + ASSERT_FALSE(didTimeout); - Date_t lastPing; - for (const auto& ping : pingValues) { - ASSERT_NOT_EQUALS(lastPing, ping); - lastPing = ping; - } - - for (const auto& processIDArg : processIDList) { - ASSERT_EQUALS(getProcessID(), processIDArg); - } + Date_t lastPing; + for (const auto& ping : pingValues) { + ASSERT_NOT_EQUALS(lastPing, ping); + lastPing = ping; } - /** - * Test scenario: - * 1. Grab lock. - * 2. Unlock fails 3 times. - * 3. Unlock finally succeeds at the 4th time. - * 4. Check that lockSessionID used on all unlock is the same as the one used to grab lock. - */ - TEST_F(ReplSetDistLockManagerFixture, UnlockUntilNoError) { - stdx::mutex unlockMutex; - stdx::condition_variable unlockCV; - const unsigned int kUnlockErrorCount = 3; - vector<OID> lockSessionIDPassed; + for (const auto& processIDArg : processIDList) { + ASSERT_EQUALS(getProcessID(), processIDArg); + } +} - getMockCatalog()->expectUnLock( - [this, &unlockMutex, &unlockCV, &kUnlockErrorCount, &lockSessionIDPassed]( - const OID& lockSessionID) { +/** + * Test scenario: + * 1. Grab lock. + * 2. Unlock fails 3 times. + * 3. Unlock finally succeeds at the 4th time. + * 4. Check that lockSessionID used on all unlock is the same as the one used to grab lock. + */ +TEST_F(ReplSetDistLockManagerFixture, UnlockUntilNoError) { + stdx::mutex unlockMutex; + stdx::condition_variable unlockCV; + const unsigned int kUnlockErrorCount = 3; + vector<OID> lockSessionIDPassed; + + getMockCatalog()->expectUnLock( + [this, &unlockMutex, &unlockCV, &kUnlockErrorCount, &lockSessionIDPassed]( + const OID& lockSessionID) { stdx::unique_lock<stdx::mutex> lk(unlockMutex); lockSessionIDPassed.push_back(lockSessionID); if (lockSessionIDPassed.size() >= kUnlockErrorCount) { getMockCatalog()->expectUnLock( - [&lockSessionIDPassed, &unlockMutex, &unlockCV]( - const OID& lockSessionID) { - stdx::unique_lock<stdx::mutex> lk(unlockMutex); - lockSessionIDPassed.push_back(lockSessionID); - unlockCV.notify_all(); - }, Status::OK()); - } - }, {ErrorCodes::NetworkTimeout, "bad test network"}); - - OID lockSessionID; - LocksType retLockDoc; - retLockDoc.setName("test"); - retLockDoc.setState(LocksType::LOCKED); - retLockDoc.setProcess(getProcessID()); - retLockDoc.setWho("me"); - retLockDoc.setWhy("why"); - // Will be different from the actual lock session id. For testing only. - retLockDoc.setLockID(OID::gen()); - - getMockCatalog()->expectGrabLock([&lockSessionID]( - StringData lockID, - const OID& lockSessionIDArg, - StringData who, - StringData processId, - Date_t time, - StringData why) { - lockSessionID = lockSessionIDArg; - }, retLockDoc); - - { - auto lockStatus = getMgr()->lock("test", "why", Milliseconds(0), Milliseconds(0)); - } - - bool didTimeout = false; - { - stdx::unique_lock<stdx::mutex> lk(unlockMutex); - if (lockSessionIDPassed.size() < kUnlockErrorCount) { - didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; + [&lockSessionIDPassed, &unlockMutex, &unlockCV](const OID& lockSessionID) { + stdx::unique_lock<stdx::mutex> lk(unlockMutex); + lockSessionIDPassed.push_back(lockSessionID); + unlockCV.notify_all(); + }, + Status::OK()); } + }, + {ErrorCodes::NetworkTimeout, "bad test network"}); + + OID lockSessionID; + LocksType retLockDoc; + retLockDoc.setName("test"); + retLockDoc.setState(LocksType::LOCKED); + retLockDoc.setProcess(getProcessID()); + retLockDoc.setWho("me"); + retLockDoc.setWhy("why"); + // Will be different from the actual lock session id. For testing only. + retLockDoc.setLockID(OID::gen()); + + getMockCatalog()->expectGrabLock( + [&lockSessionID](StringData lockID, + const OID& lockSessionIDArg, + StringData who, + StringData processId, + Date_t time, + StringData why) { lockSessionID = lockSessionIDArg; }, + retLockDoc); + + { auto lockStatus = getMgr()->lock("test", "why", Milliseconds(0), Milliseconds(0)); } + + bool didTimeout = false; + { + stdx::unique_lock<stdx::mutex> lk(unlockMutex); + if (lockSessionIDPassed.size() < kUnlockErrorCount) { + didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; } + } - // Join the background thread before trying to call asserts. Shutdown calls - // stopPing and we don't care in this test. - getMockCatalog()->expectStopPing([](StringData){}, Status::OK()); - getMgr()->shutDown(); + // Join the background thread before trying to call asserts. Shutdown calls + // stopPing and we don't care in this test. + getMockCatalog()->expectStopPing([](StringData) {}, Status::OK()); + getMgr()->shutDown(); - // No assert until shutDown has been called to make sure that the background thread - // won't be trying to access the local variables that were captured by lamdas that - // may have gone out of scope when the assert unwinds the stack. - // No need to grab testMutex since there is only one thread running at this point. + // No assert until shutDown has been called to make sure that the background thread + // won't be trying to access the local variables that were captured by lamdas that + // may have gone out of scope when the assert unwinds the stack. + // No need to grab testMutex since there is only one thread running at this point. - ASSERT_FALSE(didTimeout); + ASSERT_FALSE(didTimeout); - for (const auto& id : lockSessionIDPassed) { - ASSERT_EQUALS(lockSessionID, id); - } + for (const auto& id : lockSessionIDPassed) { + ASSERT_EQUALS(lockSessionID, id); } +} + +/** + * Test scenario: + * 1. Grab 2 locks. + * 2. Trigger unlocks by making ScopedDistLock go out of scope. + * 3. Unlocks fail and will be queued for retry. + * 4. Unlocks will keep on failing until we see at least 3 unique ids being unlocked more + * than once. This implies that both ids have been retried at least 3 times. + * 5. Check that the lock session id used when lock was called matches with unlock. + */ +TEST_F(ReplSetDistLockManagerFixture, MultipleQueuedUnlock) { + stdx::mutex testMutex; + stdx::condition_variable unlockCV; + + vector<OID> lockSessionIDPassed; + map<OID, int> unlockIDMap; // id -> count /** - * Test scenario: - * 1. Grab 2 locks. - * 2. Trigger unlocks by making ScopedDistLock go out of scope. - * 3. Unlocks fail and will be queued for retry. - * 4. Unlocks will keep on failing until we see at least 3 unique ids being unlocked more - * than once. This implies that both ids have been retried at least 3 times. - * 5. Check that the lock session id used when lock was called matches with unlock. + * Returns true if all values in the map are greater than 2. */ - TEST_F(ReplSetDistLockManagerFixture, MultipleQueuedUnlock) { - stdx::mutex testMutex; - stdx::condition_variable unlockCV; - - vector<OID> lockSessionIDPassed; - map<OID, int> unlockIDMap; // id -> count - - /** - * Returns true if all values in the map are greater than 2. - */ - auto mapEntriesGreaterThanTwo = [](const decltype(unlockIDMap)& map) -> bool { - auto iter = find_if(map.begin(), map.end(), - [](const std::remove_reference<decltype(map)>::type::value_type& entry) - -> bool { - return entry.second < 3; - }); - - return iter == map.end(); - }; + auto mapEntriesGreaterThanTwo = [](const decltype(unlockIDMap)& map) -> bool { + auto iter = find_if(map.begin(), + map.end(), + [](const std::remove_reference<decltype(map)>::type::value_type& entry) + -> bool { return entry.second < 3; }); - getMockCatalog()->expectUnLock( - [this, &unlockIDMap, &testMutex, &unlockCV, &mapEntriesGreaterThanTwo]( - const OID& lockSessionID) { + return iter == map.end(); + }; + + getMockCatalog()->expectUnLock( + [this, &unlockIDMap, &testMutex, &unlockCV, &mapEntriesGreaterThanTwo]( + const OID& lockSessionID) { stdx::unique_lock<stdx::mutex> lk(testMutex); unlockIDMap[lockSessionID]++; // Wait until we see at least 2 unique lockSessionID more than twice. if (unlockIDMap.size() >= 2 && mapEntriesGreaterThanTwo(unlockIDMap)) { - getMockCatalog()->expectUnLock( - [&testMutex, &unlockCV](const OID& lockSessionID) { + getMockCatalog()->expectUnLock([&testMutex, &unlockCV](const OID& lockSessionID) { stdx::unique_lock<stdx::mutex> lk(testMutex); unlockCV.notify_all(); }, Status::OK()); } - }, {ErrorCodes::NetworkTimeout, "bad test network"}); - - LocksType retLockDoc; - retLockDoc.setName("test"); - retLockDoc.setState(LocksType::LOCKED); - retLockDoc.setProcess(getProcessID()); - retLockDoc.setWho("me"); - retLockDoc.setWhy("why"); - // Will be different from the actual lock session id. For testing only. - retLockDoc.setLockID(OID::gen()); - - getMockCatalog()->expectGrabLock([&testMutex, &lockSessionIDPassed]( - StringData lockID, - const OID& lockSessionIDArg, - StringData who, - StringData processId, - Date_t time, - StringData why) { + }, + {ErrorCodes::NetworkTimeout, "bad test network"}); + + LocksType retLockDoc; + retLockDoc.setName("test"); + retLockDoc.setState(LocksType::LOCKED); + retLockDoc.setProcess(getProcessID()); + retLockDoc.setWho("me"); + retLockDoc.setWhy("why"); + // Will be different from the actual lock session id. For testing only. + retLockDoc.setLockID(OID::gen()); + + getMockCatalog()->expectGrabLock( + [&testMutex, &lockSessionIDPassed](StringData lockID, + const OID& lockSessionIDArg, + StringData who, + StringData processId, + Date_t time, + StringData why) { stdx::unique_lock<stdx::mutex> lk(testMutex); lockSessionIDPassed.push_back(lockSessionIDArg); - }, retLockDoc); - - { - auto lockStatus = getMgr()->lock("test", "why", Milliseconds(0), Milliseconds(0)); - auto otherStatus = getMgr()->lock("lock", "why", Milliseconds(0), Milliseconds(0)); - } - - bool didTimeout = false; - { - stdx::unique_lock<stdx::mutex> lk(testMutex); - - if (unlockIDMap.size() < 2 || !mapEntriesGreaterThanTwo(unlockIDMap)) { - didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; - } - } - - // Join the background thread before trying to call asserts. Shutdown calls - // stopPing and we don't care in this test. - getMockCatalog()->expectStopPing([](StringData){}, Status::OK()); - getMgr()->shutDown(); + }, + retLockDoc); - // No assert until shutDown has been called to make sure that the background thread - // won't be trying to access the local variables that were captured by lamdas that - // may have gone out of scope when the assert unwinds the stack. - // No need to grab testMutex since there is only one thread running at this point. + { + auto lockStatus = getMgr()->lock("test", "why", Milliseconds(0), Milliseconds(0)); + auto otherStatus = getMgr()->lock("lock", "why", Milliseconds(0), Milliseconds(0)); + } - ASSERT_FALSE(didTimeout); - ASSERT_EQUALS(2u, lockSessionIDPassed.size()); + bool didTimeout = false; + { + stdx::unique_lock<stdx::mutex> lk(testMutex); - for (const auto& id : lockSessionIDPassed) { - ASSERT_GREATER_THAN(unlockIDMap[id], 2) - << "lockIDList: " << vectorToString(lockSessionIDPassed) - << ", map: " << mapToString(unlockIDMap); + if (unlockIDMap.size() < 2 || !mapEntriesGreaterThanTwo(unlockIDMap)) { + didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; } } - TEST_F(ReplSetDistLockManagerFixture, CleanupPingOnShutdown) { - bool stopPingCalled = false; - getMockCatalog()->expectStopPing([this, & stopPingCalled]( - StringData processID) { - ASSERT_EQUALS(getProcessID(), processID); - stopPingCalled = true; - }, Status::OK()); - - getMgr()->shutDown(); - ASSERT_TRUE(stopPingCalled); - } + // Join the background thread before trying to call asserts. Shutdown calls + // stopPing and we don't care in this test. + getMockCatalog()->expectStopPing([](StringData) {}, Status::OK()); + getMgr()->shutDown(); - TEST_F(ReplSetDistLockManagerFixture, CheckLockStatusOK) { - LocksType retLockDoc; - retLockDoc.setName("test"); - retLockDoc.setState(LocksType::LOCKED); - retLockDoc.setProcess(getProcessID()); - retLockDoc.setWho("me"); - retLockDoc.setWhy("why"); - // Will be different from the actual lock session id. For testing only. - retLockDoc.setLockID(OID::gen()); + // No assert until shutDown has been called to make sure that the background thread + // won't be trying to access the local variables that were captured by lamdas that + // may have gone out of scope when the assert unwinds the stack. + // No need to grab testMutex since there is only one thread running at this point. - OID lockSessionID; + ASSERT_FALSE(didTimeout); + ASSERT_EQUALS(2u, lockSessionIDPassed.size()); - getMockCatalog()->expectGrabLock([&lockSessionID] - (StringData, const OID& ts, StringData, StringData, Date_t, StringData) { + for (const auto& id : lockSessionIDPassed) { + ASSERT_GREATER_THAN(unlockIDMap[id], 2) + << "lockIDList: " << vectorToString(lockSessionIDPassed) + << ", map: " << mapToString(unlockIDMap); + } +} + +TEST_F(ReplSetDistLockManagerFixture, CleanupPingOnShutdown) { + bool stopPingCalled = false; + getMockCatalog()->expectStopPing([this, &stopPingCalled](StringData processID) { + ASSERT_EQUALS(getProcessID(), processID); + stopPingCalled = true; + }, Status::OK()); + + getMgr()->shutDown(); + ASSERT_TRUE(stopPingCalled); +} + +TEST_F(ReplSetDistLockManagerFixture, CheckLockStatusOK) { + LocksType retLockDoc; + retLockDoc.setName("test"); + retLockDoc.setState(LocksType::LOCKED); + retLockDoc.setProcess(getProcessID()); + retLockDoc.setWho("me"); + retLockDoc.setWhy("why"); + // Will be different from the actual lock session id. For testing only. + retLockDoc.setLockID(OID::gen()); + + OID lockSessionID; + + getMockCatalog()->expectGrabLock( + [&lockSessionID](StringData, const OID& ts, StringData, StringData, Date_t, StringData) { lockSessionID = ts; - }, retLockDoc); + }, + retLockDoc); - auto lockStatus = getMgr()->lock("a", "", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + auto lockStatus = getMgr()->lock("a", "", Milliseconds(0), Milliseconds(0)); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock([](const OID&) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [](const OID&) { // Don't care - }, Status::OK()); + }, + Status::OK()); - auto& scopedLock = lockStatus.getValue(); + auto& scopedLock = lockStatus.getValue(); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectGetLockByTS([&lockSessionID](const OID& ts) { - ASSERT_EQUALS(lockSessionID, ts); - }, retLockDoc); + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectGetLockByTS( + [&lockSessionID](const OID& ts) { ASSERT_EQUALS(lockSessionID, ts); }, retLockDoc); - ASSERT_OK(scopedLock.checkStatus()); - } + ASSERT_OK(scopedLock.checkStatus()); +} - TEST_F(ReplSetDistLockManagerFixture, CheckLockStatusNoLongerOwn) { - LocksType retLockDoc; - retLockDoc.setName("test"); - retLockDoc.setState(LocksType::LOCKED); - retLockDoc.setProcess(getProcessID()); - retLockDoc.setWho("me"); - retLockDoc.setWhy("why"); - // Will be different from the actual lock session id. For testing only. - retLockDoc.setLockID(OID::gen()); +TEST_F(ReplSetDistLockManagerFixture, CheckLockStatusNoLongerOwn) { + LocksType retLockDoc; + retLockDoc.setName("test"); + retLockDoc.setState(LocksType::LOCKED); + retLockDoc.setProcess(getProcessID()); + retLockDoc.setWho("me"); + retLockDoc.setWhy("why"); + // Will be different from the actual lock session id. For testing only. + retLockDoc.setLockID(OID::gen()); - OID lockSessionID; + OID lockSessionID; - getMockCatalog()->expectGrabLock([&lockSessionID] - (StringData, const OID& ts, StringData, StringData, Date_t, StringData) { + getMockCatalog()->expectGrabLock( + [&lockSessionID](StringData, const OID& ts, StringData, StringData, Date_t, StringData) { lockSessionID = ts; - }, retLockDoc); + }, + retLockDoc); - auto lockStatus = getMgr()->lock("a", "", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + auto lockStatus = getMgr()->lock("a", "", Milliseconds(0), Milliseconds(0)); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock([](const OID&) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [](const OID&) { // Don't care - }, Status::OK()); + }, + Status::OK()); - auto& scopedLock = lockStatus.getValue(); + auto& scopedLock = lockStatus.getValue(); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectGetLockByTS([&lockSessionID](const OID& ts) { - ASSERT_EQUALS(lockSessionID, ts); - }, {ErrorCodes::LockNotFound, "no lock"}); + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectGetLockByTS([&lockSessionID](const OID& ts) { + ASSERT_EQUALS(lockSessionID, ts); + }, {ErrorCodes::LockNotFound, "no lock"}); - ASSERT_NOT_OK(scopedLock.checkStatus()); - } + ASSERT_NOT_OK(scopedLock.checkStatus()); +} - TEST_F(ReplSetDistLockManagerFixture, CheckLockStatusError) { - LocksType retLockDoc; - retLockDoc.setName("test"); - retLockDoc.setState(LocksType::LOCKED); - retLockDoc.setProcess(getProcessID()); - retLockDoc.setWho("me"); - retLockDoc.setWhy("why"); - // Will be different from the actual lock session id. For testing only. - retLockDoc.setLockID(OID::gen()); +TEST_F(ReplSetDistLockManagerFixture, CheckLockStatusError) { + LocksType retLockDoc; + retLockDoc.setName("test"); + retLockDoc.setState(LocksType::LOCKED); + retLockDoc.setProcess(getProcessID()); + retLockDoc.setWho("me"); + retLockDoc.setWhy("why"); + // Will be different from the actual lock session id. For testing only. + retLockDoc.setLockID(OID::gen()); - OID lockSessionID; + OID lockSessionID; - getMockCatalog()->expectGrabLock([&lockSessionID] - (StringData, const OID& ts, StringData, StringData, Date_t, StringData) { + getMockCatalog()->expectGrabLock( + [&lockSessionID](StringData, const OID& ts, StringData, StringData, Date_t, StringData) { lockSessionID = ts; - }, retLockDoc); + }, + retLockDoc); - auto lockStatus = getMgr()->lock("a", "", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + auto lockStatus = getMgr()->lock("a", "", Milliseconds(0), Milliseconds(0)); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock([](const OID&) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [](const OID&) { // Don't care - }, Status::OK()); + }, + Status::OK()); - auto& scopedLock = lockStatus.getValue(); + auto& scopedLock = lockStatus.getValue(); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectGetLockByTS([&lockSessionID](const OID& ts) { - ASSERT_EQUALS(lockSessionID, ts); - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectGetLockByTS([&lockSessionID](const OID& ts) { + ASSERT_EQUALS(lockSessionID, ts); + }, {ErrorCodes::NetworkTimeout, "bad test network"}); - ASSERT_NOT_OK(scopedLock.checkStatus()); - } + ASSERT_NOT_OK(scopedLock.checkStatus()); +} - /** - * Test scenario: - * 1. Attempt to grab lock fails because lock is already owned. - * 2. Try to get ping data and config server clock. - * 3. Since we don't have previous ping data to compare with, we cannot - * decide whether it's ok to overtake, so we can't. - * 4. Lock expiration has elapsed and the ping has not been updated since. - * 5. 2nd attempt to grab lock still fails for the same reason. - * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. - */ - TEST_F(ReplSetDistLockManagerFixture, BasicLockOvertaking) { - OID lastTS; +/** + * Test scenario: + * 1. Attempt to grab lock fails because lock is already owned. + * 2. Try to get ping data and config server clock. + * 3. Since we don't have previous ping data to compare with, we cannot + * decide whether it's ok to overtake, so we can't. + * 4. Lock expiration has elapsed and the ping has not been updated since. + * 5. 2nd attempt to grab lock still fails for the same reason. + * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. + */ +TEST_F(ReplSetDistLockManagerFixture, BasicLockOvertaking) { + OID lastTS; - getMockCatalog()->expectGrabLock([&lastTS]( - StringData, const OID& lockSessionID, StringData, StringData, Date_t, StringData) { + getMockCatalog()->expectGrabLock( + [&lastTS]( + StringData, const OID& lockSessionID, StringData, StringData, Date_t, StringData) { lastTS = lockSessionID; - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t(), OID())); - - // First attempt will record the ping data. - { - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); + + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); - // Advance config server time to exceed lock expiration. - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration + Milliseconds(1), OID())); - - getMockCatalog()->expectOvertakeLock( - [this, &lastTS, ¤tLockDoc] - (StringData lockID, - const OID& lockSessionID, - const OID& currentHolderTS, - StringData who, - StringData processId, - Date_t time, - StringData why) { + getMockCatalog()->expectGetServerInfo([]() {}, DistLockCatalog::ServerInfo(Date_t(), OID())); + + // First attempt will record the ping data. + { + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); + } + + // Advance config server time to exceed lock expiration. + getMockCatalog()->expectGetServerInfo( + []() {}, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration + Milliseconds(1), OID())); + + getMockCatalog()->expectOvertakeLock( + [this, &lastTS, ¤tLockDoc](StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS("bar", lockID); ASSERT_EQUALS(lastTS, lockSessionID); ASSERT_EQUALS(currentLockDoc.getLockID(), currentHolderTS); ASSERT_EQUALS(getProcessID(), processId); ASSERT_EQUALS("foo", why); - }, currentLockDoc); // return arbitrary valid lock document, for testing purposes only. + }, + currentLockDoc); // return arbitrary valid lock document, for testing purposes only. - int unlockCallCount = 0; - OID unlockSessionIDPassed; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - // Second attempt should overtake lock. - { - auto lockStatus = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)); + // Second attempt should overtake lock. + { + auto lockStatus = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock( - [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { unlockCallCount++; unlockSessionIDPassed = lockSessionID; - }, Status::OK()); - } - - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lastTS, unlockSessionIDPassed); + }, + Status::OK()); } - TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfExpirationHasNotElapsed) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { - // Don't care. - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t(), OID())); - - // First attempt will record the ping data. - { - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lastTS, unlockSessionIDPassed); +} - // Advance config server time to 1 millisecond before lock expiration. - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration - Milliseconds(1), OID())); +TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfExpirationHasNotElapsed) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { + // Don't care. + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - // Second attempt should still not overtake lock. - { - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } - } + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); - TEST_F(ReplSetDistLockManagerFixture, GetPingErrorWhileOvertaking) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { - // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + getMockCatalog()->expectGetServerInfo([]() {}, DistLockCatalog::ServerInfo(Date_t(), OID())); + // First attempt will record the ping data. + { auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status.code()); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); } - TEST_F(ReplSetDistLockManagerFixture, GetInvalidPingDocumentWhileOvertaking) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { - // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - LockpingsType invalidPing; - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, invalidPing); + // Advance config server time to 1 millisecond before lock expiration. + getMockCatalog()->expectGetServerInfo( + []() {}, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration - Milliseconds(1), OID())); + // Second attempt should still not overtake lock. + { auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, status.code()); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); } +} - TEST_F(ReplSetDistLockManagerFixture, GetServerInfoErrorWhileOvertaking) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { +TEST_F(ReplSetDistLockManagerFixture, GetPingErrorWhileOvertaking) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + getMockCatalog()->expectGetPing([](StringData process) { + ASSERT_EQUALS("otherProcess", process); + }, {ErrorCodes::NetworkTimeout, "bad test network"}); + + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status.code()); +} + +TEST_F(ReplSetDistLockManagerFixture, GetInvalidPingDocumentWhileOvertaking) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { + // Don't care + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + LockpingsType invalidPing; + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, invalidPing); + + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, status.code()); +} + +TEST_F(ReplSetDistLockManagerFixture, GetServerInfoErrorWhileOvertaking) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { + // Don't care + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); + + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); + + getMockCatalog()->expectGetServerInfo([]() {}, + {ErrorCodes::NetworkTimeout, "bad test network"}); + + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status.code()); +} + +TEST_F(ReplSetDistLockManagerFixture, GetLockErrorWhileOvertaking) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { + // Don't care + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + {ErrorCodes::NetworkTimeout, "bad test network"}); - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status.code()); +} - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); +TEST_F(ReplSetDistLockManagerFixture, GetLockDisappearedWhileOvertaking) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { + // Don't care + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - getMockCatalog()->expectGetServerInfo([]() { - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + {ErrorCodes::LockNotFound, "disappeared!"}); - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status.code()); - } + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); +} - TEST_F(ReplSetDistLockManagerFixture, GetLockErrorWhileOvertaking) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { +/** + * 1. Try to grab lock multiple times. + * 2. For each attempt, the ping is updated and the config server clock is advanced + * by increments of lock expiration duration. + * 3. All of the previous attempt should result in lock busy. + * 4. Try to grab lock again when the ping was not updated and lock expiration has elapsed. + */ +TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfPingIsActive) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + Date_t currentPing; + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + + Date_t configServerLocalTime; + int getServerInfoCallCount = 0; + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + const int kLoopCount = 5; + for (int x = 0; x < kLoopCount; x++) { + // Advance config server time to reach lock expiration. + configServerLocalTime += kLockExpiration; - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status.code()); - } + currentPing += Milliseconds(1); + pingDoc.setPing(currentPing); - TEST_F(ReplSetDistLockManagerFixture, GetLockDisappearedWhileOvertaking) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { - // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, {ErrorCodes::LockNotFound, "disappeared!"}); + getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { + getServerInfoCallCount++; + }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); ASSERT_NOT_OK(status); ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); } - /** - * 1. Try to grab lock multiple times. - * 2. For each attempt, the ping is updated and the config server clock is advanced - * by increments of lock expiration duration. - * 3. All of the previous attempt should result in lock busy. - * 4. Try to grab lock again when the ping was not updated and lock expiration has elapsed. - */ - TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfPingIsActive) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { - // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - Date_t currentPing; - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - - Date_t configServerLocalTime; - int getServerInfoCallCount = 0; - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - const int kLoopCount = 5; - for (int x = 0; x < kLoopCount; x++) { - // Advance config server time to reach lock expiration. - configServerLocalTime += kLockExpiration; - - currentPing += Milliseconds(1); - pingDoc.setPing(currentPing); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { - getServerInfoCallCount++; - }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); - - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } - - ASSERT_EQUALS(kLoopCount, getServerInfoCallCount); - - configServerLocalTime += kLockExpiration; - getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { - getServerInfoCallCount++; - }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); - - OID lockTS; - // Make sure that overtake is now ok since ping is no longer updated. - getMockCatalog()->expectOvertakeLock( - [this, &lockTS, ¤tLockDoc] - (StringData lockID, - const OID& lockSessionID, - const OID& currentHolderTS, - StringData who, - StringData processId, - Date_t time, - StringData why) { + ASSERT_EQUALS(kLoopCount, getServerInfoCallCount); + + configServerLocalTime += kLockExpiration; + getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { + getServerInfoCallCount++; + }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); + + OID lockTS; + // Make sure that overtake is now ok since ping is no longer updated. + getMockCatalog()->expectOvertakeLock( + [this, &lockTS, ¤tLockDoc](StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS("bar", lockID); lockTS = lockSessionID; ASSERT_EQUALS(currentLockDoc.getLockID(), currentHolderTS); ASSERT_EQUALS(getProcessID(), processId); ASSERT_EQUALS("foo", why); - }, currentLockDoc); // return arbitrary valid lock document, for testing purposes only. + }, + currentLockDoc); // return arbitrary valid lock document, for testing purposes only. - int unlockCallCount = 0; - OID unlockSessionIDPassed; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - { - auto lockStatus = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)); + { + auto lockStatus = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock( - [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { unlockCallCount++; unlockSessionIDPassed = lockSessionID; - }, Status::OK()); - } - - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lockTS, unlockSessionIDPassed); + }, + Status::OK()); } - /** - * 1. Try to grab lock multiple times. - * 2. For each attempt, the owner of the lock is different and the config server clock is - * advanced by increments of lock expiration duration. - * 3. All of the previous attempt should result in lock busy. - * 4. Try to grab lock again when the ping was not updated and lock expiration has elapsed. - */ - TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfOwnerJustChanged) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lockTS, unlockSessionIDPassed); +} + +/** + * 1. Try to grab lock multiple times. + * 2. For each attempt, the owner of the lock is different and the config server clock is + * advanced by increments of lock expiration duration. + * 3. All of the previous attempt should result in lock busy. + * 4. Try to grab lock again when the ping was not updated and lock expiration has elapsed. + */ +TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfOwnerJustChanged) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - Date_t currentPing; - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - Date_t configServerLocalTime; - int getServerInfoCallCount = 0; - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - const int kLoopCount = 5; - for (int x = 0; x < kLoopCount; x++) { - // Advance config server time to reach lock expiration. - configServerLocalTime += kLockExpiration; - - currentLockDoc.setLockID(OID::gen()); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { - getServerInfoCallCount++; - }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); - - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + Date_t currentPing; + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); + + Date_t configServerLocalTime; + int getServerInfoCallCount = 0; + + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); + + const int kLoopCount = 5; + for (int x = 0; x < kLoopCount; x++) { + // Advance config server time to reach lock expiration. + configServerLocalTime += kLockExpiration; - ASSERT_EQUALS(kLoopCount, getServerInfoCallCount); + currentLockDoc.setLockID(OID::gen()); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); - configServerLocalTime += kLockExpiration; getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { getServerInfoCallCount++; }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); - OID lockTS; - // Make sure that overtake is now ok since lock owner didn't change. - getMockCatalog()->expectOvertakeLock( - [this, &lockTS, ¤tLockDoc] - (StringData lockID, - const OID& lockSessionID, - const OID& currentHolderTS, - StringData who, - StringData processId, - Date_t time, - StringData why) { + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); + } + + ASSERT_EQUALS(kLoopCount, getServerInfoCallCount); + + configServerLocalTime += kLockExpiration; + getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { + getServerInfoCallCount++; + }, DistLockCatalog::ServerInfo(configServerLocalTime, OID())); + + OID lockTS; + // Make sure that overtake is now ok since lock owner didn't change. + getMockCatalog()->expectOvertakeLock( + [this, &lockTS, ¤tLockDoc](StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS("bar", lockID); lockTS = lockSessionID; ASSERT_EQUALS(currentLockDoc.getLockID(), currentHolderTS); ASSERT_EQUALS(getProcessID(), processId); ASSERT_EQUALS("foo", why); - }, currentLockDoc); // return arbitrary valid lock document, for testing purposes only. + }, + currentLockDoc); // return arbitrary valid lock document, for testing purposes only. - int unlockCallCount = 0; - OID unlockSessionIDPassed; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - { - auto lockStatus = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)); + { + auto lockStatus = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock( - [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { unlockCallCount++; unlockSessionIDPassed = lockSessionID; - }, Status::OK()); - } - - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lockTS, unlockSessionIDPassed); + }, + Status::OK()); } - /** - * 1. Try to grab lock multiple times. - * 2. For each attempt, the electionId of the config server is different and the - * config server clock is advanced by increments of lock expiration duration. - * 3. All of the previous attempt should result in lock busy. - * 4. Try to grab lock again when the ping was not updated and lock expiration has elapsed. - */ - TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfElectionIdChanged) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lockTS, unlockSessionIDPassed); +} + +/** + * 1. Try to grab lock multiple times. + * 2. For each attempt, the electionId of the config server is different and the + * config server clock is advanced by increments of lock expiration duration. + * 3. All of the previous attempt should result in lock busy. + * 4. Try to grab lock again when the ping was not updated and lock expiration has elapsed. + */ +TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfElectionIdChanged) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - Date_t currentPing; - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - Date_t configServerLocalTime; - int getServerInfoCallCount = 0; - - const LocksType& fixedLockDoc = currentLockDoc; - const LockpingsType& fixedPingDoc = pingDoc; - - const int kLoopCount = 5; - OID lastElectionId; - for (int x = 0; x < kLoopCount; x++) { - // Advance config server time to reach lock expiration. - configServerLocalTime += kLockExpiration; - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, fixedLockDoc); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, fixedPingDoc); - - lastElectionId = OID::gen(); - getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { - getServerInfoCallCount++; - }, DistLockCatalog::ServerInfo(configServerLocalTime, lastElectionId)); - - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + Date_t currentPing; + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); + + Date_t configServerLocalTime; + int getServerInfoCallCount = 0; + + const LocksType& fixedLockDoc = currentLockDoc; + const LockpingsType& fixedPingDoc = pingDoc; + + const int kLoopCount = 5; + OID lastElectionId; + for (int x = 0; x < kLoopCount; x++) { + // Advance config server time to reach lock expiration. + configServerLocalTime += kLockExpiration; - ASSERT_EQUALS(kLoopCount, getServerInfoCallCount); + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + fixedLockDoc); - configServerLocalTime += kLockExpiration; + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, fixedPingDoc); + + lastElectionId = OID::gen(); getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { getServerInfoCallCount++; }, DistLockCatalog::ServerInfo(configServerLocalTime, lastElectionId)); - OID lockTS; - // Make sure that overtake is now ok since electionId didn't change. - getMockCatalog()->expectOvertakeLock( - [this, &lockTS, ¤tLockDoc] - (StringData lockID, - const OID& lockSessionID, - const OID& currentHolderTS, - StringData who, - StringData processId, - Date_t time, - StringData why) { + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); + } + + ASSERT_EQUALS(kLoopCount, getServerInfoCallCount); + + configServerLocalTime += kLockExpiration; + getMockCatalog()->expectGetServerInfo([&getServerInfoCallCount]() { + getServerInfoCallCount++; + }, DistLockCatalog::ServerInfo(configServerLocalTime, lastElectionId)); + + OID lockTS; + // Make sure that overtake is now ok since electionId didn't change. + getMockCatalog()->expectOvertakeLock( + [this, &lockTS, ¤tLockDoc](StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS("bar", lockID); lockTS = lockSessionID; ASSERT_EQUALS(currentLockDoc.getLockID(), currentHolderTS); ASSERT_EQUALS(getProcessID(), processId); ASSERT_EQUALS("foo", why); - }, currentLockDoc); // return arbitrary valid lock document, for testing purposes only. + }, + currentLockDoc); // return arbitrary valid lock document, for testing purposes only. - int unlockCallCount = 0; - OID unlockSessionIDPassed; + int unlockCallCount = 0; + OID unlockSessionIDPassed; - { - auto lockStatus = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)); + { + auto lockStatus = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)); - ASSERT_OK(lockStatus.getStatus()); + ASSERT_OK(lockStatus.getStatus()); - getMockCatalog()->expectNoGrabLock(); - getMockCatalog()->expectUnLock( - [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { + getMockCatalog()->expectNoGrabLock(); + getMockCatalog()->expectUnLock( + [&unlockCallCount, &unlockSessionIDPassed](const OID& lockSessionID) { unlockCallCount++; unlockSessionIDPassed = lockSessionID; - }, Status::OK()); - } - - ASSERT_EQUALS(1, unlockCallCount); - ASSERT_EQUALS(lockTS, unlockSessionIDPassed); + }, + Status::OK()); } - /** - * Test scenario: - * 1. Attempt to grab lock fails because lock is already owned. - * 2. Try to get ping data and config server clock. - * 3. Since we don't have previous ping data to compare with, we cannot - * decide whether it's ok to overtake, so we can't. - * 4. Lock expiration has elapsed and the ping has not been updated since. - * 5. 2nd attempt to grab lock still fails for the same reason. - * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. - * 7. Attempt to overtake resulted in an error. - * 8. Check that unlock was called. - */ - TEST_F(ReplSetDistLockManagerFixture, LockOvertakingResultsInError) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { + ASSERT_EQUALS(1, unlockCallCount); + ASSERT_EQUALS(lockTS, unlockSessionIDPassed); +} + +/** + * Test scenario: + * 1. Attempt to grab lock fails because lock is already owned. + * 2. Try to get ping data and config server clock. + * 3. Since we don't have previous ping data to compare with, we cannot + * decide whether it's ok to overtake, so we can't. + * 4. Lock expiration has elapsed and the ping has not been updated since. + * 5. 2nd attempt to grab lock still fails for the same reason. + * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. + * 7. Attempt to overtake resulted in an error. + * 8. Check that unlock was called. + */ +TEST_F(ReplSetDistLockManagerFixture, LockOvertakingResultsInError) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t(), OID())); - - // First attempt will record the ping data. - { - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); + + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); - // Advance config server time to exceed lock expiration. - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration + Milliseconds(1), OID())); - - OID lastTS; - getMockCatalog()->expectOvertakeLock( - [this, &lastTS, ¤tLockDoc] - (StringData lockID, - const OID& lockSessionID, - const OID& currentHolderTS, - StringData who, - StringData processId, - Date_t time, - StringData why) { + getMockCatalog()->expectGetServerInfo([]() {}, DistLockCatalog::ServerInfo(Date_t(), OID())); + + // First attempt will record the ping data. + { + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); + } + + // Advance config server time to exceed lock expiration. + getMockCatalog()->expectGetServerInfo( + []() {}, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration + Milliseconds(1), OID())); + + OID lastTS; + getMockCatalog()->expectOvertakeLock( + [this, &lastTS, ¤tLockDoc](StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS("bar", lockID); lastTS = lockSessionID; ASSERT_EQUALS(currentLockDoc.getLockID(), currentHolderTS); ASSERT_EQUALS(getProcessID(), processId); ASSERT_EQUALS("foo", why); - }, {ErrorCodes::NetworkTimeout, "bad test network"}); + }, + {ErrorCodes::NetworkTimeout, "bad test network"}); - OID unlockSessionIDPassed; + OID unlockSessionIDPassed; - stdx::mutex unlockMutex; - stdx::condition_variable unlockCV; - getMockCatalog()->expectUnLock( - [&unlockSessionIDPassed, &unlockMutex, &unlockCV]( - const OID& lockSessionID) { + stdx::mutex unlockMutex; + stdx::condition_variable unlockCV; + getMockCatalog()->expectUnLock( + [&unlockSessionIDPassed, &unlockMutex, &unlockCV](const OID& lockSessionID) { stdx::unique_lock<stdx::mutex> lk(unlockMutex); unlockSessionIDPassed = lockSessionID; unlockCV.notify_all(); - }, Status::OK()); + }, + Status::OK()); - // Second attempt should overtake lock. - auto lockStatus = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)); + // Second attempt should overtake lock. + auto lockStatus = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)); - ASSERT_NOT_OK(lockStatus.getStatus()); + ASSERT_NOT_OK(lockStatus.getStatus()); - bool didTimeout = false; - { - stdx::unique_lock<stdx::mutex> lk(unlockMutex); - if (!unlockSessionIDPassed.isSet()) { - didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; - } + bool didTimeout = false; + { + stdx::unique_lock<stdx::mutex> lk(unlockMutex); + if (!unlockSessionIDPassed.isSet()) { + didTimeout = unlockCV.wait_for(lk, kUnlockTimeout) == stdx::cv_status::timeout; } + } - // Join the background thread before trying to call asserts. Shutdown calls - // stopPing and we don't care in this test. - getMockCatalog()->expectStopPing([](StringData){}, Status::OK()); - getMgr()->shutDown(); + // Join the background thread before trying to call asserts. Shutdown calls + // stopPing and we don't care in this test. + getMockCatalog()->expectStopPing([](StringData) {}, Status::OK()); + getMgr()->shutDown(); - // No assert until shutDown has been called to make sure that the background thread - // won't be trying to access the local variables that were captured by lamdas that - // may have gone out of scope when the assert unwinds the stack. - // No need to grab testMutex since there is only one thread running at this point. + // No assert until shutDown has been called to make sure that the background thread + // won't be trying to access the local variables that were captured by lamdas that + // may have gone out of scope when the assert unwinds the stack. + // No need to grab testMutex since there is only one thread running at this point. - ASSERT_FALSE(didTimeout); - ASSERT_EQUALS(lastTS, unlockSessionIDPassed); - } + ASSERT_FALSE(didTimeout); + ASSERT_EQUALS(lastTS, unlockSessionIDPassed); +} - /** - * Test scenario: - * 1. Attempt to grab lock fails because lock is already owned. - * 2. Try to get ping data and config server clock. - * 3. Since we don't have previous ping data to compare with, we cannot - * decide whether it's ok to overtake, so we can't. - * 4. Lock expiration has elapsed and the ping has not been updated since. - * 5. 2nd attempt to grab lock still fails for the same reason. - * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. - * 7. Attempt to overtake resulted failed because someone beat us into it. - */ - TEST_F(ReplSetDistLockManagerFixture, LockOvertakingFailed) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { +/** + * Test scenario: + * 1. Attempt to grab lock fails because lock is already owned. + * 2. Try to get ping data and config server clock. + * 3. Since we don't have previous ping data to compare with, we cannot + * decide whether it's ok to overtake, so we can't. + * 4. Lock expiration has elapsed and the ping has not been updated since. + * 5. 2nd attempt to grab lock still fails for the same reason. + * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. + * 7. Attempt to overtake resulted failed because someone beat us into it. + */ +TEST_F(ReplSetDistLockManagerFixture, LockOvertakingFailed) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t(), OID())); - - // First attempt will record the ping data. - { - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); + + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); - // Advance config server time to exceed lock expiration. - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration + Milliseconds(1), OID())); - - // Second attempt should overtake lock. - getMockCatalog()->expectOvertakeLock([this, ¤tLockDoc] - (StringData lockID, - const OID& lockSessionID, - const OID& currentHolderTS, - StringData who, - StringData processId, - Date_t time, - StringData why) { + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); + + getMockCatalog()->expectGetServerInfo([]() {}, DistLockCatalog::ServerInfo(Date_t(), OID())); + + // First attempt will record the ping data. + { + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); + } + + // Advance config server time to exceed lock expiration. + getMockCatalog()->expectGetServerInfo( + []() {}, DistLockCatalog::ServerInfo(Date_t() + kLockExpiration + Milliseconds(1), OID())); + + // Second attempt should overtake lock. + getMockCatalog()->expectOvertakeLock( + [this, ¤tLockDoc](StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { ASSERT_EQUALS("bar", lockID); ASSERT_EQUALS(currentLockDoc.getLockID(), currentHolderTS); ASSERT_EQUALS(getProcessID(), processId); ASSERT_EQUALS("foo", why); - }, {ErrorCodes::LockStateChangeFailed, "nmod 0"}); - - { - auto status = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nmod 0"}); + + { + auto status = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); } +} - /** - * Test scenario: - * 1. Attempt to grab lock fails because lock is already owned. - * 2. Try to get ping data and config server clock. - * 3. Since we don't have previous ping data to compare with, we cannot - * decide whether it's ok to overtake, so we can't. - * 4. Lock expiration has elapsed and the ping has not been updated since. - * 5. 2nd attempt to grab lock still fails for the same reason. - * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. - * 7. Attempt to overtake resulted failed because someone beat us into it. - */ - TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfConfigServerClockGoesBackwards) { - getMockCatalog()->expectGrabLock([]( - StringData, const OID&, StringData, StringData, Date_t, StringData) { +/** + * Test scenario: + * 1. Attempt to grab lock fails because lock is already owned. + * 2. Try to get ping data and config server clock. + * 3. Since we don't have previous ping data to compare with, we cannot + * decide whether it's ok to overtake, so we can't. + * 4. Lock expiration has elapsed and the ping has not been updated since. + * 5. 2nd attempt to grab lock still fails for the same reason. + * 6. But since the ping is not fresh anymore, dist lock manager should overtake lock. + * 7. Attempt to overtake resulted failed because someone beat us into it. + */ +TEST_F(ReplSetDistLockManagerFixture, CannotOvertakeIfConfigServerClockGoesBackwards) { + getMockCatalog()->expectGrabLock( + [](StringData, const OID&, StringData, StringData, Date_t, StringData) { // Don't care - }, {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - - LocksType currentLockDoc; - currentLockDoc.setName("bar"); - currentLockDoc.setState(LocksType::LOCKED); - currentLockDoc.setProcess("otherProcess"); - currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); - currentLockDoc.setWho("me"); - currentLockDoc.setWhy("why"); - - getMockCatalog()->expectGetLockByName([](StringData name) { - ASSERT_EQUALS("bar", name); - }, currentLockDoc); - - LockpingsType pingDoc; - pingDoc.setProcess("otherProcess"); - pingDoc.setPing(Date_t()); - - getMockCatalog()->expectGetPing([](StringData process) { - ASSERT_EQUALS("otherProcess", process); - }, pingDoc); - - Date_t configClock(Date_t::now()); - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(configClock, OID())); - - // First attempt will record the ping data. - { - auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + }, + {ErrorCodes::LockStateChangeFailed, "nMod 0"}); - // Make config server time go backwards by lock expiration duration. - getMockCatalog()->expectGetServerInfo([]() { - }, DistLockCatalog::ServerInfo(configClock - kLockExpiration - Milliseconds(1), OID())); - - // Second attempt should not overtake lock. - { - auto status = getMgr()->lock("bar", - "foo", - Milliseconds(0), - Milliseconds(0)).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); - } + LocksType currentLockDoc; + currentLockDoc.setName("bar"); + currentLockDoc.setState(LocksType::LOCKED); + currentLockDoc.setProcess("otherProcess"); + currentLockDoc.setLockID(OID("5572007fda9e476582bf3716")); + currentLockDoc.setWho("me"); + currentLockDoc.setWhy("why"); + + getMockCatalog()->expectGetLockByName([](StringData name) { ASSERT_EQUALS("bar", name); }, + currentLockDoc); + + LockpingsType pingDoc; + pingDoc.setProcess("otherProcess"); + pingDoc.setPing(Date_t()); + + getMockCatalog()->expectGetPing( + [](StringData process) { ASSERT_EQUALS("otherProcess", process); }, pingDoc); + + Date_t configClock(Date_t::now()); + getMockCatalog()->expectGetServerInfo([]() {}, DistLockCatalog::ServerInfo(configClock, OID())); + + // First attempt will record the ping data. + { + auto status = getMgr()->lock("bar", "", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); + } + + // Make config server time go backwards by lock expiration duration. + getMockCatalog()->expectGetServerInfo([]() { + }, DistLockCatalog::ServerInfo(configClock - kLockExpiration - Milliseconds(1), OID())); + + // Second attempt should not overtake lock. + { + auto status = getMgr()->lock("bar", "foo", Milliseconds(0), Milliseconds(0)).getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::LockBusy, status.code()); } +} -} // unnamed namespace -} // namespace mongo +} // unnamed namespace +} // namespace mongo |