| author | Spencer T Brody <spencer@mongodb.com> | 2015-08-19 17:14:33 -0400 |
|---|---|---|
| committer | Spencer T Brody <spencer@mongodb.com> | 2015-08-25 12:56:28 -0400 |
| commit | f415aad16ec26a89110a71232dc898218dc5d85c (patch) | |
| tree | 14cae84b235c52981d8d661b5219d02aab193cf0 | |
| parent | eb0430ee860d22b164cd603ce7186842f72c8537 (diff) | |
| download | mongo-f415aad16ec26a89110a71232dc898218dc5d85c.tar.gz | |
SERVER-19875 Add OperationContext to CatalogManager::getAllShards
69 files changed, 469 insertions, 397 deletions
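Before the per-file hunks, a sketch of the recurring pattern may help. The commit threads an `OperationContext` through `CatalogManager::getAllShards()`, `ShardRegistry::getShard()`/`reload()`, and their callers, so every config-server and shard access runs on behalf of an explicit operation. A minimal sketch of the signature change, using simplified stand-in types rather than MongoDB's real declarations:

```cpp
// Illustrative sketch only -- simplified stand-ins, not MongoDB's classes.
#include <vector>

class OperationContext;  // per-operation state threaded through call chains
struct ShardType {};
struct Status {};

class CatalogManager {
public:
    virtual ~CatalogManager() = default;

    // Before: virtual Status getAllShards(std::vector<ShardType>* shards) = 0;
    // After: the caller's OperationContext is passed in so the implementation
    // can hand it on to the shard registry and remote-command targeters.
    virtual Status getAllShards(OperationContext* txn,
                                std::vector<ShardType>* shards) = 0;
};
```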
```diff
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp
index f35e4fe2662..169a233ab5e 100644
--- a/src/mongo/client/parallel.cpp
+++ b/src/mongo/client/parallel.cpp
@@ -457,43 +457,6 @@ BSONObj ParallelConnectionMetadata::toBSON() const {
                       << "init" << initialized << "finish" << finished << "errored" << errored);
 }
 
-BSONObj ParallelSortClusteredCursor::toBSON() const {
-    BSONObjBuilder b;
-
-    b.append("tries", _totalTries);
-
-    {
-        BSONObjBuilder bb;
-        for (map<ShardId, PCMData>::const_iterator i = _cursorMap.begin(), end = _cursorMap.end();
-             i != end;
-             ++i) {
-            const auto shard = grid.shardRegistry()->getShard(i->first);
-            if (!shard) {
-                continue;
-            }
-
-            bb.append(shard->toString(), i->second.toBSON());
-        }
-        b.append("cursors", bb.obj().getOwned());
-    }
-
-    {
-        BSONObjBuilder bb;
-        for (map<string, int>::const_iterator i = _staleNSMap.begin(), end = _staleNSMap.end();
-             i != end;
-             ++i) {
-            bb.append(i->first, i->second);
-        }
-        b.append("staleTries", bb.obj().getOwned());
-    }
-
-    return b.obj().getOwned();
-}
-
-string ParallelSortClusteredCursor::toString() const {
-    return str::stream() << "PCursor : " << toBSON();
-}
-
 void ParallelSortClusteredCursor::fullInit(OperationContext* txn) {
     startInit(txn);
     finishInit(txn);
@@ -546,7 +509,8 @@ void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn,
     }
 }
 
-void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(PCStatePtr state,
+void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(OperationContext* txn,
+                                                               PCStatePtr state,
                                                                const ShardId& shardId,
                                                                std::shared_ptr<Shard> primary,
                                                                const NamespaceString& ns,
@@ -562,7 +526,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(PCStatePtr state,
 
     // Setup conn
     if (!state->conn) {
-        const auto shard = grid.shardRegistry()->getShard(shardId);
+        const auto shard = grid.shardRegistry()->getShard(txn, shardId);
         state->conn.reset(new ShardConnection(shard->getConnString(), ns.ns(), manager));
     }
 
@@ -653,7 +617,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
         auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
         if (status.getStatus().code() != ErrorCodes::DatabaseNotFound) {
             config = uassertStatusOK(status);
-            config->getChunkManagerOrPrimary(nss.ns(), manager, primary);
+            config->getChunkManagerOrPrimary(txn, nss.ns(), manager, primary);
         }
     }
 
@@ -735,7 +699,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
             mdata.pcState.reset(new PCState());
             PCStatePtr state = mdata.pcState;
 
-            setupVersionAndHandleSlaveOk(state, shardId, primary, nss, vinfo, manager);
+            setupVersionAndHandleSlaveOk(txn, state, shardId, primary, nss, vinfo, manager);
 
             const string& ns = _qSpec.ns();
 
@@ -1127,7 +1091,7 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) {
         _cursors[index].reset(mdata.pcState->cursor.get(), &mdata);
 
         {
-            const auto shard = grid.shardRegistry()->getShard(i->first);
+            const auto shard = grid.shardRegistry()->getShard(txn, i->first);
             _servers.insert(shard->getConnString().toString());
         }
 
@@ -1153,8 +1117,8 @@ int ParallelSortClusteredCursor::getNumQueryShards() {
     return _cursorMap.size();
 }
 
-std::shared_ptr<Shard> ParallelSortClusteredCursor::getQueryShard() {
-    return grid.shardRegistry()->getShard(_cursorMap.begin()->first);
+ShardId ParallelSortClusteredCursor::getQueryShardId() {
+    return _cursorMap.begin()->first;
 }
 
 void ParallelSortClusteredCursor::getQueryShardIds(set<ShardId>& shardIds) {
diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h
index bfb6c115123..ac507d91eca 100644
--- a/src/mongo/client/parallel.h
+++ b/src/mongo/client/parallel.h
@@ -194,7 +194,7 @@ public:
      * Returns the single shard with an open cursor.
      * It is an error to call this if getNumQueryShards() > 1
      */
-    std::shared_ptr<Shard> getQueryShard();
+    ShardId getQueryShardId();
 
     /**
      * Returns primary shard with an open cursor.
@@ -204,9 +204,6 @@ public:
 
     DBClientCursorPtr getShardCursor(const ShardId& shardId);
 
-    BSONObj toBSON() const;
-    std::string toString() const;
-
     void explain(BSONObjBuilder& b);
 
 private:
@@ -251,7 +248,8 @@ private:
      * set connection and the primary cannot be reached, the version
     * will not be set if the slaveOk flag is set.
     */
-    void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */,
+    void setupVersionAndHandleSlaveOk(OperationContext* txn,
+                                      PCStatePtr state /* in & out */,
                                       const ShardId& shardId,
                                       std::shared_ptr<Shard> primary /* in */,
                                       const NamespaceString& ns,
diff --git a/src/mongo/db/s/metadata_loader.cpp b/src/mongo/db/s/metadata_loader.cpp
index b6a5aee155e..9a0823fb0b3 100644
--- a/src/mongo/db/s/metadata_loader.cpp
+++ b/src/mongo/db/s/metadata_loader.cpp
@@ -66,11 +66,11 @@ public:
         return chunk.getShard() == _currShard;
     }
 
-    virtual pair<BSONObj, BSONObj> rangeFor(const ChunkType& chunk) const {
+    virtual pair<BSONObj, BSONObj> rangeFor(OperationContext* txn, const ChunkType& chunk) const {
         return make_pair(chunk.getMin(), chunk.getMax());
     }
 
-    virtual string shardFor(const string& name) const {
+    virtual string shardFor(OperationContext* txn, const string& name) const {
         return name;
     }
 
@@ -193,7 +193,7 @@ Status MetadataLoader::initChunks(OperationContext* txn,
         // last time). If not, something has changed on the config server (potentially between
         // when we read the collection data and when we read the chunks data).
         //
-        int diffsApplied = differ.calculateConfigDiff(chunks);
+        int diffsApplied = differ.calculateConfigDiff(txn, chunks);
         if (diffsApplied > 0) {
             // Chunks found, return ok
             LOG(2) << "loaded " << diffsApplied << " chunks into new metadata for " << ns
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 8de255b7b2d..7a1662cc32b 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -59,6 +59,7 @@ env.CppUnitTest(
     ],
     LIBDEPS=[
         'common',
+        '$BUILD_DIR/mongo/db/service_context',
     ]
 )
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index d9234b7f116..5971e9b04cf 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -213,7 +213,7 @@ void Balancer::_ping(OperationContext* txn, bool waiting) {
                                         NULL);
 }
 
-bool Balancer::_checkOIDs() {
+bool Balancer::_checkOIDs(OperationContext* txn) {
     vector<ShardId> all;
     grid.shardRegistry()->getAllShardIds(&all);
 
@@ -221,7 +221,7 @@ bool Balancer::_checkOIDs() {
     map<int, string> oids;
 
     for (const ShardId& shardId : all) {
-        const auto s = grid.shardRegistry()->getShard(shardId);
+        const auto s = grid.shardRegistry()->getShard(txn, shardId);
         if (!s) {
             continue;
         }
@@ -242,7 +242,7 @@ bool Balancer::_checkOIDs() {
             uassertStatusOK(grid.shardRegistry()->runCommand(
                 shardHost, "admin", BSON("features" << 1 << "oidReset" << 1)));
 
-            const auto otherShard = grid.shardRegistry()->getShard(oids[x]);
+            const auto otherShard = grid.shardRegistry()->getShard(txn, oids[x]);
             if (otherShard) {
                 const auto otherShardHost = uassertStatusOK(otherShard->getTargeter()->findHost(
                     {ReadPreference::PrimaryOnly, TagSet::primaryOnly()}));
@@ -439,7 +439,7 @@ void Balancer::_doBalanceRound(OperationContext* txn,
         }
     }
 
-bool Balancer::_init() {
+bool Balancer::_init(OperationContext* txn) {
     try {
         log() << "about to contact config servers and shards";
 
@@ -447,8 +447,8 @@ bool Balancer::_init() {
         // checks that each shard is indeed a different process (no hostname mixup)
         // these checks are redundant in that they're redone at every new round but we want to do
         // them initially here so to catch any problem soon
-        grid.shardRegistry()->reload();
-        _checkOIDs();
+        grid.shardRegistry()->reload(txn);
+        _checkOIDs(txn);
 
         log() << "config servers and shards contacted successfully";
 
@@ -474,7 +474,8 @@ void Balancer::run() {
     // This is the body of a BackgroundJob so if we throw here we're basically ending the balancer
     // thread prematurely.
     while (!inShutdown()) {
-        if (!_init()) {
+        auto txn = cc().makeOperationContext();
+        if (!_init(txn.get())) {
             log() << "will retry to initialize balancer in one minute";
             sleepsecs(60);
             continue;
         }
@@ -501,7 +502,7 @@ void Balancer::run() {
             BSONObj balancerResult;
 
             // use fresh shard state
-            grid.shardRegistry()->reload();
+            grid.shardRegistry()->reload(txn.get());
 
             // refresh chunk size (even though another balancer might be active)
             Chunk::refreshChunkSize(txn.get());
@@ -529,7 +530,7 @@ void Balancer::run() {
                 continue;
             }
 
-            uassert(13258, "oids broken after resetting!", _checkOIDs());
+            uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
 
             {
                 auto scopedDistLock = grid.catalogManager(txn.get())
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index 27b9d217640..861508c96e3 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -83,7 +83,7 @@ private:
      *
     * This method throws on a network exception
     */
-    bool _init();
+    bool _init(OperationContext* txn);
 
     /**
      * Gathers all the necessary information about shards and chunks, and decides whether there are
@@ -118,7 +118,7 @@ private:
     * @return true if all the servers listed in configdb as being shards are reachable and are
     * distinct processes
     */
-    bool _checkOIDs();
+    bool _checkOIDs(OperationContext* txn);
 };
 
 extern Balancer balancer;
diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp
index 55aa205dfd7..1b81d8b3ec5 100644
--- a/src/mongo/s/balancer_policy.cpp
+++ b/src/mongo/s/balancer_policy.cpp
@@ -66,8 +66,10 @@ namespace {
  * ShardNotFound if shard by that id is not available on the registry
  * NoSuchKey if the version could not be retrieved
  */
-std::string retrieveShardMongoDVersion(ShardId shardId, ShardRegistry* shardRegistry) {
-    auto shard = shardRegistry->getShard(shardId);
+std::string retrieveShardMongoDVersion(OperationContext* txn,
+                                       ShardId shardId,
+                                       ShardRegistry* shardRegistry) {
+    auto shard = shardRegistry->getShard(txn, shardId);
     if (!shard) {
         uassertStatusOK({ErrorCodes::ShardNotFound, "Shard not found"});
     }
@@ -276,7 +278,7 @@ void DistributionStatus::dump() const {
 Status DistributionStatus::populateShardInfoMap(OperationContext* txn, ShardInfoMap* shardInfo) {
     try {
         vector<ShardType> shards;
-        Status status = grid.catalogManager(txn)->getAllShards(&shards);
+        Status status = grid.catalogManager(txn)->getAllShards(txn, &shards);
         if (!status.isOK()) {
             return status;
         }
@@ -285,10 +287,10 @@ Status DistributionStatus::populateShardInfoMap(OperationContext* txn, ShardInfo
             std::set<std::string> dummy;
 
             const long long shardSizeBytes = uassertStatusOK(
-                shardutil::retrieveTotalShardSize(shardData.getName(), grid.shardRegistry()));
+                shardutil::retrieveTotalShardSize(txn, shardData.getName(), grid.shardRegistry()));
 
             const std::string shardMongodVersion =
-                retrieveShardMongoDVersion(shardData.getName(), grid.shardRegistry());
+                retrieveShardMongoDVersion(txn, shardData.getName(), grid.shardRegistry());
 
             ShardInfo newShardEntry(shardData.getMaxSizeMB(),
                                     shardSizeBytes / 1024 / 1024,
```
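One hunk above deserves a note: `Balancer::run()` is the body of a `BackgroundJob` thread, not a client operation, so once `_init()` and `_checkOIDs()` require an `OperationContext` it has to manufacture one per round via `cc().makeOperationContext()`. A compilable sketch of that pattern, with hypothetical stand-ins for the mongo types:

```cpp
// Sketch of the background-thread pattern from Balancer::run() above.
// The types below are stand-ins so this compiles on its own; in MongoDB
// they are mongo::OperationContext, mongo::Client, and mongo::cc().
#include <memory>

struct OperationContext {};

struct Client {
    std::unique_ptr<OperationContext> makeOperationContext() {
        return std::make_unique<OperationContext>();
    }
};

Client& cc() {  // MongoDB: returns the current thread's Client
    static thread_local Client client;
    return client;
}

bool initBalancer(OperationContext*) {
    return true;  // placeholder for Balancer::_init(txn)
}

int main() {
    // A background thread has no incoming operation, so it creates its own
    // OperationContext each round and threads it down the call chain.
    auto txn = cc().makeOperationContext();
    return initBalancer(txn.get()) ? 0 : 1;
}
```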
```diff
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 8548368ed49..2241de16392 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -286,7 +286,7 @@ public:
      * Retrieves all shards in this sharded cluster.
      * Returns a !OK status if an error occurs.
      */
-    virtual Status getAllShards(std::vector<ShardType>* shards) = 0;
+    virtual Status getAllShards(OperationContext* txn, std::vector<ShardType>* shards) = 0;
 
     /**
      * Runs a user management command on the config servers, potentially synchronizing through
diff --git a/src/mongo/s/catalog/catalog_manager_common.cpp b/src/mongo/s/catalog/catalog_manager_common.cpp
index 2d82b767c12..ebdff4f5d1d 100644
--- a/src/mongo/s/catalog/catalog_manager_common.cpp
+++ b/src/mongo/s/catalog/catalog_manager_common.cpp
@@ -330,7 +330,7 @@ StatusWith<string> CatalogManagerCommon::addShard(OperationContext* txn,
 
     // If a name for a shard wasn't provided, generate one
     if (shardType.getName().empty()) {
-        StatusWith<string> result = _generateNewShardName();
+        StatusWith<string> result = _generateNewShardName(txn);
         if (!result.isOK()) {
             return Status(ErrorCodes::OperationFailed, "error generating new shard name");
         }
@@ -351,7 +351,7 @@ StatusWith<string> CatalogManagerCommon::addShard(OperationContext* txn,
     }
 
     // Make sure the new shard is visible
-    grid.shardRegistry()->reload();
+    grid.shardRegistry()->reload(txn);
 
     // Add all databases which were discovered on the new shard
     for (const string& dbName : dbNamesStatus.getValue()) {
@@ -437,13 +437,13 @@ Status CatalogManagerCommon::createDatabase(OperationContext* txn, const std::st
     }
 
     // check for case sensitivity violations
-    Status status = _checkDbDoesNotExist(dbName, nullptr);
+    Status status = _checkDbDoesNotExist(txn, dbName, nullptr);
     if (!status.isOK()) {
         return status;
     }
 
     // Database does not exist, pick a shard and create a new entry
-    auto newShardIdStatus = selectShardForNewDatabase(grid.shardRegistry());
+    auto newShardIdStatus = selectShardForNewDatabase(txn, grid.shardRegistry());
     if (!newShardIdStatus.isOK()) {
         return newShardIdStatus.getStatus();
     }
@@ -468,12 +468,13 @@ Status CatalogManagerCommon::createDatabase(OperationContext* txn, const std::st
 }
 
 // static
-StatusWith<ShardId> CatalogManagerCommon::selectShardForNewDatabase(ShardRegistry* shardRegistry) {
+StatusWith<ShardId> CatalogManagerCommon::selectShardForNewDatabase(OperationContext* txn,
+                                                                    ShardRegistry* shardRegistry) {
     vector<ShardId> allShardIds;
 
     shardRegistry->getAllShardIds(&allShardIds);
     if (allShardIds.empty()) {
-        shardRegistry->reload();
+        shardRegistry->reload(txn);
         shardRegistry->getAllShardIds(&allShardIds);
 
         if (allShardIds.empty()) {
@@ -483,7 +484,8 @@ StatusWith<ShardId> CatalogManagerCommon::selectShardForNewDatabase(ShardRegistr
 
     ShardId candidateShardId = allShardIds[0];
 
-    auto candidateSizeStatus = shardutil::retrieveTotalShardSize(candidateShardId, shardRegistry);
+    auto candidateSizeStatus =
+        shardutil::retrieveTotalShardSize(txn, candidateShardId, shardRegistry);
     if (!candidateSizeStatus.isOK()) {
         return candidateSizeStatus.getStatus();
     }
@@ -491,7 +493,7 @@ StatusWith<ShardId> CatalogManagerCommon::selectShardForNewDatabase(ShardRegistr
     for (size_t i = 1; i < allShardIds.size(); i++) {
         const ShardId shardId = allShardIds[i];
 
-        const auto sizeStatus = shardutil::retrieveTotalShardSize(shardId, shardRegistry);
+        const auto sizeStatus = shardutil::retrieveTotalShardSize(txn, shardId, shardRegistry);
         if (!sizeStatus.isOK()) {
             return sizeStatus.getStatus();
         }
@@ -519,10 +521,10 @@ Status CatalogManagerCommon::enableSharding(OperationContext* txn, const std::st
     }
 
     // Check for case sensitivity violations
-    Status status = _checkDbDoesNotExist(dbName, &db);
+    Status status = _checkDbDoesNotExist(txn, dbName, &db);
     if (status.isOK()) {
         // Database does not exist, create a new entry
-        auto newShardIdStatus = selectShardForNewDatabase(grid.shardRegistry());
+        auto newShardIdStatus = selectShardForNewDatabase(txn, grid.shardRegistry());
         if (!newShardIdStatus.isOK()) {
             return newShardIdStatus.getStatus();
         }
diff --git a/src/mongo/s/catalog/catalog_manager_common.h b/src/mongo/s/catalog/catalog_manager_common.h
index 42bd0a00f33..3c82e2684fd 100644
--- a/src/mongo/s/catalog/catalog_manager_common.h
+++ b/src/mongo/s/catalog/catalog_manager_common.h
@@ -70,7 +70,8 @@ protected:
      * Selects an optimal shard on which to place a newly created database from the set of
      * available shards. Will return ShardNotFound if shard could not be found.
      */
-    static StatusWith<ShardId> selectShardForNewDatabase(ShardRegistry* shardRegistry);
+    static StatusWith<ShardId> selectShardForNewDatabase(OperationContext* txn,
+                                                         ShardRegistry* shardRegistry);
 
     CatalogManagerCommon() = default;
 
@@ -85,12 +86,14 @@ private:
      * NamespaceExists if it exists with the same casing
      * DatabaseDifferCase if it exists under different casing.
      */
-    virtual Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) = 0;
+    virtual Status _checkDbDoesNotExist(OperationContext* txn,
+                                        const std::string& dbName,
+                                        DatabaseType* db) = 0;
 
     /**
      * Generates a unique name to be given to a newly added shard.
      */
-    virtual StatusWith<std::string> _generateNewShardName() = 0;
+    virtual StatusWith<std::string> _generateNewShardName(OperationContext* txn) = 0;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index ee4bba4f8ff..160cdb35e62 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -132,7 +132,7 @@ StatusWith<string> CatalogManagerMock::getTagForChunk(OperationContext* txn,
     return string();
 }
 
-Status CatalogManagerMock::getAllShards(vector<ShardType>* shards) {
+Status CatalogManagerMock::getAllShards(OperationContext* txn, vector<ShardType>* shards) {
     return Status::OK();
 }
 
@@ -185,11 +185,13 @@ DistLockManager* CatalogManagerMock::getDistLockManager() {
     return _mockDistLockMgr.get();
 }
 
-Status CatalogManagerMock::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) {
+Status CatalogManagerMock::_checkDbDoesNotExist(OperationContext* txn,
+                                                const std::string& dbName,
+                                                DatabaseType* db) {
     return Status::OK();
 }
 
-StatusWith<std::string> CatalogManagerMock::_generateNewShardName() {
+StatusWith<std::string> CatalogManagerMock::_generateNewShardName(OperationContext* txn) {
     return {ErrorCodes::InternalError, "Method not implemented"};
 }
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 9e183842af2..de697f195a2 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -104,7 +104,7 @@ public:
                                          const std::string& collectionNs,
                                          const ChunkType& chunk) override;
 
-    Status getAllShards(std::vector<ShardType>* shards) override;
+    Status getAllShards(OperationContext* txn, std::vector<ShardType>* shards) override;
 
     bool runUserManagementWriteCommand(OperationContext* txn,
                                        const std::string& commandName,
@@ -146,9 +146,11 @@ public:
     Status initConfigVersion(OperationContext* txn) override;
 
 private:
-    Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override;
+    Status _checkDbDoesNotExist(OperationContext* txn,
+                                const std::string& dbName,
+                                DatabaseType* db) override;
 
-    StatusWith<std::string> _generateNewShardName() override;
+    StatusWith<std::string> _generateNewShardName(OperationContext* txn) override;
 
     std::unique_ptr<DistLockManagerMock> _mockDistLockMgr;
 };
diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
index 432b96494a5..b67a0472e43 100644
--- a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
+++ b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
@@ -157,7 +157,7 @@ DistLockCatalogImpl::DistLockCatalogImpl(ShardRegistry* shardRegistry,
 DistLockCatalogImpl::~DistLockCatalogImpl() = default;
 
 RemoteCommandTargeter* DistLockCatalogImpl::_targeter() {
-    return _client->getShard("config")->getTargeter();
+    return _client->getConfigShard()->getTargeter();
 }
 
 StatusWith<LockpingsType> DistLockCatalogImpl::getPing(StringData processID) {
@@ -203,8 +203,8 @@ Status DistLockCatalogImpl::ping(StringData processID, Date_t ping) {
     request.setUpsert(true);
     request.setWriteConcern(_writeConcern);
 
-    auto resultStatus = _client->runCommandWithNotMasterRetries(
-        "config", _locksNS.db().toString(), request.toBSON());
+    auto resultStatus =
+        _client->runCommandOnConfigWithNotMasterRetries(_locksNS.db().toString(), request.toBSON());
 
     if (!resultStatus.isOK()) {
         return resultStatus.getStatus();
@@ -234,8 +234,8 @@ StatusWith<LocksType> DistLockCatalogImpl::grabLock(StringData lockID,
     request.setShouldReturnNew(true);
     request.setWriteConcern(_writeConcern);
 
-    auto resultStatus = _client->runCommandWithNotMasterRetries(
-        "config", _locksNS.db().toString(), request.toBSON());
+    auto resultStatus =
+        _client->runCommandOnConfigWithNotMasterRetries(_locksNS.db().toString(), request.toBSON());
 
     if (!resultStatus.isOK()) {
         return resultStatus.getStatus();
@@ -287,8 +287,8 @@ StatusWith<LocksType> DistLockCatalogImpl::overtakeLock(StringData lockID,
     request.setShouldReturnNew(true);
     request.setWriteConcern(_writeConcern);
 
-    auto resultStatus = _client->runCommandWithNotMasterRetries(
-        "config", _locksNS.db().toString(), request.toBSON());
+    auto resultStatus =
+        _client->runCommandOnConfigWithNotMasterRetries(_locksNS.db().toString(), request.toBSON());
 
     if (!resultStatus.isOK()) {
         return resultStatus.getStatus();
@@ -319,8 +319,8 @@ Status DistLockCatalogImpl::unlock(const OID& lockSessionID) {
         BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
     request.setWriteConcern(_writeConcern);
 
-    auto resultStatus = _client->runCommandWithNotMasterRetries(
-        "config", _locksNS.db().toString(), request.toBSON());
+    auto resultStatus =
+        _client->runCommandOnConfigWithNotMasterRetries(_locksNS.db().toString(), request.toBSON());
 
     if (!resultStatus.isOK()) {
         return resultStatus.getStatus();
@@ -448,8 +448,8 @@ Status DistLockCatalogImpl::stopPing(StringData processId) {
         FindAndModifyRequest::makeRemove(_lockPingNS, BSON(LockpingsType::process() << processId));
     request.setWriteConcern(_writeConcern);
 
-    auto resultStatus = _client->runCommandWithNotMasterRetries(
-        "config", _locksNS.db().toString(), request.toBSON());
+    auto resultStatus =
+        _client->runCommandOnConfigWithNotMasterRetries(_locksNS.db().toString(), request.toBSON());
 
     if (!resultStatus.isOK()) {
         return resultStatus.getStatus();
diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp
index b9fe304530e..e635a8f2554 100644
--- a/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp
+++ b/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp
@@ -88,7 +88,7 @@ public:
     }
 
     RemoteCommandTargeterMock* targeter() {
-        return RemoteCommandTargeterMock::get(_shardRegistry->getShard("config")->getTargeter());
+        return RemoteCommandTargeterMock::get(_shardRegistry->getConfigShard()->getTargeter());
     }
 
     DistLockCatalogImpl* catalog() {
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
index a3fae98e0ed..83701165426 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
@@ -286,11 +286,12 @@ StatusWith<std::string> ForwardingCatalogManager::getTagForChunk(OperationContex
     return retry([&] { return _actual->getTagForChunk(txn, collectionNs, chunk); });
 }
 
-Status ForwardingCatalogManager::getAllShards(std::vector<ShardType>* shards) {
+Status ForwardingCatalogManager::getAllShards(OperationContext* txn,
+                                              std::vector<ShardType>* shards) {
     invariant(shards->empty());
     return retry([&] {
         shards->clear();
-        return _actual->getAllShards(shards);
+        return _actual->getAllShards(txn, shards);
     });
 }
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h
index 69d67658dcb..7ee3740df66 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.h
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.h
@@ -139,7 +139,7 @@ public:
                                          const std::string& collectionNs,
                                          const ChunkType& chunk) override;
 
-    Status getAllShards(std::vector<ShardType>* shards) override;
+    Status getAllShards(OperationContext* txn, std::vector<ShardType>* shards) override;
 
     bool runUserManagementWriteCommand(OperationContext* txn,
                                        const std::string& commandName,
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index ccf6b7a2bb7..dce58700e54 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -300,7 +300,7 @@ Status CatalogManagerLegacy::shardCollection(OperationContext* txn,
     collectionDetail.append("collection", ns);
     string dbPrimaryShardStr;
     {
-        const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId);
+        const auto shard = grid.shardRegistry()->getShard(txn, dbPrimaryShardId);
         dbPrimaryShardStr = shard->toString();
     }
     collectionDetail.append("primary", dbPrimaryShardStr);
@@ -341,7 +341,7 @@ Status CatalogManagerLegacy::shardCollection(OperationContext* txn,
     }
 
     try {
-        const auto shard = grid.shardRegistry()->getShard(dbPrimaryShardId);
+        const auto shard = grid.shardRegistry()->getShard(txn, dbPrimaryShardId);
         ShardConnection conn(shard->getConnString(), ns);
         bool isVersionSet = conn.setVersion();
         conn.done();
@@ -401,7 +401,7 @@ StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationConte
         return status;
     }
 
-    grid.shardRegistry()->reload();
+    grid.shardRegistry()->reload(txn);
     conn.done();
 
     // Record start in changelog
@@ -431,7 +431,7 @@ StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationConte
     }
 
     grid.shardRegistry()->remove(name);
-    grid.shardRegistry()->reload();
+    grid.shardRegistry()->reload(txn);
     conn.done();
 
     // Record finish in changelog
@@ -540,7 +540,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa
         txn, txn->getClient()->clientAddress(true), "dropCollection.start", ns.ns(), BSONObj());
 
     vector<ShardType> allShards;
-    Status status = getAllShards(&allShards);
+    Status status = getAllShards(txn, &allShards);
     if (!status.isOK()) {
         return status;
     }
@@ -560,7 +560,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa
     for (const auto& shardEntry : allShards) {
         auto dropResult = shardRegistry->runCommandWithNotMasterRetries(
-            shardEntry.getName(), ns.db().toString(), BSON("drop" << ns.coll()));
+            txn, shardEntry.getName(), ns.db().toString(), BSON("drop" << ns.coll()));
 
         if (!dropResult.isOK()) {
             return dropResult.getStatus();
@@ -625,7 +625,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa
                                           true);
 
         auto ssvResult = shardRegistry->runCommandWithNotMasterRetries(
-            shardEntry.getName(), "admin", ssv.toBSON());
+            txn, shardEntry.getName(), "admin", ssv.toBSON());
 
         if (!ssvResult.isOK()) {
             return ssvResult.getStatus();
@@ -637,7 +637,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa
         }
 
         auto unsetShardingStatus = shardRegistry->runCommandWithNotMasterRetries(
-            shardEntry.getName(), "admin", BSON("unsetSharding" << 1));
+            txn, shardEntry.getName(), "admin", BSON("unsetSharding" << 1));
 
         if (!unsetShardingStatus.isOK()) {
             return unsetShardingStatus.getStatus();
@@ -918,7 +918,7 @@ StatusWith<string> CatalogManagerLegacy::getTagForChunk(OperationContext* txn,
     return status.getStatus();
 }
 
-Status CatalogManagerLegacy::getAllShards(vector<ShardType>* shards) {
+Status CatalogManagerLegacy::getAllShards(OperationContext* txn, vector<ShardType>* shards) {
     ScopedDbConnection conn(_configServerConnectionString, 30.0);
     std::unique_ptr<DBClientCursor> cursor(
         _safeCursor(conn->query(ShardType::ConfigNS, BSONObj())));
@@ -1118,7 +1118,9 @@ void CatalogManagerLegacy::writeConfigServerDirect(OperationContext* txn,
     exec.executeBatch(request, response);
 }
 
-Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) {
+Status CatalogManagerLegacy::_checkDbDoesNotExist(OperationContext* txn,
+                                                  const std::string& dbName,
+                                                  DatabaseType* db) {
     ScopedDbConnection conn(_configServerConnectionString, 30);
 
     BSONObjBuilder b;
@@ -1153,7 +1155,7 @@ Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName, Dat
     return Status::OK();
 }
 
-StatusWith<string> CatalogManagerLegacy::_generateNewShardName() {
+StatusWith<string> CatalogManagerLegacy::_generateNewShardName(OperationContext* txn) {
     BSONObj o;
     {
         ScopedDbConnection conn(_configServerConnectionString, 30);
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index 70e0a9d69aa..214929f132f 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -101,7 +101,7 @@ public:
                                          const std::string& collectionNs,
                                          const ChunkType& chunk) override;
 
-    Status getAllShards(std::vector<ShardType>* shards) override;
+    Status getAllShards(OperationContext* txn, std::vector<ShardType>* shards) override;
 
     /**
      * Grabs a distributed lock and runs the command on all config servers.
@@ -146,9 +146,11 @@ public:
     Status initConfigVersion(OperationContext* txn) override;
 
 private:
-    Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override;
+    Status _checkDbDoesNotExist(OperationContext* txn,
+                                const std::string& dbName,
+                                DatabaseType* db) override;
 
-    StatusWith<std::string> _generateNewShardName() override;
+    StatusWith<std::string> _generateNewShardName(OperationContext* txn) override;
 
     /**
      * Starts the thread that periodically checks data consistency amongst the config servers.
```
```diff
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index a093c63b8ab..8878f4fd68e 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -148,7 +148,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
     }
 
     ShardId dbPrimaryShardId = status.getValue().value.getPrimary();
-    const auto primaryShard = grid.shardRegistry()->getShard(dbPrimaryShardId);
+    const auto primaryShard = grid.shardRegistry()->getShard(txn, dbPrimaryShardId);
 
     {
         // In 3.0 and prior we include this extra safety check that the collection is not getting
@@ -157,7 +157,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
         // toes. Now we take the distributed lock so going forward this check won't be necessary
         // but we leave it around for compatibility with other mongoses from 3.0.
         // TODO(spencer): Remove this after 3.2 ships.
-        const auto configShard = grid.shardRegistry()->getShard("config");
+        const auto configShard = grid.shardRegistry()->getShard(txn, "config");
         const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
         if (!readHost.isOK()) {
             return readHost.getStatus();
@@ -219,7 +219,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
                                           true);
 
     auto ssvStatus = grid.shardRegistry()->runCommandWithNotMasterRetries(
-        dbPrimaryShardId, "admin", ssv.toBSON());
+        txn, dbPrimaryShardId, "admin", ssv.toBSON());
     if (!ssvStatus.isOK()) {
         warning() << "could not update initial version of " << ns << " on shard primary "
                   << dbPrimaryShardId << ssvStatus.getStatus();
@@ -236,7 +236,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
 
 StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationContext* txn,
                                                                       const std::string& name) {
-    const auto configShard = grid.shardRegistry()->getShard("config");
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -288,7 +288,7 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC
         return status;
     }
 
-    grid.shardRegistry()->reload();
+    grid.shardRegistry()->reload(txn);
 
     // Record start in changelog
     logChange(txn,
@@ -333,7 +333,7 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC
     }
 
     grid.shardRegistry()->remove(name);
-    grid.shardRegistry()->reload();
+    grid.shardRegistry()->reload(txn);
 
     // Record finish in changelog
     logChange(txn, txn->getClient()->clientAddress(true), "removeShard", "", BSON("shard" << name));
@@ -355,7 +355,7 @@ StatusWith<OpTimePair<DatabaseType>> CatalogManagerReplicaSet::getDatabase(
         return OpTimePair<DatabaseType>(dbt);
     }
 
-    const auto configShard = grid.shardRegistry()->getShard("config");
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -387,7 +387,7 @@ StatusWith<OpTimePair<DatabaseType>> CatalogManagerReplicaSet::getDatabase(
 
 StatusWith<OpTimePair<CollectionType>> CatalogManagerReplicaSet::getCollection(
     OperationContext* txn, const std::string& collNs) {
-    auto configShard = grid.shardRegistry()->getShard("config");
+    auto configShard = grid.shardRegistry()->getShard(txn, "config");
 
     auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHostStatus.isOK()) {
@@ -431,7 +431,7 @@ Status CatalogManagerReplicaSet::getCollections(OperationContext* txn,
             string(str::stream() << "^" << pcrecpp::RE::QuoteMeta(*dbName) << "\\."));
     }
 
-    auto configShard = grid.shardRegistry()->getShard("config");
+    auto configShard = grid.shardRegistry()->getShard(txn, "config");
     auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -473,7 +473,7 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
         txn, txn->getClient()->clientAddress(true), "dropCollection.start", ns.ns(), BSONObj());
 
     vector<ShardType> allShards;
-    Status status = getAllShards(&allShards);
+    Status status = getAllShards(txn, &allShards);
     if (!status.isOK()) {
         return status;
     }
@@ -493,7 +493,7 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
 
     for (const auto& shardEntry : allShards) {
         auto dropResult = shardRegistry->runCommandWithNotMasterRetries(
-            shardEntry.getName(), ns.db().toString(), BSON("drop" << ns.coll()));
+            txn, shardEntry.getName(), ns.db().toString(), BSON("drop" << ns.coll()));
 
         if (!dropResult.isOK()) {
             return dropResult.getStatus();
@@ -563,7 +563,7 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
                                           true);
 
         auto ssvResult = shardRegistry->runCommandWithNotMasterRetries(
-            shardEntry.getName(), "admin", ssv.toBSON());
+            txn, shardEntry.getName(), "admin", ssv.toBSON());
 
         if (!ssvResult.isOK()) {
             return ssvResult.getStatus();
@@ -575,7 +575,7 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
         }
 
         auto unsetShardingStatus = shardRegistry->runCommandWithNotMasterRetries(
-            shardEntry.getName(), "admin", BSON("unsetSharding" << 1));
+            txn, shardEntry.getName(), "admin", BSON("unsetSharding" << 1));
 
         if (!unsetShardingStatus.isOK()) {
             return unsetShardingStatus.getStatus();
@@ -667,7 +667,7 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
 
 StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(OperationContext* txn,
                                                                      const string& key) {
-    const auto configShard = grid.shardRegistry()->getShard("config");
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -709,7 +709,7 @@ StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(OperationCo
 Status CatalogManagerReplicaSet::getDatabasesForShard(OperationContext* txn,
                                                       const string& shardName,
                                                       vector<string>* dbs) {
-    auto configShard = grid.shardRegistry()->getShard("config");
+    auto configShard = grid.shardRegistry()->getShard(txn, "config");
    auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -746,7 +746,7 @@ Status CatalogManagerReplicaSet::getChunks(OperationContext* txn,
                                            OpTime* opTime) {
     chunks->clear();
 
-    auto configShard = grid.shardRegistry()->getShard("config");
+    auto configShard = grid.shardRegistry()->getShard(txn, "config");
     auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHostStatus.isOK()) {
         return readHostStatus.getStatus();
@@ -786,7 +786,7 @@ Status CatalogManagerReplicaSet::getTagsForCollection(OperationContext* txn,
                                                       std::vector<TagsType>* tags) {
     tags->clear();
 
-    auto configShard = grid.shardRegistry()->getShard("config");
+    auto configShard = grid.shardRegistry()->getShard(txn, "config");
     auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHostStatus.isOK()) {
         return readHostStatus.getStatus();
@@ -818,7 +818,7 @@ Status CatalogManagerReplicaSet::getTagsForCollection(OperationContext* txn,
 StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(OperationContext* txn,
                                                             const std::string& collectionNs,
                                                             const ChunkType& chunk) {
-    auto configShard = grid.shardRegistry()->getShard("config");
+    auto configShard = grid.shardRegistry()->getShard(txn, "config");
     auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHostStatus.isOK()) {
         return readHostStatus.getStatus();
@@ -850,8 +850,8 @@ StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(OperationContext* tx
     return tagsResult.getValue().getTag();
 }
 
-Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) {
-    const auto configShard = grid.shardRegistry()->getShard("config");
+Status CatalogManagerReplicaSet::getAllShards(OperationContext* txn, vector<ShardType>* shards) {
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -908,14 +908,14 @@ bool CatalogManagerReplicaSet::runReadCommand(OperationContext* txn,
     cmdBuilder.appendElements(cmdObj);
     _appendReadConcern(&cmdBuilder);
 
-    return _runReadCommand(dbname, cmdBuilder.done(), kConfigReadSelector, result);
+    return _runReadCommand(txn, dbname, cmdBuilder.done(), kConfigReadSelector, result);
 }
 
 bool CatalogManagerReplicaSet::runUserManagementReadCommand(OperationContext* txn,
                                                             const std::string& dbname,
                                                             const BSONObj& cmdObj,
                                                             BSONObjBuilder* result) {
-    return _runReadCommand(dbname, cmdObj, kConfigPrimaryPreferredSelector, result);
+    return _runReadCommand(txn, dbname, cmdObj, kConfigPrimaryPreferredSelector, result);
 }
 
 Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(OperationContext* txn,
@@ -964,12 +964,14 @@ void CatalogManagerReplicaSet::writeConfigServerDirect(OperationContext* txn,
     }
 }
 
-Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, DatabaseType* db) {
+Status CatalogManagerReplicaSet::_checkDbDoesNotExist(OperationContext* txn,
+                                                      const string& dbName,
+                                                      DatabaseType* db) {
     BSONObjBuilder queryBuilder;
     queryBuilder.appendRegex(
         DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i");
 
-    const auto configShard = grid.shardRegistry()->getShard("config");
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
@@ -1010,8 +1012,8 @@ Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, Data
                               << " have: " << actualDbName << " want to add: " << dbName);
 }
 
-StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() {
-    const auto configShard = grid.shardRegistry()->getShard("config");
+StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName(OperationContext* txn) {
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHost.isOK()) {
         return readHost.getStatus();
```
```diff
@@ -1083,7 +1085,7 @@ StatusWith<long long> CatalogManagerReplicaSet::_runCountCommandOnConfig(const H
 }
 
 Status CatalogManagerReplicaSet::initConfigVersion(OperationContext* txn) {
-    auto versionStatus = _getConfigVersion();
+    auto versionStatus = _getConfigVersion(txn);
     if (!versionStatus.isOK()) {
         return versionStatus.getStatus();
     }
@@ -1130,8 +1132,8 @@ Status CatalogManagerReplicaSet::initConfigVersion(OperationContext* txn) {
     return Status::OK();
 }
 
-StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() {
-    const auto configShard = grid.shardRegistry()->getShard("config");
+StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion(OperationContext* txn) {
+    const auto configShard = grid.shardRegistry()->getShard(txn, "config");
     const auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector);
     if (!readHostStatus.isOK()) {
         return readHostStatus.getStatus();
@@ -1214,8 +1216,8 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfig(const HostAndP
 
 StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRetries(
     const std::string& dbName, BSONObj cmdObj) {
-    auto result = grid.shardRegistry()->runCommandWithNotMasterRetries(
-        "config", dbName, cmdObj, kReplMetadata);
+    auto result =
+        grid.shardRegistry()->runCommandOnConfigWithNotMasterRetries(dbName, cmdObj, kReplMetadata);
 
     if (!result.isOK()) {
         return result.getStatus();
@@ -1270,11 +1272,12 @@ void CatalogManagerReplicaSet::_appendReadConcern(BSONObjBuilder* builder) {
     readConcern.appendInfo(builder);
 }
 
-bool CatalogManagerReplicaSet::_runReadCommand(const std::string& dbname,
+bool CatalogManagerReplicaSet::_runReadCommand(OperationContext* txn,
+                                               const std::string& dbname,
                                                const BSONObj& cmdObj,
                                                const ReadPreferenceSetting& settings,
                                                BSONObjBuilder* result) {
-    auto targeter = grid.shardRegistry()->getShard("config")->getTargeter();
+    auto targeter = grid.shardRegistry()->getShard(txn, "config")->getTargeter();
     auto target = targeter->findHost(settings);
     if (!target.isOK()) {
         return Command::appendCommandStatus(*result, target.getStatus());
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index 3a9b6c51171..faa603090eb 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -98,7 +98,7 @@ public:
                                          const std::string& collectionNs,
                                          const ChunkType& chunk) override;
 
-    Status getAllShards(std::vector<ShardType>* shards) override;
+    Status getAllShards(OperationContext* txn, std::vector<ShardType>* shards) override;
 
     bool runUserManagementWriteCommand(OperationContext* txn,
                                        const std::string& commandName,
@@ -140,11 +140,14 @@ public:
     Status initConfigVersion(OperationContext* txn) override;
 
 private:
-    Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override;
+    Status _checkDbDoesNotExist(OperationContext* txn,
+                                const std::string& dbName,
+                                DatabaseType* db) override;
 
-    StatusWith<std::string> _generateNewShardName() override;
+    StatusWith<std::string> _generateNewShardName(OperationContext* txn) override;
 
-    bool _runReadCommand(const std::string& dbname,
+    bool _runReadCommand(OperationContext* txn,
+                         const std::string& dbname,
                          const BSONObj& cmdObj,
                          const ReadPreferenceSetting& settings,
                          BSONObjBuilder* result);
@@ -179,7 +182,7 @@ private:
     /**
      * Returns the current cluster schema/protocol version.
      */
-    StatusWith<VersionType> _getConfigVersion();
+    StatusWith<VersionType> _getConfigVersion(OperationContext* txn);
 
     /**
      * Returns the highest last known config server opTime.
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
index e2af5b8393f..1bc870462b1 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
@@ -79,11 +79,11 @@ public:
         setupShards({_shard1, _shard2});
 
         RemoteCommandTargeterMock* shard1Targeter = RemoteCommandTargeterMock::get(
-            shardRegistry()->getShard(_shard1.getName())->getTargeter());
+            shardRegistry()->getShard(operationContext(), _shard1.getName())->getTargeter());
         shard1Targeter->setFindHostReturnValue(HostAndPort(_shard1.getHost()));
 
         RemoteCommandTargeterMock* shard2Targeter = RemoteCommandTargeterMock::get(
-            shardRegistry()->getShard(_shard2.getName())->getTargeter());
+            shardRegistry()->getShard(operationContext(), _shard2.getName())->getTargeter());
         shard2Targeter->setFindHostReturnValue(HostAndPort(_shard2.getHost()));
     }
 
@@ -293,7 +293,7 @@ TEST_F(DropColl2ShardTest, DistLockBusy) {
 
 TEST_F(DropColl2ShardTest, FirstShardTargeterError) {
     RemoteCommandTargeterMock* shard1Targeter = RemoteCommandTargeterMock::get(
-        shardRegistry()->getShard(shard1().getName())->getTargeter());
+        shardRegistry()->getShard(operationContext(), shard1().getName())->getTargeter());
     shard1Targeter->setFindHostReturnValue({ErrorCodes::HostUnreachable, "bad test network"});
 
     auto future = launchAsync([this] {
@@ -369,7 +369,7 @@ TEST_F(DropColl2ShardTest, FirstShardDropCmdError) {
 
 TEST_F(DropColl2ShardTest, SecondShardTargeterError) {
     RemoteCommandTargeterMock* shard2Targeter = RemoteCommandTargeterMock::get(
-        shardRegistry()->getShard(shard2().getHost())->getTargeter());
+        shardRegistry()->getShard(operationContext(), shard2().getHost())->getTargeter());
     shard2Targeter->setFindHostReturnValue({ErrorCodes::HostUnreachable, "bad test network"});
 
     auto future = launchAsync([this] {
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
index 64788d58b38..b31279c7411 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
@@ -332,7 +332,8 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
 
     setupShards(vector<ShardType>{shard});
 
-    RemoteCommandTargeterMock::get(grid.shardRegistry()->getShard(shard.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        grid.shardRegistry()->getShard(operationContext(), shard.getName())->getTargeter())
         ->setFindHostReturnValue(shardHost);
 
     string ns = "db1.foo";
@@ -459,11 +460,14 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
 
     setupShards(vector<ShardType>{shard0, shard1, shard2});
 
-    RemoteCommandTargeterMock::get(grid.shardRegistry()->getShard(shard0.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        grid.shardRegistry()->getShard(operationContext(), shard0.getName())->getTargeter())
         ->setFindHostReturnValue(shard0Host);
-    RemoteCommandTargeterMock::get(grid.shardRegistry()->getShard(shard1.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        grid.shardRegistry()->getShard(operationContext(), shard1.getName())->getTargeter())
         ->setFindHostReturnValue(shard1Host);
-    RemoteCommandTargeterMock::get(grid.shardRegistry()->getShard(shard2.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        grid.shardRegistry()->getShard(operationContext(), shard2.getName())->getTargeter())
         ->setFindHostReturnValue(shard2Host);
 
     string ns = "db1.foo";
@@ -628,7 +632,8 @@ TEST_F(ShardCollectionTest, withInitialData) {
 
     setupShards(vector<ShardType>{shard});
 
-    RemoteCommandTargeterMock::get(grid.shardRegistry()->getShard(shard.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        grid.shardRegistry()->getShard(operationContext(), shard.getName())->getTargeter())
         ->setFindHostReturnValue(shardHost);
 
     string ns = "db1.foo";
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
index 3cbf3cc1cd9..805bc5fa737 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
@@ -323,7 +323,7 @@ TEST_F(CatalogManagerReplSetTest, GetAllShardsValid) {
 
     auto future = launchAsync([this] {
         vector<ShardType> shards;
-        ASSERT_OK(catalogManager()->getAllShards(&shards));
+        ASSERT_OK(catalogManager()->getAllShards(operationContext(), &shards));
 
         return shards;
     });
@@ -358,7 +358,7 @@ TEST_F(CatalogManagerReplSetTest, GetAllShardsWithInvalidShard) {
 
     auto future = launchAsync([this] {
         vector<ShardType> shards;
-        Status status = catalogManager()->getAllShards(&shards);
+        Status status = catalogManager()->getAllShards(operationContext(), &shards);
 
         ASSERT_EQ(ErrorCodes::FailedToParse, status);
         ASSERT_EQ(0U, shards.size());
@@ -1496,7 +1496,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
     s2.setHost("ShardHost2:27017");
 
     // Prime the shard registry with information about the existing shards
-    auto future = launchAsync([this] { shardRegistry()->reload(); });
+    auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });
 
     onFindCommand([&](const RemoteCommandRequest& request) {
         ASSERT_EQUALS(configHost, request.target);
@@ -1518,11 +1518,14 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
     future.timed_get(kFutureTimeout);
 
     // Set up all the target mocks return values.
-    RemoteCommandTargeterMock::get(shardRegistry()->getShard(s0.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), s0.getName())->getTargeter())
         ->setFindHostReturnValue(HostAndPort(s0.getHost()));
-    RemoteCommandTargeterMock::get(shardRegistry()->getShard(s1.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), s1.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s1.getHost()));
-    RemoteCommandTargeterMock::get(shardRegistry()->getShard(s2.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), s2.getName())->getTargeter())
         ->setFindHostReturnValue(HostAndPort(s2.getHost()));
 
@@ -1781,7 +1784,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
     s2.setHost("ShardHost2:27017");
 
     // Prime the shard registry with information about the existing shards
-    auto future = launchAsync([this] { shardRegistry()->reload(); });
+    auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });
 
     onFindCommand([&](const RemoteCommandRequest& request) {
         ASSERT_EQUALS(configHost, request.target);
@@ -1802,11 +1805,14 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
     future.timed_get(kFutureTimeout);
 
     // Set up all the target mocks return values.
-    RemoteCommandTargeterMock::get(shardRegistry()->getShard(s0.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), s0.getName())->getTargeter())
         ->setFindHostReturnValue(HostAndPort(s0.getHost()));
-    RemoteCommandTargeterMock::get(shardRegistry()->getShard(s1.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), s1.getName())->getTargeter())
         ->setFindHostReturnValue(HostAndPort(s1.getHost()));
-    RemoteCommandTargeterMock::get(shardRegistry()->getShard(s2.getName())->getTargeter())
+    RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), s2.getName())->getTargeter())
         ->setFindHostReturnValue(HostAndPort(s2.getHost()));
 
@@ -1917,8 +1923,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) {
 
     setupShards(vector<ShardType>{shard});
 
-    RemoteCommandTargeterMock* shardTargeter =
-        RemoteCommandTargeterMock::get(shardRegistry()->getShard("shard0")->getTargeter());
+    RemoteCommandTargeterMock* shardTargeter = RemoteCommandTargeterMock::get(
+        shardRegistry()->getShard(operationContext(), "shard0")->getTargeter());
     shardTargeter->setFindHostReturnValue(HostAndPort("shard0:12"));
 
     distLock()->expectLock(
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
index 4abfa250466..961ae5db7c2 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
@@ -197,7 +197,7 @@ void CatalogManagerReplSetTestFixture::onFindWithMetadataCommand(
 }
 
 void CatalogManagerReplSetTestFixture::setupShards(const std::vector<ShardType>& shards) {
-    auto future = launchAsync([this] { shardRegistry()->reload(); });
+    auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });
 
     expectGetShards(shards);
```
tryMoveToOtherShard(OperationContext* txn, log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation; - shared_ptr<Shard> newShard = grid.shardRegistry()->getShard(newLocation); + shared_ptr<Shard> newShard = grid.shardRegistry()->getShard(txn, newLocation); if (!newShard) { warning() << "Newly selected shard " << newLocation << " could not be found."; return false; @@ -159,29 +159,24 @@ int Chunk::MaxObjectPerChunk = 250000; // Can be overridden from command line bool Chunk::ShouldAutoSplit = true; -Chunk::Chunk(const ChunkManager* manager, BSONObj from) +Chunk::Chunk(OperationContext* txn, const ChunkManager* manager, ChunkType from) : _manager(manager), _lastmod(0, 0, OID()), _dataWritten(mkDataWritten()) { - string ns = from.getStringField(ChunkType::ns().c_str()); - _shardId = from.getStringField(ChunkType::shard().c_str()); + string ns = from.getNS(); + _shardId = from.getShard(); - _lastmod = ChunkVersion::fromBSON(from[ChunkType::DEPRECATED_lastmod()]); + _lastmod = from.getVersion(); verify(_lastmod.isSet()); - _min = from.getObjectField(ChunkType::min().c_str()).getOwned(); - _max = from.getObjectField(ChunkType::max().c_str()).getOwned(); + _min = from.getMin().getOwned(); + _max = from.getMax().getOwned(); - _jumbo = from[ChunkType::jumbo()].trueValue(); + _jumbo = from.getJumbo(); uassert(10170, "Chunk needs a ns", !ns.empty()); uassert(13327, "Chunk ns must match server ns", ns == _manager->getns()); - - { - const auto shard = grid.shardRegistry()->getShard(_shardId); - uassert(10171, "Chunk needs a server", shard); - } - uassert(10172, "Chunk needs a min", !_min.isEmpty()); uassert(10173, "Chunk needs a max", !_max.isEmpty()); + uassert(10171, "Chunk needs a server", grid.shardRegistry()->getShard(txn, _shardId)); } Chunk::Chunk(const ChunkManager* info, @@ -219,7 +214,7 @@ bool Chunk::_maxIsInf() const { return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax()); } -BSONObj Chunk::_getExtremeKey(bool doSplitAtLower) const { +BSONObj Chunk::_getExtremeKey(OperationContext* txn, bool doSplitAtLower) const { Query q; if (doSplitAtLower) { q.sort(_manager->getShardKeyPattern().toBSON()); @@ -241,7 +236,7 @@ BSONObj Chunk::_getExtremeKey(bool doSplitAtLower) const { } // find the extreme key - ScopedDbConnection conn(_getShardConnectionString()); + ScopedDbConnection conn(_getShardConnectionString(txn)); BSONObj end; if (doSplitAtLower) { @@ -271,9 +266,9 @@ BSONObj Chunk::_getExtremeKey(bool doSplitAtLower) const { return _manager->getShardKeyPattern().extractShardKeyFromDoc(end); } -void Chunk::pickMedianKey(BSONObj& medianKey) const { +void Chunk::pickMedianKey(OperationContext* txn, BSONObj& medianKey) const { // Ask the mongod holding this chunk to figure out the split points. 
- ScopedDbConnection conn(_getShardConnectionString()); + ScopedDbConnection conn(_getShardConnectionString(txn)); BSONObj result; BSONObjBuilder cmd; cmd.append("splitVector", _manager->getns()); @@ -298,7 +293,8 @@ void Chunk::pickMedianKey(BSONObj& medianKey) const { conn.done(); } -void Chunk::pickSplitVector(vector<BSONObj>& splitPoints, +void Chunk::pickSplitVector(OperationContext* txn, + vector<BSONObj>& splitPoints, long long chunkSize /* bytes */, int maxPoints, int maxObjs) const { @@ -312,7 +308,7 @@ void Chunk::pickSplitVector(vector<BSONObj>& splitPoints, cmd.append("maxChunkObjects", maxObjs); BSONObj cmdObj = cmd.obj(); - const auto primaryShard = grid.shardRegistry()->getShard(getShardId()); + const auto primaryShard = grid.shardRegistry()->getShard(txn, getShardId()); auto targetStatus = primaryShard->getTargeter()->findHost({ReadPreference::PrimaryPreferred, TagSet{}}); uassertStatusOK(targetStatus); @@ -328,13 +324,15 @@ void Chunk::pickSplitVector(vector<BSONObj>& splitPoints, } } -void Chunk::determineSplitPoints(bool atMedian, vector<BSONObj>* splitPoints) const { +void Chunk::determineSplitPoints(OperationContext* txn, + bool atMedian, + vector<BSONObj>* splitPoints) const { // if splitting is not obligatory we may return early if there are not enough data // we cap the number of objects that would fall in the first half (before the split point) // the rationale is we'll find a split point without traversing all the data if (atMedian) { BSONObj medianKey; - pickMedianKey(medianKey); + pickMedianKey(txn, medianKey); if (!medianKey.isEmpty()) splitPoints->push_back(medianKey); } else { @@ -350,7 +348,7 @@ void Chunk::determineSplitPoints(bool atMedian, vector<BSONObj>* splitPoints) co chunkSize = std::min(_dataWritten, Chunk::MaxChunkSize); } - pickSplitVector(*splitPoints, chunkSize, 0, MaxObjectPerChunk); + pickSplitVector(txn, *splitPoints, chunkSize, 0, MaxObjectPerChunk); if (splitPoints->size() <= 1) { // no split points means there isn't enough data to split on @@ -373,7 +371,7 @@ Status Chunk::split(OperationContext* txn, bool atMedian = mode == Chunk::atMedian; vector<BSONObj> splitPoints; - determineSplitPoints(atMedian, &splitPoints); + determineSplitPoints(txn, atMedian, &splitPoints); if (splitPoints.empty()) { string msg; if (atMedian) { @@ -394,12 +392,12 @@ Status Chunk::split(OperationContext* txn, if (mode == Chunk::autoSplitInternal && KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) { if (_minIsInf()) { - BSONObj key = _getExtremeKey(true); + BSONObj key = _getExtremeKey(txn, true); if (!key.isEmpty()) { splitPoints[0] = key.getOwned(); } } else if (_maxIsInf()) { - BSONObj key = _getExtremeKey(false); + BSONObj key = _getExtremeKey(txn, false); if (!key.isEmpty()) { splitPoints.pop_back(); splitPoints.push_back(key); @@ -436,7 +434,7 @@ Status Chunk::multiSplit(OperationContext* txn, const vector<BSONObj>& m, BSONOb uassert(13333, "can't split a chunk in that many parts", m.size() < maxSplitPoints); uassert(13003, "can't split a chunk with only one distinct value", _min.woCompare(_max)); - ScopedDbConnection conn(_getShardConnectionString()); + ScopedDbConnection conn(_getShardConnectionString(txn)); BSONObjBuilder cmd; cmd.append("splitChunk", _manager->getns()); @@ -482,13 +480,13 @@ bool Chunk::moveAndCommit(OperationContext* txn, log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << getShardId() << " -> " << toShardId; - const auto from = 
@@ -482,13 +480,13 @@
     log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") "
           << getShardId() << " -> " << toShardId;

-    const auto from = grid.shardRegistry()->getShard(getShardId());
+    const auto from = grid.shardRegistry()->getShard(txn, getShardId());

     BSONObjBuilder builder;
     builder.append("moveChunk", _manager->getns());
     builder.append("from", from->getConnString().toString());
     {
-        const auto toShard = grid.shardRegistry()->getShard(toShardId);
+        const auto toShard = grid.shardRegistry()->getShard(txn, toShardId);
         builder.append("to", toShard->getConnString().toString());
     }
     // NEEDED FOR 2.0 COMPATIBILITY
@@ -605,7 +603,7 @@ bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const {
             ChunkType chunkToMove;
             {
-                const auto shard = grid.shardRegistry()->getShard(getShardId());
+                const auto shard = grid.shardRegistry()->getShard(txn, getShardId());
                 chunkToMove.setShard(shard->toString());
             }
             chunkToMove.setMin(range["min"].embeddedObject());
@@ -627,28 +625,11 @@
     }
 }

-std::string Chunk::_getShardConnectionString() const {
-    const auto shard = grid.shardRegistry()->getShard(getShardId());
+std::string Chunk::_getShardConnectionString(OperationContext* txn) const {
+    const auto shard = grid.shardRegistry()->getShard(txn, getShardId());
     return shard->getConnString().toString();
 }

-long Chunk::getPhysicalSize() const {
-    ScopedDbConnection conn(_getShardConnectionString());
-
-    BSONObj result;
-    uassert(10169,
-            "datasize failed!",
-            conn->runCommand("admin",
-                             BSON("datasize" << _manager->getns() << "keyPattern"
-                                             << _manager->getShardKeyPattern().toBSON() << "min"
-                                             << getMin() << "max" << getMax() << "maxSize"
-                                             << (MaxChunkSize + 1) << "estimate" << true),
-                             result));
-
-    conn.done();
-    return (long)result["size"].number();
-}
-
 void Chunk::appendShortVersion(const char* name, BSONObjBuilder& b) const {
     BSONObjBuilder bb(b.subobjStart(name));
     bb.append(ChunkType::min(), _min);
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 1ec9a9447cb..ef4b619b342 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -28,6 +28,7 @@
 #pragma once

+#include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/client/shard.h"
@@ -62,7 +63,7 @@ public:
         autoSplitInternal
     };

-    Chunk(const ChunkManager* info, BSONObj from);
+    Chunk(OperationContext* txn, const ChunkManager* info, ChunkType from);
     Chunk(const ChunkManager* info,
           const BSONObj& min,
           const BSONObj& max,
@@ -161,7 +162,7 @@
      *
      * @param medianKey the key that divides this chunk, if there is one, or empty
      */
-    void pickMedianKey(BSONObj& medianKey) const;
+    void pickMedianKey(OperationContext* txn, BSONObj& medianKey) const;

     /**
      * Ask the mongod holding this chunk to figure out the split points.
@@ -170,7 +171,8 @@
      * @param maxPoints limits the number of split points that are needed, zero is max (optional)
      * @param maxObjs limits the number of objects in each chunk, zero is as max (optional)
      */
-    void pickSplitVector(std::vector<BSONObj>& splitPoints,
+    void pickSplitVector(OperationContext* txn,
+                         std::vector<BSONObj>& splitPoints,
                          long long chunkSize,
                          int maxPoints = 0,
                          int maxObjs = 0) const;
@@ -199,12 +201,6 @@
                       BSONObj& res) const;

     /**
-     * @return size of shard in bytes
-     * talks to mongod to do this
-     */
-    long getPhysicalSize() const;
-
-    /**
      * marks this chunk as a jumbo chunk
      * that means the chunk will be inelligble for migrates
      */
@@ -261,7 +257,7 @@ private:
     /**
      * Returns the connection string for the shard on which this chunk resides.
      */
-    std::string _getShardConnectionString() const;
+    std::string _getShardConnectionString(OperationContext* txn) const;

     // if min/max key is pos/neg infinity
     bool _minIsInf() const;
@@ -293,7 +289,7 @@
      * is simply an ordered list of ascending/descending field names. Examples:
      * {a : 1, b : -1} is not special. {a : "hashed"} is.
      */
-    BSONObj _getExtremeKey(bool doSplitAtLower) const;
+    BSONObj _getExtremeKey(OperationContext* txn, bool doSplitAtLower) const;

     /**
      * Determines the appropriate split points for this chunk.
@@ -301,7 +297,9 @@
      * @param atMedian perform a single split at the middle of this chunk.
      * @param splitPoints out parameter containing the chosen split points. Can be empty.
      */
-    void determineSplitPoints(bool atMedian, std::vector<BSONObj>* splitPoints) const;
+    void determineSplitPoints(OperationContext* txn,
+                              bool atMedian,
+                              std::vector<BSONObj>* splitPoints) const;

     /**
      * initializes _dataWritten with a random value so that a mongos restart
diff --git a/src/mongo/s/chunk_diff.cpp b/src/mongo/s/chunk_diff.cpp
index e3b0f9fa2f0..8cdfaf52475 100644
--- a/src/mongo/s/chunk_diff.cpp
+++ b/src/mongo/s/chunk_diff.cpp
@@ -109,7 +109,8 @@ typename ConfigDiffTracker<ValType>::RangeOverlap ConfigDiffTracker<ValType>::ov
 }

 template <class ValType>
-int ConfigDiffTracker<ValType>::calculateConfigDiff(const std::vector<ChunkType>& chunks) {
+int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* txn,
+                                                    const std::vector<ChunkType>& chunks) {
     _assertAttached();

     // Apply the chunk changes to the ranges and versions
@@ -149,7 +150,7 @@
         }

         // Chunk version changes
-        ShardId shard = shardFor(chunk.getShard());
+        ShardId shard = shardFor(txn, chunk.getShard());

         typename MaxChunkVersionMap::const_iterator shardVersionIt = _maxShardVersions->find(shard);
         if (shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion) {
@@ -181,7 +182,7 @@
             return -1;
         }

-        _currMap->insert(rangeFor(chunk));
+        _currMap->insert(rangeFor(txn, chunk));
     }

     return _validDiffs;
diff --git a/src/mongo/s/chunk_diff.h b/src/mongo/s/chunk_diff.h
index ce53a95a043..cf47c34eb41 100644
--- a/src/mongo/s/chunk_diff.h
+++ b/src/mongo/s/chunk_diff.h
@@ -38,6 +38,7 @@ namespace mongo {

 class ChunkType;
 struct ChunkVersion;
+class OperationContext;

 class ConfigDiffTrackerBase {
 public:
@@ -113,7 +114,7 @@
     // Applies changes to the config data from a vector of chunks passed in. Also includes minor
     // version changes for particular major-version chunks if explicitly specified.
     // Returns the number of diffs processed, or -1 if the diffs were inconsistent.
-    int calculateConfigDiff(const std::vector<ChunkType>& chunks);
+    int calculateConfigDiff(OperationContext* txn, const std::vector<ChunkType>& chunks);

     // Returns the query needed to find new changes to a collection from the config server
     // Needed only if a custom connection is required to the config server
@@ -133,9 +134,10 @@ protected:
         return true;
     }

-    virtual std::pair<BSONObj, ValType> rangeFor(const ChunkType& chunk) const = 0;
+    virtual std::pair<BSONObj, ValType> rangeFor(OperationContext* txn,
+                                                 const ChunkType& chunk) const = 0;

-    virtual ShardId shardFor(const std::string& name) const = 0;
+    virtual ShardId shardFor(OperationContext* txn, const std::string& name) const = 0;

 private:
     void _assertAttached() const;
diff --git a/src/mongo/s/chunk_diff_test.cpp b/src/mongo/s/chunk_diff_test.cpp
index 76df3e999a6..28dd7e63f1e 100644
--- a/src/mongo/s/chunk_diff_test.cpp
+++ b/src/mongo/s/chunk_diff_test.cpp
@@ -33,6 +33,7 @@
 #include <utility>

 #include "mongo/db/jsobj.h"
+#include "mongo/db/operation_context_noop.h"
 #include "mongo/platform/random.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/chunk_diff.h"
@@ -61,11 +62,11 @@ public:
         return true;
     }

-    virtual pair<BSONObj, BSONObj> rangeFor(const ChunkType& chunk) const {
+    virtual pair<BSONObj, BSONObj> rangeFor(OperationContext* txn, const ChunkType& chunk) const {
         return make_pair(chunk.getMin(), chunk.getMax());
     }

-    virtual ShardId shardFor(const string& name) const {
+    virtual ShardId shardFor(OperationContext* txn, const string& name) const {
         return name;
     }
 };
@@ -82,7 +83,7 @@
         return false;
     }

-    virtual pair<BSONObj, BSONObj> rangeFor(const ChunkType& chunk) const {
+    virtual pair<BSONObj, BSONObj> rangeFor(OperationContext* txn, const ChunkType& chunk) const {
         return make_pair(chunk.getMax(), chunk.getMin());
     }
 };
@@ -108,6 +109,7 @@ protected:
     ~ChunkDiffUnitTest() = default;

     void runTest(bool isInverse) {
+        OperationContextNoop txn;
         int numShards = 10;
         int numInitialChunks = 5;
@@ -173,7 +175,7 @@
         convertBSONArrayToChunkTypes(chunks, &chunksVector);

         // Validate initial load
-        differ->calculateConfigDiff(chunksVector);
+        differ->calculateConfigDiff(&txn, chunksVector);
         validate(isInverse, chunksVector, ranges, maxVersion, maxShardVersions);

         // Generate a lot of diffs, and keep validating that updating from the diffs always gives us
@@ -328,7 +330,7 @@
         std::vector<ChunkType> chunksVector;
         convertBSONArrayToChunkTypes(chunks, &chunksVector);

-        differ->calculateConfigDiff(chunksVector);
+        differ->calculateConfigDiff(&txn, chunksVector);
         validate(isInverse, chunksVector, ranges, maxVersion, maxShardVersions);
     }
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index e9465705e3a..8d01285dadc 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -89,13 +89,14 @@ public:
         return false;
     }

-    pair<BSONObj, shared_ptr<Chunk>> rangeFor(const ChunkType& chunk) const final {
-        shared_ptr<Chunk> c(new Chunk(_manager, chunk.toBSON()));
+    pair<BSONObj, shared_ptr<Chunk>> rangeFor(OperationContext* txn,
+                                              const ChunkType& chunk) const final {
+        shared_ptr<Chunk> c(new Chunk(txn, _manager, chunk));
         return make_pair(chunk.getMax(), c);
     }

-    string shardFor(const string& hostName) const final {
-        const auto shard = grid.shardRegistry()->getShard(hostName);
+    string shardFor(OperationContext* txn, const string& hostName) const final {
+        const auto shard = grid.shardRegistry()->getShard(txn, hostName);
         return shard->getId();
     }
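Every ConfigDiffTracker subclass picks up the widened hooks, as the test trackers and the tracker above show. A minimal sketch of the two overridden methods (hypothetical subclass name; the other required overrides are omitted):

    class ExampleDiffTracker : public ConfigDiffTracker<BSONObj> {
        // Key each range by its bounds, mirroring the default test tracker.
        virtual pair<BSONObj, BSONObj> rangeFor(OperationContext* txn,
                                                const ChunkType& chunk) const {
            return make_pair(chunk.getMin(), chunk.getMax());
        }

        // The txn lets implementations resolve shards through the registry.
        virtual ShardId shardFor(OperationContext* txn, const string& name) const {
            return name;
        }
    };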
@@ -269,14 +270,14 @@ bool ChunkManager::_load(OperationContext* txn,
         invariant(opTime >= _configOpTime);
         _configOpTime = opTime;

-        int diffsApplied = differ.calculateConfigDiff(chunks);
+        int diffsApplied = differ.calculateConfigDiff(txn, chunks);
         if (diffsApplied > 0) {
             LOG(2) << "loaded " << diffsApplied << " chunks into new chunk manager for " << _ns
                    << " with version " << _version;

             // Add all existing shards we find to the shards set
             for (ShardVersionMap::iterator it = shardVersions->begin(); it != shardVersions->end();) {
-                shared_ptr<Shard> shard = grid.shardRegistry()->getShard(it->first);
+                shared_ptr<Shard> shard = grid.shardRegistry()->getShard(txn, it->first);
                 if (shard) {
                     shardIds.insert(it->first);
                     ++it;
@@ -339,7 +340,8 @@ void ChunkManager::_printChunks() const {
     }
 }

-void ChunkManager::calcInitSplitsAndShards(const ShardId& primaryShardId,
+void ChunkManager::calcInitSplitsAndShards(OperationContext* txn,
+                                           const ShardId& primaryShardId,
                                            const vector<BSONObj>* initPoints,
                                            const set<ShardId>* initShardIds,
                                            vector<BSONObj>* splitPoints,
@@ -353,7 +355,7 @@
     if (!initPoints || !initPoints->size()) {
         // discover split points
-        const auto primaryShard = grid.shardRegistry()->getShard(primaryShardId);
+        const auto primaryShard = grid.shardRegistry()->getShard(txn, primaryShardId);
         auto targetStatus =
             primaryShard->getTargeter()->findHost({ReadPreference::PrimaryPreferred, TagSet{}});
         uassertStatusOK(targetStatus);
@@ -368,7 +370,7 @@
         uassertStatusOK(bsonExtractIntegerField(result.getValue(), "n", &numObjects));

         if (numObjects > 0)
-            c.pickSplitVector(*splitPoints, Chunk::MaxChunkSize);
+            c.pickSplitVector(txn, *splitPoints, Chunk::MaxChunkSize);

         // since docs already exists, must use primary shard
         shardIds->push_back(primaryShardId);
@@ -403,7 +405,7 @@ void ChunkManager::createFirstChunks(OperationContext* txn,
     vector<BSONObj> splitPoints;
     vector<ShardId> shardIds;
-    calcInitSplitsAndShards(primaryShardId, initPoints, initShardIds, &splitPoints, &shardIds);
+    calcInitSplitsAndShards(txn, primaryShardId, initPoints, initShardIds, &splitPoints, &shardIds);

     // this is the first chunk; start the versioning from scratch
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index c640bf26ed9..a87d70f2fed 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -171,7 +171,8 @@ public:
     // Helpers for load
-    void calcInitSplitsAndShards(const ShardId& primaryShardId,
+    void calcInitSplitsAndShards(OperationContext* txn,
+                                 const ShardId& primaryShardId,
                                  const std::vector<BSONObj>* initPoints,
                                  const std::set<ShardId>* initShardIds,
                                  std::vector<BSONObj>* splitPoints,
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index a3de1e2235e..320ba5a4fb8 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -273,7 +273,7 @@ Status ChunkManagerTargeter::init(OperationContext* txn) {
     }

     shared_ptr<DBConfig> config = status.getValue();
-    config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
+    config->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);

     return Status::OK();
 }
@@ -642,7 +642,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* txn, bool* wasCha
     }

     shared_ptr<DBConfig> config = status.getValue();
-    config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
+    config->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);
     // We now have the latest metadata from the cache.
@@ -716,7 +716,7 @@ Status ChunkManagerTargeter::refreshNow(OperationContext* txn, RefreshType refre
         } catch (const DBException& ex) {
             return Status(ErrorCodes::UnknownError, ex.toString());
         }
-        config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
+        config->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);
     } else if (refreshType == RefreshType_ReloadDatabase) {
         try {
             // Dumps the db info, reloads it all, synchronization between threads happens
@@ -727,7 +727,7 @@
             return Status(ErrorCodes::UnknownError, ex.toString());
         }

-        config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
+        config->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);
     }

     return Status::OK();
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 3850e6d747f..9be2f45a028 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -288,7 +288,7 @@ public:
         // Now only check top-level shard connections
         for (const ShardId& shardId : all) {
             try {
-                const auto shard = grid.shardRegistry()->getShard(shardId);
+                const auto shard = grid.shardRegistry()->getShard(txn, shardId);
                 if (!shard) {
                     continue;
                 }
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 0397d44d8c9..69c2c6448e6 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -94,9 +94,9 @@ void ShardRegistry::shutdown() {
     _executor->join();
 }

-void ShardRegistry::reload() {
+void ShardRegistry::reload(OperationContext* txn) {
     vector<ShardType> shards;
-    Status status = grid.catalogManager()->getAllShards(&shards);
+    Status status = grid.catalogManager()->getAllShards(txn, &shards);
     uassert(13632,
             str::stream() << "could not get updated shard list from config server due to "
                           << status.toString(),
@@ -126,18 +126,24 @@
     }
 }

-shared_ptr<Shard> ShardRegistry::getShard(const ShardId& shardId) {
+shared_ptr<Shard> ShardRegistry::getShard(OperationContext* txn, const ShardId& shardId) {
     shared_ptr<Shard> shard = _findUsingLookUp(shardId);
     if (shard) {
         return shard;
     }

     // If we can't find the shard, we might just need to reload the cache
-    reload();
+    reload(txn);

     return _findUsingLookUp(shardId);
 }

+shared_ptr<Shard> ShardRegistry::getConfigShard() {
+    shared_ptr<Shard> shard = _findUsingLookUp("config");
+    invariant(shard);
+    return shard;
+}
+
 unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
     return stdx::make_unique<Shard>("<unnamed>", connStr, _targeterFactory->create(connStr));
 }
@@ -404,10 +410,29 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandWithMetadata
     return cmdResponse;
 }

-StatusWith<BSONObj> ShardRegistry::runCommandWithNotMasterRetries(const ShardId& shardId,
+StatusWith<BSONObj> ShardRegistry::runCommandOnConfigWithNotMasterRetries(const std::string& dbname,
+                                                                          const BSONObj& cmdObj) {
+    auto status = runCommandOnConfigWithNotMasterRetries(dbname, cmdObj, rpc::makeEmptyMetadata());
+
+    if (!status.isOK()) {
+        return status.getStatus();
+    }
+
+    return status.getValue().response;
+}
+
+StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandOnConfigWithNotMasterRetries(
+    const std::string& dbname, const BSONObj& cmdObj, const BSONObj& metadata) {
+    auto configShard = getConfigShard();
+
+    return _runCommandWithNotMasterRetries(configShard->getTargeter(), dbname, cmdObj, metadata);
+}
+
+StatusWith<BSONObj> ShardRegistry::runCommandWithNotMasterRetries(OperationContext* txn,
+                                                                  const ShardId& shardId,
                                                                   const std::string& dbname,
                                                                   const BSONObj& cmdObj) {
-    auto status = runCommandWithNotMasterRetries(shardId, dbname, cmdObj, rpc::makeEmptyMetadata());
+    auto status =
+        runCommandWithNotMasterRetries(txn, shardId, dbname, cmdObj, rpc::makeEmptyMetadata());

     if (!status.isOK()) {
         return status.getStatus();
@@ -417,11 +442,20 @@
 }

 StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandWithNotMasterRetries(
+    OperationContext* txn,
     const ShardId& shardId,
     const std::string& dbname,
     const BSONObj& cmdObj,
     const BSONObj& metadata) {
-    auto targeter = getShard(shardId)->getTargeter();
+    auto shard = getShard(txn, shardId);
+    return _runCommandWithNotMasterRetries(shard->getTargeter(), dbname, cmdObj, metadata);
+}
+
+StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithNotMasterRetries(
+    RemoteCommandTargeter* targeter,
+    const std::string& dbname,
+    const BSONObj& cmdObj,
+    const BSONObj& metadata) {
     const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet{});

     for (int i = 0; i < kNotMasterNumRetries; ++i) {
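The config-server variants need no OperationContext: the config shard is registered under the fixed id "config", so the lookup never triggers a reload. A hedged usage sketch (command and database name are illustrative):

    // Run an admin command against the config servers, retrying on NotMaster.
    auto response = grid.shardRegistry()->runCommandOnConfigWithNotMasterRetries(
        "admin", BSON("listDatabases" << 1));
    if (response.isOK()) {
        BSONObj res = response.getValue();
    }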
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 5f1349de7f8..38cdae15cbc 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -46,6 +46,7 @@ class BSONObjBuilder;
 class CatalogManager;
 struct HostAndPort;
 class NamespaceString;
+class OperationContext;
 class RemoteCommandTargeterFactory;
 class Shard;
 class ShardType;
@@ -123,12 +124,17 @@ public:
         return _configServerCS;
     }

-    void reload();
+    void reload(OperationContext* txn);

     /**
      * Returns shared pointer to shard object with given shard id.
      */
-    std::shared_ptr<Shard> getShard(const ShardId& shardId);
+    std::shared_ptr<Shard> getShard(OperationContext* txn, const ShardId& shardId);
+
+    /**
+     * Returns shared pointer to the shard object representing the config servers.
+     */
+    std::shared_ptr<Shard> getConfigShard();

     /**
      * Instantiates a new detached shard connection, which does not appear in the list of shards
@@ -196,15 +202,24 @@ public:
      * response object for any kind of command-specific failure. The only exception is
      * NotMaster errors, which we intercept and follow the rules described above for handling.
      */
-    StatusWith<BSONObj> runCommandWithNotMasterRetries(const ShardId& shard,
+    StatusWith<BSONObj> runCommandWithNotMasterRetries(OperationContext* txn,
+                                                       const ShardId& shard,
                                                        const std::string& dbname,
                                                        const BSONObj& cmdObj);

-    StatusWith<CommandResponse> runCommandWithNotMasterRetries(const ShardId& shardId,
+    StatusWith<CommandResponse> runCommandWithNotMasterRetries(OperationContext* txn,
+                                                               const ShardId& shardId,
                                                                const std::string& dbname,
                                                                const BSONObj& cmdObj,
                                                                const BSONObj& metadata);

+    StatusWith<BSONObj> runCommandOnConfigWithNotMasterRetries(const std::string& dbname,
+                                                               const BSONObj& cmdObj);
+
+    StatusWith<CommandResponse> runCommandOnConfigWithNotMasterRetries(const std::string& dbname,
+                                                                       const BSONObj& cmdObj,
+                                                                       const BSONObj& metadata);
+
 private:
     typedef std::map<ShardId, std::shared_ptr<Shard>> ShardMap;
@@ -220,6 +235,11 @@ private:
     std::shared_ptr<Shard> _findUsingLookUp(const ShardId& shardId);

+    StatusWith<CommandResponse> _runCommandWithNotMasterRetries(RemoteCommandTargeter* targeter,
+                                                                const std::string& dbname,
+                                                                const BSONObj& cmdObj,
+                                                                const BSONObj& metadata);
+
     // Factory to obtain remote command targeters for shards
     const std::unique_ptr<RemoteCommandTargeterFactory> _targeterFactory;
diff --git a/src/mongo/s/cluster_explain.cpp b/src/mongo/s/cluster_explain.cpp
index 6f6ae2cb95e..cb6143e6231 100644
--- a/src/mongo/s/cluster_explain.cpp
+++ b/src/mongo/s/cluster_explain.cpp
@@ -191,7 +191,8 @@ const char* ClusterExplain::getStageNameForReadOp(
 }

 // static
-void ClusterExplain::buildPlannerInfo(const vector<Strategy::CommandResult>& shardResults,
+void ClusterExplain::buildPlannerInfo(OperationContext* txn,
+                                      const vector<Strategy::CommandResult>& shardResults,
                                       const char* mongosStageName,
                                       BSONObjBuilder* out) {
     BSONObjBuilder queryPlannerBob(out->subobjStart("queryPlanner"));
@@ -209,7 +210,7 @@
         singleShardBob.append("shardName", shardResults[i].shardTargetId);
         {
-            const auto shard = grid.shardRegistry()->getShard(shardResults[i].shardTargetId);
+            const auto shard = grid.shardRegistry()->getShard(txn, shardResults[i].shardTargetId);
             singleShardBob.append("connectionString", shard->getConnString().toString());
         }
         appendIfRoom(&singleShardBob, serverInfo, "serverInfo");
@@ -331,7 +332,8 @@ void ClusterExplain::buildExecStats(const vector<Strategy::CommandResult>& shard
 }

 // static
-Status ClusterExplain::buildExplainResult(const vector<Strategy::CommandResult>& shardResults,
+Status ClusterExplain::buildExplainResult(OperationContext* txn,
+                                          const vector<Strategy::CommandResult>& shardResults,
                                           const char* mongosStageName,
                                           long long millisElapsed,
                                           BSONObjBuilder* out) {
@@ -341,7 +343,7 @@
         return validateStatus;
     }

-    buildPlannerInfo(shardResults, mongosStageName, out);
+    buildPlannerInfo(txn, shardResults, mongosStageName, out);
     buildExecStats(shardResults, mongosStageName, millisElapsed, out);

     return Status::OK();
diff --git a/src/mongo/s/cluster_explain.h b/src/mongo/s/cluster_explain.h
index dad3b25c5e7..6023a1db4f7 100644
--- a/src/mongo/s/cluster_explain.h
+++ b/src/mongo/s/cluster_explain.h
@@ -36,6 +36,8 @@ namespace mongo {

+class OperationContext;
+
 /**
  * Namespace for the collection of static methods used by commands in the implementation of
  * explain on mongos.
@@ -67,7 +69,8 @@ public:
      *
      * On success, the output is added to the BSONObj builder 'out'.
      */
-    static Status buildExplainResult(const std::vector<Strategy::CommandResult>& shardResults,
+    static Status buildExplainResult(OperationContext* txn,
+                                     const std::vector<Strategy::CommandResult>& shardResults,
                                      const char* mongosStageName,
                                      long long millisElapsed,
                                      BSONObjBuilder* out);
@@ -95,7 +98,8 @@ private:
      * The planner info will display 'mongosStageName' as the name of the execution stage
      * performed by mongos after gathering results from the shards.
      */
-    static void buildPlannerInfo(const std::vector<Strategy::CommandResult>& shardResults,
+    static void buildPlannerInfo(OperationContext* txn,
+                                 const std::vector<Strategy::CommandResult>& shardResults,
                                  const char* mongosStageName,
                                  BSONObjBuilder* out);
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index 4660e8dbabd..5bc28be8fb3 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -127,7 +127,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ
     ChunkManagerPtr chunkManager;
     shared_ptr<Shard> dummyShard;
-    config->getChunkManagerOrPrimary(nss.ns(), chunkManager, dummyShard);
+    config->getChunkManagerOrPrimary(txn, nss.ns(), chunkManager, dummyShard);

     if (!chunkManager) {
         return;
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index 3253afc6ba2..4c3146d9659 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -221,7 +221,7 @@ public:
         const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj);

         return ClusterExplain::buildExplainResult(
-            shardResults, mongosStageName, millisElapsed, out);
+            txn, shardResults, mongosStageName, millisElapsed, out);
     }

 } clusterCountCmd;
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index fe9ab7787d5..120af9b345b 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -90,7 +90,7 @@ public:
         shared_ptr<Shard> shard;

         if (!conf->isShardingEnabled() || !conf->isSharded(ns)) {
-            shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+            shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
         } else {
             shared_ptr<ChunkManager> chunkMgr = _getChunkManager(txn, conf, ns);
@@ -104,7 +104,7 @@
             BSONObj shardKey = status.getValue();
             ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey);

-            shard = grid.shardRegistry()->getShard(chunk->getShardId());
+            shard = grid.shardRegistry()->getShard(txn, chunk->getShardId());
         }

         BSONObjBuilder explainCmd;
@@ -114,7 +114,7 @@
         Timer timer;
         BSONObjBuilder result;
-        bool ok = _runCommand(conf, shard->getId(), ns, explainCmd.obj(), result);
+        bool ok = _runCommand(txn, conf, shard->getId(), ns, explainCmd.obj(), result);
         long long millisElapsed = timer.millis();

         if (!ok) {
@@ -132,7 +132,7 @@
         shardResults.push_back(cmdResult);

         return ClusterExplain::buildExplainResult(
-            shardResults, ClusterExplain::kSingleShard, millisElapsed, out);
+            txn, shardResults, ClusterExplain::kSingleShard, millisElapsed, out);
     }

     virtual bool run(OperationContext* txn,
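Each explain path on mongos now forwards its OperationContext so that shard connection strings can be resolved when planner info is built. A sketch of the common call shape (variable names illustrative):

    // Gather per-shard results, then let ClusterExplain stitch them together.
    vector<Strategy::CommandResult> shardResults;
    Timer timer;
    // ... dispatch the explained command to the targeted shards ...
    Status s = ClusterExplain::buildExplainResult(
        txn, shardResults, ClusterExplain::kSingleShard, timer.millis(), &result);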
@@ -147,7 +147,7 @@
         // require that the parsing be pulled into this function.
         auto conf = uassertStatusOK(grid.implicitCreateDb(txn, dbName));
         if (!conf->isShardingEnabled() || !conf->isSharded(ns)) {
-            return _runCommand(conf, conf->getPrimaryId(), ns, cmdObj, result);
+            return _runCommand(txn, conf, conf->getPrimaryId(), ns, cmdObj, result);
         }

         shared_ptr<ChunkManager> chunkMgr = _getChunkManager(txn, conf, ns);
@@ -163,7 +163,7 @@
         BSONObj shardKey = status.getValue();
         ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey);

-        bool ok = _runCommand(conf, chunk->getShardId(), ns, cmdObj, result);
+        bool ok = _runCommand(txn, conf, chunk->getShardId(), ns, cmdObj, result);
         if (ok) {
             // check whether split is necessary (using update object for size heuristic)
             if (Chunk::ShouldAutoSplit) {
@@ -203,14 +203,15 @@ private:
         return shardKey;
     }

-    bool _runCommand(shared_ptr<DBConfig> conf,
+    bool _runCommand(OperationContext* txn,
+                     shared_ptr<DBConfig> conf,
                      const ShardId& shardId,
                      const string& ns,
                      const BSONObj& cmdObj,
                      BSONObjBuilder& result) const {
         BSONObj res;

-        const auto shard = grid.shardRegistry()->getShard(shardId);
+        const auto shard = grid.shardRegistry()->getShard(txn, shardId);
         ShardConnection conn(shard->getConnString(), ns);
         bool ok = conn->runCommand(conf->name(), cmdObj, res);
         conn.done();
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 6a884e372e3..e38bca1334b 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -143,7 +143,7 @@ public:
         const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj);

         return ClusterExplain::buildExplainResult(
-            shardResults, mongosStageName, millisElapsed, out);
+            txn, shardResults, mongosStageName, millisElapsed, out);
     }

     bool run(OperationContext* txn,
diff --git a/src/mongo/s/commands/cluster_fsync_cmd.cpp b/src/mongo/s/commands/cluster_fsync_cmd.cpp
index 90735a25784..34098d450d6 100644
--- a/src/mongo/s/commands/cluster_fsync_cmd.cpp
+++ b/src/mongo/s/commands/cluster_fsync_cmd.cpp
@@ -86,7 +86,7 @@ public:
         grid.shardRegistry()->getAllShardIds(&shardIds);

         for (const ShardId& shardId : shardIds) {
-            const auto s = grid.shardRegistry()->getShard(shardId);
+            const auto s = grid.shardRegistry()->getShard(txn, shardId);
             if (!s) {
                 continue;
             }
diff --git a/src/mongo/s/commands/cluster_kill_op.cpp b/src/mongo/s/commands/cluster_kill_op.cpp
index 056f4b7ff60..d99abb461f0 100644
--- a/src/mongo/s/commands/cluster_kill_op.cpp
+++ b/src/mongo/s/commands/cluster_kill_op.cpp
@@ -99,7 +99,7 @@ public:
         log() << "want to kill op: " << opToKill;

         // Will throw if shard id is not found
-        auto shard = grid.shardRegistry()->getShard(shardIdent);
+        auto shard = grid.shardRegistry()->getShard(txn, shardIdent);
         if (!shard) {
             return appendCommandStatus(
                 result,
diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp
index 0878123bc39..fc2ea7b22f0 100644
--- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp
+++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp
@@ -94,7 +94,7 @@ public:
         grid.shardRegistry()->getAllShardIds(&shardIds);

         for (const ShardId& shardId : shardIds) {
-            const auto s = grid.shardRegistry()->getShard(shardId);
+            const auto s = grid.shardRegistry()->getShard(txn, shardId);
             if (!s) {
                 continue;
             }
diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp
index da852b42ee7..dedbc136726 100644
--- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp
+++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp
@@ -74,7 +74,7 @@ public:
                      std::string& errmsg,
                      BSONObjBuilder& result) {
         std::vector<ShardType> shards;
-        Status status = grid.catalogManager(txn)->getAllShards(&shards);
+        Status status = grid.catalogManager(txn)->getAllShards(txn, &shards);
         if (!status.isOK()) {
             return appendCommandStatus(result, status);
         }
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 728ebc76bd7..3c942c5e89e 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -266,7 +266,7 @@ public:
         if (!shardedInput && !shardedOutput && !customOutDB) {
             LOG(1) << "simple MR, just passthrough";

-            const auto shard = grid.shardRegistry()->getShard(confIn->getPrimaryId());
+            const auto shard = grid.shardRegistry()->getShard(txn, confIn->getPrimaryId());
             ShardConnection conn(shard->getConnString(), "");

             BSONObj res;
@@ -312,7 +312,7 @@
             // Need to gather list of all servers even if an error happened
             string server;
             {
-                const auto shard = grid.shardRegistry()->getShard(mrResult.shardTargetId);
+                const auto shard = grid.shardRegistry()->getShard(txn, mrResult.shardTargetId);
                 server = shard->getConnString().toString();
             }
             servers.insert(server);
@@ -403,7 +403,7 @@
         BSONObj singleResult;

         if (!shardedOutput) {
-            const auto shard = grid.shardRegistry()->getShard(confOut->getPrimaryId());
+            const auto shard = grid.shardRegistry()->getShard(txn, confOut->getPrimaryId());

             LOG(1) << "MR with single shard output, NS=" << finalColLong
                    << " primary=" << shard->toString();
@@ -481,7 +481,8 @@
             for (const auto& mrResult : mrCommandResults) {
                 string server;
                 {
-                    const auto shard = grid.shardRegistry()->getShard(mrResult.shardTargetId);
+                    const auto shard =
+                        grid.shardRegistry()->getShard(txn, mrResult.shardTargetId);
                     server = shard->getConnString().toString();
                 }
                 singleResult = mrResult.result;
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
index 2c3802a8f3b..f040fafa4c6 100644
--- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
+++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
@@ -183,7 +183,7 @@ public:
         // Throws, but handled at level above. Don't want to rewrap to preserve exception
         // formatting.
-        const auto shard = grid.shardRegistry()->getShard(firstChunk->getShardId());
+        const auto shard = grid.shardRegistry()->getShard(txn, firstChunk->getShardId());
         if (!shard) {
             return appendCommandStatus(
                 result,
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 9dc54bb633c..8f033427b6e 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -140,7 +140,7 @@ public:
             return false;
         }

-        const auto to = grid.shardRegistry()->getShard(toString);
+        const auto to = grid.shardRegistry()->getShard(txn, toString);
         if (!to) {
             string msg(str::stream() << "Could not move chunk in '" << nss.ns() << "' to shard '"
                                      << toString << "' because that shard does not exist");
@@ -208,7 +208,7 @@
         }

         {
-            const auto from = grid.shardRegistry()->getShard(chunk->getShardId());
+            const auto from = grid.shardRegistry()->getShard(txn, chunk->getShardId());
             if (from->getId() == to->getId()) {
                 errmsg = "that chunk is already on that shard";
                 return false;
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index dff18f954c2..2736a962a9c 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -126,7 +126,7 @@ public:
             return false;
         }

-        shared_ptr<Shard> toShard = grid.shardRegistry()->getShard(to);
+        shared_ptr<Shard> toShard = grid.shardRegistry()->getShard(txn, to);
         if (!toShard) {
             string msg(str::stream() << "Could not move database '" << dbname << "' to shard '"
                                      << to << "' because the shard does not exist");
@@ -134,7 +134,7 @@
             return appendCommandStatus(result, Status(ErrorCodes::ShardNotFound, msg));
         }

-        shared_ptr<Shard> fromShard = grid.shardRegistry()->getShard(config->getPrimaryId());
+        shared_ptr<Shard> fromShard = grid.shardRegistry()->getShard(txn, config->getPrimaryId());
         invariant(fromShard);

         if (fromShard->getConnString().sameLogicalEndpoint(toShard->getConnString())) {
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index ce272013709..820fa945ad9 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -110,7 +110,7 @@ public:
         shared_ptr<DBConfig> conf = status.getValue();

         if (!conf->isShardingEnabled()) {
-            return aggPassthrough(conf, cmdObj, result, options);
+            return aggPassthrough(txn, conf, cmdObj, result, options);
         }

         intrusive_ptr<ExpressionContext> mergeCtx =
@@ -131,7 +131,7 @@
         }

         if (!conf->isSharded(fullns)) {
-            return aggPassthrough(conf, cmdObj, result, options);
+            return aggPassthrough(txn, conf, cmdObj, result, options);
         }

         // If the first $match stage is an exact match on the shard key, we only have to send it
@@ -234,7 +234,7 @@
         const auto& mergingShardId = needPrimaryShardMerger
             ? conf->getPrimaryId()
             : shardResults[prng.nextInt32(shardResults.size())].shardTargetId;
-        const auto mergingShard = grid.shardRegistry()->getShard(mergingShardId);
+        const auto mergingShard = grid.shardRegistry()->getShard(txn, mergingShardId);
         ShardConnection conn(mergingShard->getConnString(), outputNsOrEmpty);
         BSONObj mergedResults =
             aggRunCommand(conn.get(), dbname, mergeCmd.freeze().toBson(), options);
@@ -261,7 +261,8 @@ private:
     // returned cursors with mongos's cursorCache.
     BSONObj aggRunCommand(DBClientBase* conn, const string& db, BSONObj cmd, int queryOptions);
-    bool aggPassthrough(shared_ptr<DBConfig> conf,
+    bool aggPassthrough(OperationContext* txn,
+                        shared_ptr<DBConfig> conf,
                         BSONObj cmd,
                         BSONObjBuilder& result,
                         int queryOptions);
@@ -398,12 +399,13 @@ BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn,
     return result;
 }

-bool PipelineCommand::aggPassthrough(shared_ptr<DBConfig> conf,
+bool PipelineCommand::aggPassthrough(OperationContext* txn,
+                                     shared_ptr<DBConfig> conf,
                                      BSONObj cmd,
                                      BSONObjBuilder& out,
                                      int queryOptions) {
     // Temporary hack. See comment on declaration for details.
-    const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+    const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
     ShardConnection conn(shard->getConnString(), "");
     BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions);
     conn.done();
diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
index a9ad5bd122d..c225bfa2f55 100644
--- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
+++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
@@ -86,7 +86,7 @@ public:
                      BSONObjBuilder& result) {
         const string target = cmdObj.firstElement().valuestrsafe();

-        const auto s = grid.shardRegistry()->getShard(target);
+        const auto s = grid.shardRegistry()->getShard(txn, target);
         if (!s) {
             string msg(str::stream() << "Could not drop shard '" << target
                                      << "' because it does not exist");
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index c84946580f4..9a7cfa376c0 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -171,7 +171,7 @@ public:
         // The rest of the checks require a connection to the primary db
         ConnectionString shardConnString;
         {
-            const auto shard = grid.shardRegistry()->getShard(config->getPrimaryId());
+            const auto shard = grid.shardRegistry()->getShard(txn, config->getPrimaryId());
             shardConnString = shard->getConnString();
         }
@@ -416,7 +416,7 @@
             int i = 0;
             for (ChunkMap::const_iterator c = chunkMap.begin(); c != chunkMap.end(); ++c, ++i) {
                 const ShardId& shardId = shardIds[i % numShards];
-                const auto to = grid.shardRegistry()->getShard(shardId);
+                const auto to = grid.shardRegistry()->getShard(txn, shardId);
                 if (!to) {
                     continue;
                 }
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index cea3c22307b..2a7176339fe 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -126,7 +126,7 @@ public:
         }

         return ClusterExplain::buildExplainResult(
-            shardResults, ClusterExplain::kWriteOnShards, timer.millis(), out);
+            txn, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), out);
     }

     virtual bool run(OperationContext* txn,
@@ -265,7 +265,7 @@ private:
             const ShardEndpoint* endpoint = *it;

             ConnectionString host;
-            Status status = resolver.chooseWriteHost(endpoint->shardName, &host);
+            Status status = resolver.chooseWriteHost(txn, endpoint->shardName, &host);
             if (!status.isOK())
                 return status;

@@ -291,7 +291,7 @@
             Strategy::CommandResult result;
             result.target = host;
             {
-                const auto shard = grid.shardRegistry()->getShard(host.toString());
+                const auto shard = grid.shardRegistry()->getShard(txn, host.toString());
                 result.shardTargetId = shard->getId();
             }
             result.result = response.toBSON();
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 65ebfe77ad5..6a404bad604 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -100,30 +100,36 @@ public:
     }

 protected:
-    bool passthrough(shared_ptr<DBConfig> conf, const BSONObj& cmdObj, BSONObjBuilder& result) {
-        return _passthrough(conf->name(), conf, cmdObj, 0, result);
+    bool passthrough(OperationContext* txn,
+                     shared_ptr<DBConfig> conf,
+                     const BSONObj& cmdObj,
+                     BSONObjBuilder& result) {
+        return _passthrough(txn, conf->name(), conf, cmdObj, 0, result);
     }

-    bool adminPassthrough(shared_ptr<DBConfig> conf,
+    bool adminPassthrough(OperationContext* txn,
+                          shared_ptr<DBConfig> conf,
                           const BSONObj& cmdObj,
                           BSONObjBuilder& result) {
-        return _passthrough("admin", conf, cmdObj, 0, result);
+        return _passthrough(txn, "admin", conf, cmdObj, 0, result);
     }

-    bool passthrough(shared_ptr<DBConfig> conf,
+    bool passthrough(OperationContext* txn,
+                     shared_ptr<DBConfig> conf,
                      const BSONObj& cmdObj,
                      int options,
                      BSONObjBuilder& result) {
-        return _passthrough(conf->name(), conf, cmdObj, options, result);
+        return _passthrough(txn, conf->name(), conf, cmdObj, options, result);
     }

 private:
-    bool _passthrough(const string& db,
+    bool _passthrough(OperationContext* txn,
+                      const string& db,
                       shared_ptr<DBConfig> conf,
                       const BSONObj& cmdObj,
                       int options,
                       BSONObjBuilder& result) {
-        const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+        const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
         ShardConnection conn(shard->getConnString(), "");

         BSONObj res;
@@ -155,7 +161,7 @@ public:
         shared_ptr<DBConfig> conf = status.getValue();

         if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
-            shardIds.push_back(conf->getShardId(fullns));
+            shardIds.push_back(conf->getShardId(txn, fullns));
         } else {
             grid.shardRegistry()->getAllShardIds(&shardIds);
         }
@@ -177,7 +183,7 @@
         auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));

         if (!conf->isSharded(fullns)) {
-            return passthrough(conf, cmdObj, options, result);
+            return passthrough(txn, conf, cmdObj, options, result);
         }

         return appendCommandStatus(
@@ -407,7 +413,7 @@ public:
         shared_ptr<DBConfig> conf = status.getValue();

-        return passthrough(conf, cmdObj, result);
+        return passthrough(txn, conf, cmdObj, result);
     }

 } createCmd;
@@ -445,7 +451,7 @@
         const auto& db = status.getValue();

         if (!db->isShardingEnabled() || !db->isSharded(fullns)) {
             log() << "\tdrop going to do passthrough";
-            return passthrough(db, cmdObj, result);
+            return passthrough(txn, db, cmdObj, result);
         }

         uassertStatusOK(grid.catalogManager(txn)->dropCollection(txn, NamespaceString(fullns)));
@@ -486,14 +492,14 @@
         uassert(13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom));
         uassert(13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo));

-        const ShardId& shardTo = confTo->getShardId(fullnsTo);
-        const ShardId& shardFrom = confFrom->getShardId(fullnsFrom);
+        const ShardId& shardTo = confTo->getShardId(txn, fullnsTo);
+        const ShardId& shardFrom = confFrom->getShardId(txn, fullnsFrom);

         uassert(13137,
                 "Source and destination collections must be on same shard",
                 shardFrom == shardTo);

-        return adminPassthrough(confFrom, cmdObj, result);
+        return adminPassthrough(txn, confFrom, cmdObj, result);
     }

 } renameCollectionCmd;
@@ -526,7 +532,7 @@
         const string fromhost = cmdObj.getStringField("fromhost");
         if (!fromhost.empty()) {
-            return adminPassthrough(confTo, cmdObj, result);
+            return adminPassthrough(txn, confTo, cmdObj, result);
         } else {
             const string fromdb = cmdObj.getStringField("fromdb");
             uassert(13399, "need a fromdb argument", !fromdb.empty());
@@ -545,12 +551,12 @@
             }

             {
-                const auto& shard = grid.shardRegistry()->getShard(confFrom->getPrimaryId());
+                const auto& shard = grid.shardRegistry()->getShard(txn, confFrom->getPrimaryId());
                 b.append("fromhost", shard->getConnString().toString());
             }
             BSONObj fixed = b.obj();

-            return adminPassthrough(confTo, fixed, result);
+            return adminPassthrough(txn, confTo, fixed, result);
         }
     }
@@ -580,7 +586,7 @@
             result.appendBool("sharded", false);
             result.append("primary", conf->getPrimaryId());

-            return passthrough(conf, cmdObj, result);
+            return passthrough(txn, conf, cmdObj, result);
         }

         result.appendBool("sharded", true);
@@ -603,7 +609,7 @@
         cm->getAllShardIds(&shardIds);
         for (const ShardId& shardId : shardIds) {
-            const auto shard = grid.shardRegistry()->getShard(shardId);
+            const auto shard = grid.shardRegistry()->getShard(txn, shardId);
             if (!shard) {
                 continue;
             }
@@ -741,7 +747,7 @@
         auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));

         if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
-            return passthrough(conf, cmdObj, result);
+            return passthrough(txn, conf, cmdObj, result);
         }

         ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
@@ -772,7 +778,7 @@
         set<ShardId> shardIds;
         cm->getShardIdsForRange(shardIds, min, max);

         for (const ShardId& shardId : shardIds) {
-            const auto shard = grid.shardRegistry()->getShard(shardId);
+            const auto shard = grid.shardRegistry()->getShard(txn, shardId);
             if (!shard) {
                 continue;
             }
@@ -859,7 +865,7 @@
         shardResults.push_back(singleResult);

         return ClusterExplain::buildExplainResult(
-            shardResults, ClusterExplain::kSingleShard, millisElapsed, out);
+            txn, shardResults, ClusterExplain::kSingleShard, millisElapsed, out);
     }

 } groupCmd;
@@ -931,7 +937,7 @@
         shared_ptr<DBConfig> conf = status.getValue();

         if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
-            return passthrough(conf, cmdObj, options, result);
+            return passthrough(txn, conf, cmdObj, options, result);
         }

         ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
@@ -945,7 +951,7 @@
         int size = 32;

         for (const ShardId& shardId : shardIds) {
-            const auto shard = grid.shardRegistry()->getShard(shardId);
+            const auto shard = grid.shardRegistry()->getShard(txn, shardId);
             if (!shard) {
                 continue;
             }
@@ -1011,7 +1017,7 @@
         const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj);

         return ClusterExplain::buildExplainResult(
-            shardResults, mongosStageName, millisElapsed, out);
+            txn, shardResults, mongosStageName, millisElapsed, out);
     }

 } disinctCmd;
@@ -1046,7 +1052,7 @@
         auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));

         if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
-            return passthrough(conf, cmdObj, result);
+            return passthrough(txn, conf, cmdObj, result);
         }

         ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
@@ -1174,7 +1180,7 @@
         auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));

         if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
-            return passthrough(conf, cmdObj, options, result);
+            return passthrough(txn, conf, cmdObj, options, result);
         }

         ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
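A command built on these helpers simply forwards its OperationContext. A hedged sketch of a typical unsharded passthrough caller (it mirrors the run() implementations above; the sketch itself is not part of this commit):

    virtual bool run(OperationContext* txn,
                     const string& dbName,
                     BSONObj& cmdObj,
                     int options,
                     string& errmsg,
                     BSONObjBuilder& result) {
        auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
        // Forward to the database's primary shard, threading the txn through.
        return passthrough(txn, conf, cmdObj, result);
    }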
@@ -1193,7 +1199,7 @@
         list<shared_ptr<Future::CommandResult>> futures;
         BSONArrayBuilder shardArray;
         for (const ShardId& shardId : shardIds) {
-            const auto shard = grid.shardRegistry()->getShard(shardId);
+            const auto shard = grid.shardRegistry()->getShard(txn, shardId);
             if (!shard) {
                 continue;
             }
@@ -1344,7 +1350,7 @@
         }

         shared_ptr<DBConfig> conf = status.getValue();

-        return passthrough(conf, cmdObj, result);
+        return passthrough(txn, conf, cmdObj, result);
     }

 } evalCmd;
@@ -1384,9 +1390,9 @@
         }

         shared_ptr<DBConfig> conf = status.getValue();

-        bool retval = passthrough(conf, cmdObj, result);
+        bool retval = passthrough(txn, conf, cmdObj, result);

-        const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+        const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
         Status storeCursorStatus =
             storePossibleCursor(shard->getConnString().toString(), result.asTempObj());
         if (!storeCursorStatus.isOK()) {
@@ -1416,9 +1422,9 @@
                      string& errmsg,
                      BSONObjBuilder& result) {
         auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));

-        bool retval = passthrough(conf, cmdObj, result);
+        bool retval = passthrough(txn, conf, cmdObj, result);

-        const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+        const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
         Status storeCursorStatus =
             storePossibleCursor(shard->getConnString().toString(), result.asTempObj());
         if (!storeCursorStatus.isOK()) {
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
index 91b10dd1db0..157c556d65a 100644
--- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp
+++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
@@ -86,7 +86,7 @@ bool RunOnAllShardsCommand::run(OperationContext* txn,
     std::list<std::shared_ptr<Future::CommandResult>> futures;
     for (const ShardId& shardId : shardIds) {
-        const auto shard = grid.shardRegistry()->getShard(shardId);
+        const auto shard = grid.shardRegistry()->getShard(txn, shardId);
         if (!shard) {
             continue;
         }
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
index 6d4977089d0..0f6a0918fa9 100644
--- a/src/mongo/s/config.cpp
+++ b/src/mongo/s/config.cpp
@@ -164,10 +164,10 @@ bool DBConfig::isSharded(const string& ns) {
     return i->second.isSharded();
 }

-const ShardId& DBConfig::getShardId(const string& ns) {
+const ShardId& DBConfig::getShardId(OperationContext* txn, const string& ns) {
     uassert(28679, "ns can't be sharded", !isSharded(ns));

-    uassert(10178, "no primary!", grid.shardRegistry()->getShard(_primaryId));
+    uassert(10178, "no primary!", grid.shardRegistry()->getShard(txn, _primaryId));
     return _primaryId;
 }
@@ -220,7 +220,8 @@ bool DBConfig::removeSharding(OperationContext* txn, const string& ns) {
 // Handles weird logic related to getting *either* a chunk manager *or* the collection primary
 // shard
-void DBConfig::getChunkManagerOrPrimary(const string& ns,
+void DBConfig::getChunkManagerOrPrimary(OperationContext* txn,
+                                        const string& ns,
                                         std::shared_ptr<ChunkManager>& manager,
                                         std::shared_ptr<Shard>& primary) {
     // The logic here is basically that at any time, our collection can become sharded or
@@ -239,7 +240,7 @@
     // No namespace
     if (i == _collections.end()) {
         // If we don't know about this namespace, it's unsharded by default
-        primary = grid.shardRegistry()->getShard(_primaryId);
+        primary = grid.shardRegistry()->getShard(txn, _primaryId);
     } else {
         CollectionInfo& cInfo = i->second;
@@ -250,7 +251,7 @@ void DBConfig::getChunkManagerOrPrimary(const string& ns,
         if (_shardingEnabled && cInfo.isSharded()) {
             manager = cInfo.getCM();
         } else {
-            primary = grid.shardRegistry()->getShard(_primaryId);
+            primary = grid.shardRegistry()->getShard(txn, _primaryId);
         }
     }
 }
@@ -428,7 +429,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn,
 }

 void DBConfig::setPrimary(OperationContext* txn, const std::string& s) {
-    const auto shard = grid.shardRegistry()->getShard(s);
+    const auto shard = grid.shardRegistry()->getShard(txn, s);

     stdx::lock_guard<stdx::mutex> lk(_lock);
     _primaryId = shard->getId();
@@ -572,7 +573,7 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
     // 3
     {
-        const auto shard = grid.shardRegistry()->getShard(_primaryId);
+        const auto shard = grid.shardRegistry()->getShard(txn, _primaryId);
         ScopedDbConnection conn(shard->getConnString(), 30.0);
         BSONObj res;
         if (!conn->dropDatabase(_name, &res)) {
@@ -584,7 +585,7 @@
     // 4
     for (const ShardId& shardId : shardIds) {
-        const auto shard = grid.shardRegistry()->getShard(shardId);
+        const auto shard = grid.shardRegistry()->getShard(txn, shardId);
         if (!shard) {
             continue;
         }
diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h
index 6c5d362f2b3..d01a2951664 100644
--- a/src/mongo/s/config.h
+++ b/src/mongo/s/config.h
@@ -145,7 +145,8 @@ public:
     // Atomically returns *either* the chunk manager *or* the primary shard for the collection,
     // neither if the collection doesn't exist.
-    void getChunkManagerOrPrimary(const std::string& ns,
+    void getChunkManagerOrPrimary(OperationContext* txn,
+                                  const std::string& ns,
                                   std::shared_ptr<ChunkManager>& manager,
                                   std::shared_ptr<Shard>& primary);
@@ -161,7 +162,7 @@
     /**
     * Returns shard id for primary shard for the database for which this DBConfig represents.
    */
-    const ShardId& getShardId(const std::string& ns);
+    const ShardId& getShardId(OperationContext* txn, const std::string& ns);

     void setPrimary(OperationContext* txn, const std::string& s);
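Callers resolve either a chunk manager or a primary shard in one step, now with the txn threaded through DBConfig. A usage sketch (assumes `conf`, `nss`, and `txn` in scope):

    std::shared_ptr<ChunkManager> manager;
    std::shared_ptr<Shard> primary;
    conf->getChunkManagerOrPrimary(txn, nss.ns(), manager, primary);
    if (manager) {
        // sharded: target individual chunks via the manager
    } else {
        // unsharded: send the operation to 'primary'
    }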
This indicates metadata corruption.", diff --git a/src/mongo/s/dbclient_shard_resolver.cpp b/src/mongo/s/dbclient_shard_resolver.cpp index 92694a51243..bc7dc38fb27 100644 --- a/src/mongo/s/dbclient_shard_resolver.cpp +++ b/src/mongo/s/dbclient_shard_resolver.cpp @@ -39,10 +39,11 @@ namespace mongo { using std::string; -Status DBClientShardResolver::chooseWriteHost(const string& shardName, +Status DBClientShardResolver::chooseWriteHost(OperationContext* txn, + const string& shardName, ConnectionString* shardHost) const { // Internally uses our shard cache, does no reload - std::shared_ptr<Shard> shard = grid.shardRegistry()->getShard(shardName); + std::shared_ptr<Shard> shard = grid.shardRegistry()->getShard(txn, shardName); if (!shard) { return Status(ErrorCodes::ShardNotFound, str::stream() << "unknown shard name " << shardName); diff --git a/src/mongo/s/dbclient_shard_resolver.h b/src/mongo/s/dbclient_shard_resolver.h index 076dd078562..ba70bdf1dc3 100644 --- a/src/mongo/s/dbclient_shard_resolver.h +++ b/src/mongo/s/dbclient_shard_resolver.h @@ -54,7 +54,9 @@ public: * Returns ReplicaSetNotFound if the replica set is not being tracked * Returns !OK with message if the shard host could not be found for other reasons. */ - Status chooseWriteHost(const std::string& shardName, ConnectionString* shardHost) const; + Status chooseWriteHost(OperationContext* txn, + const std::string& shardName, + ConnectionString* shardHost) const override; /** * Resolves a replica set connection string to a master or returns an error. diff --git a/src/mongo/s/mock_shard_resolver.h b/src/mongo/s/mock_shard_resolver.h index 7816556abe9..d2263274436 100644 --- a/src/mongo/s/mock_shard_resolver.h +++ b/src/mongo/s/mock_shard_resolver.h @@ -41,7 +41,9 @@ class MockShardResolver : public ShardResolver { public: virtual ~MockShardResolver() {} - Status chooseWriteHost(const std::string& shardName, ConnectionString* shardHost) const { + Status chooseWriteHost(OperationContext* txn, + const std::string& shardName, + ConnectionString* shardHost) const { *shardHost = unittest::assertGet(ConnectionString::parse(std::string("$") + shardName + ":12345")); return Status::OK(); diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 7441a613124..29dd5ab74dd 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -113,7 +113,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, chunkManager->getShardIdsForQuery(shardIds, query.getParsed().getFilter()); for (auto id : shardIds) { - shards.emplace_back(shardRegistry->getShard(id)); + shards.emplace_back(shardRegistry->getShard(txn, id)); } } @@ -233,7 +233,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, std::shared_ptr<ChunkManager> chunkManager; std::shared_ptr<Shard> primary; - dbConfig.getValue()->getChunkManagerOrPrimary(query.nss().ns(), chunkManager, primary); + dbConfig.getValue()->getChunkManagerOrPrimary(txn, query.nss().ns(), chunkManager, primary); // Re-target and re-send the initial find command to the shards until we have established the // shard version. diff --git a/src/mongo/s/shard_resolver.h b/src/mongo/s/shard_resolver.h index 6c229290805..4a0fa53f48c 100644 --- a/src/mongo/s/shard_resolver.h +++ b/src/mongo/s/shard_resolver.h @@ -36,6 +36,8 @@ namespace mongo { +class OperationContext; + /** * Given a shard name, the ShardResolver resolves a particular host on that shard to contact. 
  *
@@ -51,7 +53,8 @@ public:
      *
      * Returns !OK with message if the shard host could not be found for other reasons.
      */
-    virtual Status chooseWriteHost(const std::string& shardName,
+    virtual Status chooseWriteHost(OperationContext* txn,
+                                   const std::string& shardName,
                                    ConnectionString* shardHost) const = 0;
 };
diff --git a/src/mongo/s/shard_util.cpp b/src/mongo/s/shard_util.cpp
index 53e4b0c0eb2..21d25e539ab 100644
--- a/src/mongo/s/shard_util.cpp
+++ b/src/mongo/s/shard_util.cpp
@@ -39,8 +39,10 @@ namespace mongo {
 namespace shardutil {

-StatusWith<long long> retrieveTotalShardSize(ShardId shardId, ShardRegistry* shardRegistry) {
-    auto shard = shardRegistry->getShard(shardId);
+StatusWith<long long> retrieveTotalShardSize(OperationContext* txn,
+                                             ShardId shardId,
+                                             ShardRegistry* shardRegistry) {
+    auto shard = shardRegistry->getShard(txn, shardId);
     if (!shard) {
         return {ErrorCodes::ShardNotFound, str::stream() << "shard " << shardId << " not found"};
     }
diff --git a/src/mongo/s/shard_util.h b/src/mongo/s/shard_util.h
index b9d546b75a2..640dd09d5c1 100644
--- a/src/mongo/s/shard_util.h
+++ b/src/mongo/s/shard_util.h
@@ -34,6 +34,7 @@ namespace mongo {

+class OperationContext;
 class ShardRegistry;
 template <typename T>
 class StatusWith;
@@ -50,7 +51,9 @@ namespace shardutil {
  * ShardNotFound if shard by that id is not available on the registry
  * NoSuchKey if the total shard size could not be retrieved
  */
-StatusWith<long long> retrieveTotalShardSize(ShardId shardId, ShardRegistry* shardRegistry);
+StatusWith<long long> retrieveTotalShardSize(OperationContext* txn,
+                                             ShardId shardId,
+                                             ShardRegistry* shardRegistry);
 };
 } // namespace mongo
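A hedged sketch of the updated helper call (assumes `shardId` and `txn` in scope); the txn lets the registry lookup inside refresh the shard cache on a miss:

    auto sizeStatus = shardutil::retrieveTotalShardSize(txn, shardId, grid.shardRegistry());
    if (sizeStatus.isOK()) {
        long long sizeBytes = sizeStatus.getValue();
    }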
diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp
index 54c9299195e..8aa785b6d20 100644
--- a/src/mongo/s/version_manager.cpp
+++ b/src/mongo/s/version_manager.cpp
@@ -108,7 +108,8 @@ private:
 /**
  * Sends the setShardVersion command on the specified connection.
  */
-bool setShardVersion(DBClientBase& conn,
+bool setShardVersion(OperationContext* txn,
+                     DBClientBase& conn,
                      const string& ns,
                      const ConnectionString& configServer,
                      ChunkVersion version,
@@ -118,7 +119,7 @@ bool setShardVersion(DBClientBase& conn,
     ShardId shardId;
     ConnectionString shardCS;
     {
-        const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress());
+        const auto shard = grid.shardRegistry()->getShard(txn, conn.getServerAddress());
         shardId = shard->getId();
         shardCS = shard->getConnString();
     }
@@ -205,14 +206,15 @@ bool initShardVersionEmptyNS(OperationContext* txn, DBClientBase* conn_in) {
         // Check to see if this is actually a shard and not a single config server
         // NOTE: Config servers are registered only by the name "config" in the shard cache, not
         // by host, so lookup by host will fail unless the host is also a shard.
-        const auto shard = grid.shardRegistry()->getShard(conn->getServerAddress());
+        const auto shard = grid.shardRegistry()->getShard(txn, conn->getServerAddress());
         if (!shard) {
             return false;
         }
 
         LOG(1) << "initializing shard connection to " << shard->toString();
 
-        ok = setShardVersion(*conn,
+        ok = setShardVersion(txn,
+                             *conn,
                              "",
                              grid.shardRegistry()->getConfigServerConnectionString(),
                              ChunkVersion(),
@@ -301,7 +303,7 @@ bool checkShardVersion(OperationContext* txn,
 
     shared_ptr<Shard> primary;
     shared_ptr<ChunkManager> manager;
-    conf->getChunkManagerOrPrimary(ns, manager, primary);
+    conf->getChunkManagerOrPrimary(txn, ns, manager, primary);
 
     unsigned long long officialSequenceNumber = 0;
 
@@ -313,7 +315,7 @@ bool checkShardVersion(OperationContext* txn,
         return false;
     }
 
-    const auto shard = grid.shardRegistry()->getShard(conn->getServerAddress());
+    const auto shard = grid.shardRegistry()->getShard(txn, conn->getServerAddress());
     uassert(ErrorCodes::ShardNotFound,
             str::stream() << conn->getServerAddress() << " is not recognized as a shard",
             shard);
@@ -368,7 +370,8 @@ bool checkShardVersion(OperationContext* txn,
            << ", current chunk manager iteration is " << officialSequenceNumber;
 
     BSONObj result;
-    if (setShardVersion(*conn,
+    if (setShardVersion(txn,
+                        *conn,
                         ns,
                         grid.shardRegistry()->getConfigServerConnectionString(),
                         version,
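Note: version_manager.cpp illustrates the cost of this kind of change. Once a leaf lookup (shardRegistry()->getShard) needs a context, every frame between it and an entry point that owns one must grow a txn parameter: setShardVersion gains it both for its own getShard call and so that checkShardVersion and initShardVersionEmptyNS can pass theirs through. A minimal sketch of such a chain, with illustrative names and a trivial stand-in context:

#include <iostream>
#include <string>

struct OperationContext {};  // stand-in context type

std::string resolveShardHost(OperationContext* txn, const std::string& address) {
    (void)txn;  // leaf: this is where a real registry lookup would use the context
    return address + " (resolved)";
}

bool setShardVersion(OperationContext* txn, const std::string& address, int version) {
    // Middle frame: takes txn so it can forward it to the leaf lookup.
    std::cout << "setShardVersion v" << version << " on "
              << resolveShardHost(txn, address) << '\n';
    return true;
}

bool checkShardVersion(OperationContext* txn, const std::string& address) {
    // Entry point: the caller that actually owns the OperationContext.
    return setShardVersion(txn, address, 1);
}

int main() {
    OperationContext txn;
    return checkShardVersion(&txn, "shard0000:27018") ? 0 : 1;
}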
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 9891b182e29..87d5dfe779e 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -180,7 +180,7 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
             // Figure out what host we need to dispatch our targeted batch
             ConnectionString shardHost;
             Status resolveStatus =
-                _resolver->chooseWriteHost(nextBatch->getEndpoint().shardName, &shardHost);
+                _resolver->chooseWriteHost(txn, nextBatch->getEndpoint().shardName, &shardHost);
             if (!resolveStatus.isOK()) {
                 ++_stats->numResolveErrors;
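Note: batch_write_exec.cpp is a typical call site: the executor resolves each targeted batch's write host at dispatch time, counts failures in numResolveErrors, and moves on. A small sketch of that resolve-then-dispatch loop with a simplified stand-in Status type; the names are illustrative, not the real API.

#include <iostream>
#include <string>

struct OperationContext {};  // stand-in context type

// Simplified stand-in for mongo::Status: OK or an error reason.
struct Status {
    bool ok;
    std::string reason;
};

Status chooseWriteHost(OperationContext* txn, const std::string& shardName, std::string* host) {
    (void)txn;
    if (shardName.empty())
        return {false, "unknown shard name"};
    *host = shardName + "/host1:27018";
    return {true, ""};
}

int main() {
    OperationContext txn;
    int numResolveErrors = 0;  // mirrors the executor's resolve-error counter

    for (const std::string shardName : {"shard0000", ""}) {
        std::string host;
        const Status s = chooseWriteHost(&txn, shardName, &host);
        if (!s.ok) {
            ++numResolveErrors;  // record the failure and skip this batch
            continue;
        }
        std::cout << "dispatching batch to " << host << '\n';
    }
    std::cout << numResolveErrors << " resolve error(s)\n";
}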
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 65658935818..19c851d1ee0 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -52,7 +52,7 @@ namespace {
  */
 class MockSingleShardBackend {
 public:
-    MockSingleShardBackend(const NamespaceString& nss) {
+    MockSingleShardBackend(OperationContext* txn, const NamespaceString& nss) {
         // Initialize targeting to a mock shard
         ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
         vector<MockRange*> mockRanges;
@@ -61,7 +61,7 @@ public:
         targeter.init(mockRanges);
 
         // Get the connection string for the mock shard
-        resolver.chooseWriteHost(mockRanges.front()->endpoint.shardName, &shardHost);
+        resolver.chooseWriteHost(txn, mockRanges.front()->endpoint.shardName, &shardHost);
 
         // Executor using the mock backend
         exec.reset(new BatchWriteExec(&targeter, &resolver, &dispatcher));
@@ -92,7 +92,7 @@ TEST(BatchWriteExecTests, SingleOp) {
     OperationContextNoop txn;
     NamespaceString nss("foo.bar");
 
-    MockSingleShardBackend backend(nss);
+    MockSingleShardBackend backend(&txn, nss);
 
     BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
     request.setNS(nss);
@@ -117,7 +117,7 @@ TEST(BatchWriteExecTests, SingleOpError) {
     OperationContextNoop txn;
     NamespaceString nss("foo.bar");
 
-    MockSingleShardBackend backend(nss);
+    MockSingleShardBackend backend(&txn, nss);
 
     vector<MockWriteResult*> mockResults;
     BatchedCommandResponse errResponse;
@@ -168,7 +168,7 @@ TEST(BatchWriteExecTests, StaleOp) {
 
     // Do single-target, single doc batch write op
     request.getInsertRequest()->addToDocuments(BSON("x" << 1));
 
-    MockSingleShardBackend backend(nss);
+    MockSingleShardBackend backend(&txn, nss);
 
     vector<MockWriteResult*> mockResults;
     WriteErrorDetail error;
@@ -203,7 +203,7 @@ TEST(BatchWriteExecTests, MultiStaleOp) {
 
     // Do single-target, single doc batch write op
     request.getInsertRequest()->addToDocuments(BSON("x" << 1));
 
-    MockSingleShardBackend backend(nss);
+    MockSingleShardBackend backend(&txn, nss);
 
     vector<MockWriteResult*> mockResults;
     WriteErrorDetail error;
@@ -243,7 +243,7 @@ TEST(BatchWriteExecTests, TooManyStaleOp) {
 
     request.getInsertRequest()->addToDocuments(BSON("x" << 1));
     request.getInsertRequest()->addToDocuments(BSON("x" << 2));
 
-    MockSingleShardBackend backend(nss);
+    MockSingleShardBackend backend(&txn, nss);
 
     vector<MockWriteResult*> mockResults;
     WriteErrorDetail error;
@@ -282,7 +282,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
 
     // Do single-target, single doc batch write op
     request.getInsertRequest()->addToDocuments(BSON("x" << 1));
 
-    MockSingleShardBackend backend(nss);
+    MockSingleShardBackend backend(&txn, nss);
 
     vector<MockWriteResult*> mockResults;
     WriteErrorDetail error;
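Note: the test-side changes are mechanical but worth calling out: fixtures like MockSingleShardBackend now take the context in their constructor, and each test supplies an OperationContextNoop, a context that satisfies the interface without doing any real work. A minimal sketch of that fixture shape, with illustrative stand-in types only:

#include <cassert>
#include <string>

struct OperationContextNoop {};  // stand-in for mongo::OperationContextNoop

class MockBackend {
public:
    MockBackend(OperationContextNoop* txn, const std::string& ns) : _ns(ns) {
        (void)txn;  // consumed during setup, e.g. when resolving the mock write host
        _host = "$shard:12345";
    }

    const std::string& host() const { return _host; }

private:
    std::string _ns;
    std::string _host;
};

int main() {
    OperationContextNoop txn;              // tests construct a no-op context...
    MockBackend backend(&txn, "foo.bar");  // ...and hand it to the fixture
    assert(backend.host() == "$shard:12345");
    return 0;
}

Passing the context into the fixture keeps test setup on the same code path as production callers, so signature changes like this one surface at compile time rather than at run time.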