diff options
author | Randolph Tan <randolph@10gen.com> | 2015-08-10 16:52:11 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2015-08-14 17:47:59 -0400 |
commit | 2a0c88b48eb28231b6bfe81b6b717ceafdece0a6 (patch) | |
tree | b92a020833df8dfcfaeb455718817bbe1ed37779 /src/mongo | |
parent | 0bb07f47f2fff10d8d4eaacfacb90307af6ff8d0 (diff) | |
download | mongo-2a0c88b48eb28231b6bfe81b6b717ceafdece0a6.tar.gz |
SERVER-19736 Add opTime invariant checks when updating cached config server data
Diffstat (limited to 'src/mongo')
20 files changed, 350 insertions, 133 deletions
diff --git a/src/mongo/db/s/metadata_loader.cpp b/src/mongo/db/s/metadata_loader.cpp index f6b72823cf2..52c25854810 100644 --- a/src/mongo/db/s/metadata_loader.cpp +++ b/src/mongo/db/s/metadata_loader.cpp @@ -114,7 +114,7 @@ Status MetadataLoader::_initCollection(CatalogManager* catalogManager, return coll.getStatus(); } - CollectionType collInfo = coll.getValue(); + const auto& collInfo = coll.getValue().value; if (collInfo.getDropped()) { return Status(ErrorCodes::NamespaceNotFound, str::stream() << "could not load metadata, collection " << ns @@ -174,8 +174,8 @@ Status MetadataLoader::initChunks(CatalogManager* catalogManager, try { std::vector<ChunkType> chunks; const auto diffQuery = differ.configDiffQuery(); - Status status = - catalogManager->getChunks(diffQuery.query, diffQuery.sort, boost::none, &chunks); + Status status = catalogManager->getChunks( + diffQuery.query, diffQuery.sort, boost::none, &chunks, nullptr); if (!status.isOK()) { if (status == ErrorCodes::HostUnreachable) { // Make our metadata invalid diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index fdd575c196c..f8d0ea5e135 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -289,7 +289,7 @@ void Balancer::_doBalanceRound(OperationContext* txn, invariant(candidateChunks); vector<CollectionType> collections; - Status collsStatus = grid.catalogManager(txn)->getCollections(nullptr, &collections); + Status collsStatus = grid.catalogManager(txn)->getCollections(nullptr, &collections, nullptr); if (!collsStatus.isOK()) { warning() << "Failed to retrieve the set of collections during balancing round " << collsStatus; @@ -334,7 +334,8 @@ void Balancer::_doBalanceRound(OperationContext* txn, grid.catalogManager(txn)->getChunks(BSON(ChunkType::ns(nss.ns())), BSON(ChunkType::min() << 1), boost::none, // all chunks - &allNsChunks); + &allNsChunks, + nullptr); set<BSONObj> allChunkMinimums; map<string, vector<ChunkType>> shardToChunksMap; diff --git a/src/mongo/s/catalog/catalog_cache.cpp b/src/mongo/s/catalog/catalog_cache.cpp index 9c0d0440c8a..cf062978991 100644 --- a/src/mongo/s/catalog/catalog_cache.cpp +++ b/src/mongo/s/catalog/catalog_cache.cpp @@ -55,12 +55,14 @@ StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(OperationContext* txn } // Need to load from the store - StatusWith<DatabaseType> status = grid.catalogManager(txn)->getDatabase(dbName); + auto status = grid.catalogManager(txn)->getDatabase(dbName); if (!status.isOK()) { return status.getStatus(); } - shared_ptr<DBConfig> db = std::make_shared<DBConfig>(dbName, status.getValue()); + const auto dbOpTimePair = status.getValue(); + shared_ptr<DBConfig> db = + std::make_shared<DBConfig>(dbName, dbOpTimePair.value, dbOpTimePair.opTime); db->load(txn); invariant(_databases.insert(std::make_pair(dbName, db)).second); diff --git a/src/mongo/s/catalog/catalog_manager.cpp b/src/mongo/s/catalog/catalog_manager.cpp index f8b79379eb4..9f57a0d91a1 100644 --- a/src/mongo/s/catalog/catalog_manager.cpp +++ b/src/mongo/s/catalog/catalog_manager.cpp @@ -342,13 +342,14 @@ StatusWith<string> CatalogManager::addShard(OperationContext* txn, // Check that none of the existing shard candidate's dbs exist already for (const string& dbName : dbNamesStatus.getValue()) { - StatusWith<DatabaseType> dbt = getDatabase(dbName); + auto dbt = getDatabase(dbName); if (dbt.isOK()) { + const auto& dbDoc = dbt.getValue().value; return Status(ErrorCodes::OperationFailed, str::stream() << "can't add shard " << "'" << shardConnectionString.toString() << "'" << " because a local database '" << dbName - << "' exists in another " << dbt.getValue().getPrimary()); + << "' exists in another " << dbDoc.getPrimary()); } else if (dbt != ErrorCodes::DatabaseNotFound) { return dbt.getStatus(); } diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index cf5080c62e1..561222c815c 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -35,6 +35,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/s/client/shard.h" +#include "mongo/s/optime_pair.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -168,11 +169,12 @@ public: * * @param dbName name of the database (case sensitive) * - * Returns Status::OK along with the database information or any error code indicating the - * failure. These are some of the known failures: + * Returns Status::OK along with the database information and the OpTime of the config server + * which the database information was based upon. Otherwise, returns an error code indicating + * the failure. These are some of the known failures: * - DatabaseNotFound - database does not exist */ - virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName) = 0; + virtual StatusWith<OpTimePair<DatabaseType>> getDatabase(const std::string& dbName) = 0; /** * Updates or creates the metadata for a given collection. @@ -184,11 +186,12 @@ public: * * @param collectionNs fully qualified name of the collection (case sensitive) * - * Returns Status::OK along with the collection information or any error code indicating + * Returns Status::OK along with the collection information and the OpTime of the config server + * which the collection information was based upon. Otherwise, returns an error code indicating * the failure. These are some of the known failures: * - NamespaceNotFound - collection does not exist */ - virtual StatusWith<CollectionType> getCollection(const std::string& collNs) = 0; + virtual StatusWith<OpTimePair<CollectionType>> getCollection(const std::string& collNs) = 0; /** * Retrieves all collections undera specified database (or in the system). @@ -196,11 +199,15 @@ public: * @param dbName an optional database name. Must be nullptr or non-empty. If nullptr is * specified, all collections on the system are returned. * @param collections variable to receive the set of collections. + * @param optime an out parameter that will contain the opTime of the config server. + * Can be null. Note that collections can be fetched in multiple batches and each batch + * can have a unique opTime. This opTime will be the one from the last batch. * * Returns a !OK status if an error occurs. */ virtual Status getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) = 0; + std::vector<CollectionType>* collections, + repl::OpTime* optime) = 0; /** * Drops the specified collection from the collection metadata store. @@ -226,13 +233,17 @@ public: * @param sort Fields to use for sorting the results. Pass empty BSON object for no sort. * @param limit The number of chunk entries to return. Pass boost::none for no limit. * @param chunks Vector entry to receive the results + * @param optime an out parameter that will contain the opTime of the config server. + * Can be null. Note that chunks can be fetched in multiple batches and each batch + * can have a unique opTime. This opTime will be the one from the last batch. * * Returns a !OK status if an error occurs. */ virtual Status getChunks(const BSONObj& filter, const BSONObj& sort, boost::optional<int> limit, - std::vector<ChunkType>* chunks) = 0; + std::vector<ChunkType>* chunks, + repl::OpTime* opTime) = 0; /** * Retrieves all tags for the specified collection. diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index 235f23ae149..ba866ab088e 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -76,20 +76,21 @@ Status CatalogManagerMock::updateDatabase(const string& dbName, const DatabaseTy return Status::OK(); } -StatusWith<DatabaseType> CatalogManagerMock::getDatabase(const string& dbName) { - return DatabaseType(); +StatusWith<OpTimePair<DatabaseType>> CatalogManagerMock::getDatabase(const string& dbName) { + return OpTimePair<DatabaseType>(); } Status CatalogManagerMock::updateCollection(const string& collNs, const CollectionType& coll) { return Status::OK(); } -StatusWith<CollectionType> CatalogManagerMock::getCollection(const string& collNs) { - return CollectionType(); +StatusWith<OpTimePair<CollectionType>> CatalogManagerMock::getCollection(const string& collNs) { + return OpTimePair<CollectionType>(); } Status CatalogManagerMock::getCollections(const string* dbName, - vector<CollectionType>* collections) { + vector<CollectionType>* collections, + repl::OpTime* optime) { return Status::OK(); } @@ -104,7 +105,8 @@ Status CatalogManagerMock::getDatabasesForShard(const string& shardName, vector< Status CatalogManagerMock::getChunks(const BSONObj& filter, const BSONObj& sort, boost::optional<int> limit, - std::vector<ChunkType>* chunks) { + std::vector<ChunkType>* chunks, + repl::OpTime* opTime) { return Status::OK(); } diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index a8526d221e7..b2f0c505071 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -62,14 +62,15 @@ public: Status updateDatabase(const std::string& dbName, const DatabaseType& db) override; - StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; + StatusWith<OpTimePair<DatabaseType>> getDatabase(const std::string& dbName) override; Status updateCollection(const std::string& collNs, const CollectionType& coll) override; - StatusWith<CollectionType> getCollection(const std::string& collNs) override; + StatusWith<OpTimePair<CollectionType>> getCollection(const std::string& collNs) override; Status getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) override; + std::vector<CollectionType>* collections, + repl::OpTime* optime) override; Status dropCollection(OperationContext* txn, const NamespaceString& ns) override; @@ -79,7 +80,8 @@ public: Status getChunks(const BSONObj& filter, const BSONObj& sort, boost::optional<int> limit, - std::vector<ChunkType>* chunks) override; + std::vector<ChunkType>* chunks, + repl::OpTime* opTime) override; Status getTagsForCollection(const std::string& collectionNs, std::vector<TagsType>* tags) override; diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 73d38b1e610..35881378374 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -280,13 +280,12 @@ Status CatalogManagerLegacy::shardCollection(OperationContext* txn, return scopedDistLock.getStatus(); } - StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns)); + auto status = getDatabase(nsToDatabase(ns)); if (!status.isOK()) { return status.getStatus(); } - DatabaseType dbt = status.getValue(); - ShardId dbPrimaryShardId = dbt.getPrimary(); + ShardId dbPrimaryShardId = status.getValue().value.getPrimary(); // This is an extra safety check that the collection is not getting sharded concurrently by // two different mongos instances. It is not 100%-proof, but it reduces the chance that two @@ -449,7 +448,7 @@ StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationConte return ShardDrainingStatus::ONGOING; } -StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& dbName) { +StatusWith<OpTimePair<DatabaseType>> CatalogManagerLegacy::getDatabase(const std::string& dbName) { invariant(nsIsDbOnly(dbName)); // The two databases that are hosted on the config server are config and admin @@ -459,7 +458,7 @@ StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& db dbt.setSharded(false); dbt.setPrimary("config"); - return dbt; + return OpTimePair<DatabaseType>(dbt); } ScopedDbConnection conn(_configServerConnectionString, 30.0); @@ -471,10 +470,18 @@ StatusWith<DatabaseType> CatalogManagerLegacy::getDatabase(const std::string& db } conn.done(); - return DatabaseType::fromBSON(dbObj); + + auto parseStatus = DatabaseType::fromBSON(dbObj); + + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + return OpTimePair<DatabaseType>(parseStatus.getValue()); } -StatusWith<CollectionType> CatalogManagerLegacy::getCollection(const std::string& collNs) { +StatusWith<OpTimePair<CollectionType>> CatalogManagerLegacy::getCollection( + const std::string& collNs) { ScopedDbConnection conn(_configServerConnectionString, 30.0); BSONObj collObj = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::fullNs(collNs))); @@ -485,11 +492,19 @@ StatusWith<CollectionType> CatalogManagerLegacy::getCollection(const std::string } conn.done(); - return CollectionType::fromBSON(collObj); + + auto parseStatus = CollectionType::fromBSON(collObj); + + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + return OpTimePair<CollectionType>(parseStatus.getValue()); } Status CatalogManagerLegacy::getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) { + std::vector<CollectionType>* collections, + repl::OpTime* optime) { BSONObjBuilder b; if (dbName) { invariant(!dbName->empty()); @@ -792,7 +807,8 @@ Status CatalogManagerLegacy::getDatabasesForShard(const string& shardName, vecto Status CatalogManagerLegacy::getChunks(const BSONObj& query, const BSONObj& sort, boost::optional<int> limit, - vector<ChunkType>* chunks) { + vector<ChunkType>* chunks, + repl::OpTime* opTime) { chunks->clear(); try { diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index 391adf4ec40..bf9e92bc7d5 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -65,11 +65,13 @@ public: StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, const std::string& name) override; - StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; + StatusWith<OpTimePair<DatabaseType>> getDatabase(const std::string& dbName) override; - StatusWith<CollectionType> getCollection(const std::string& collNs) override; + StatusWith<OpTimePair<CollectionType>> getCollection(const std::string& collNs) override; - Status getCollections(const std::string* dbName, std::vector<CollectionType>* collections); + Status getCollections(const std::string* dbName, + std::vector<CollectionType>* collections, + repl::OpTime* optime); Status dropCollection(OperationContext* txn, const NamespaceString& ns) override; @@ -79,7 +81,8 @@ public: Status getChunks(const BSONObj& query, const BSONObj& sort, boost::optional<int> limit, - std::vector<ChunkType>* chunks) override; + std::vector<ChunkType>* chunks, + repl::OpTime* opTime) override; Status getTagsForCollection(const std::string& collectionNs, std::vector<TagsType>* tags) override; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index dcc3e2d68e3..449d992d091 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -45,6 +45,7 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/network_interface.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -77,6 +78,7 @@ namespace mongo { +using repl::OpTime; using std::set; using std::shared_ptr; using std::string; @@ -139,13 +141,12 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, return scopedDistLock.getStatus(); } - StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns)); + auto status = getDatabase(nsToDatabase(ns)); if (!status.isOK()) { return status.getStatus(); } - DatabaseType dbt = status.getValue(); - ShardId dbPrimaryShardId = dbt.getPrimary(); + ShardId dbPrimaryShardId = status.getValue().value.getPrimary(); const auto primaryShard = grid.shardRegistry()->getShard(dbPrimaryShardId); { @@ -333,7 +334,8 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC return ShardDrainingStatus::COMPLETED; } -StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string& dbName) { +StatusWith<OpTimePair<DatabaseType>> CatalogManagerReplicaSet::getDatabase( + const std::string& dbName) { invariant(nsIsDbOnly(dbName)); // The two databases that are hosted on the config server are config and admin @@ -343,7 +345,7 @@ StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string dbt.setSharded(false); dbt.setPrimary("config"); - return dbt; + return OpTimePair<DatabaseType>(dbt); } const auto configShard = grid.shardRegistry()->getShard("config"); @@ -361,17 +363,23 @@ StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string return findStatus.getStatus(); } - const auto& docs = findStatus.getValue(); - if (docs.empty()) { + const auto& docsWithOpTime = findStatus.getValue(); + if (docsWithOpTime.value.empty()) { return {ErrorCodes::DatabaseNotFound, stream() << "database " << dbName << " not found"}; } - invariant(docs.size() == 1); + invariant(docsWithOpTime.value.size() == 1); + + auto parseStatus = DatabaseType::fromBSON(docsWithOpTime.value.front()); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } - return DatabaseType::fromBSON(docs.front()); + return OpTimePair<DatabaseType>(parseStatus.getValue(), docsWithOpTime.opTime); } -StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::string& collNs) { +StatusWith<OpTimePair<CollectionType>> CatalogManagerReplicaSet::getCollection( + const std::string& collNs) { auto configShard = grid.shardRegistry()->getShard("config"); auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); @@ -388,7 +396,8 @@ StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::st return statusFind.getStatus(); } - const auto& retVal = statusFind.getValue(); + const auto& retOpTimePair = statusFind.getValue(); + const auto& retVal = retOpTimePair.value; if (retVal.empty()) { return Status(ErrorCodes::NamespaceNotFound, stream() << "collection " << collNs << " not found"); @@ -396,11 +405,17 @@ StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::st invariant(retVal.size() == 1); - return CollectionType::fromBSON(retVal.front()); + auto parseStatus = CollectionType::fromBSON(retVal.front()); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + return OpTimePair<CollectionType>(parseStatus.getValue(), retOpTimePair.opTime); } Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) { + std::vector<CollectionType>* collections, + OpTime* opTime) { BSONObjBuilder b; if (dbName) { invariant(!dbName->empty()); @@ -423,7 +438,9 @@ Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, return findStatus.getStatus(); } - for (const BSONObj& obj : findStatus.getValue()) { + const auto& docsOpTimePair = findStatus.getValue(); + + for (const BSONObj& obj : docsOpTimePair.value) { const auto collectionResult = CollectionType::fromBSON(obj); if (!collectionResult.isOK()) { collections->clear(); @@ -436,6 +453,10 @@ Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, collections->push_back(collectionResult.getValue()); } + if (opTime) { + *opTime = docsOpTimePair.opTime; + } + return Status::OK(); } @@ -645,7 +666,7 @@ StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(const strin return findStatus.getStatus(); } - const auto& docs = findStatus.getValue(); + const auto& docs = findStatus.getValue().value; if (docs.empty()) { return {ErrorCodes::NoMatchingDocument, str::stream() << "can't find settings document with key: " << key}; @@ -686,7 +707,7 @@ Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, return findStatus.getStatus(); } - for (const BSONObj& obj : findStatus.getValue()) { + for (const BSONObj& obj : findStatus.getValue().value) { string dbName; Status status = bsonExtractStringField(obj, DatabaseType::name(), &dbName); if (!status.isOK()) { @@ -703,7 +724,8 @@ Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, Status CatalogManagerReplicaSet::getChunks(const BSONObj& query, const BSONObj& sort, boost::optional<int> limit, - vector<ChunkType>* chunks) { + vector<ChunkType>* chunks, + OpTime* opTime) { chunks->clear(); auto configShard = grid.shardRegistry()->getShard("config"); @@ -720,7 +742,8 @@ Status CatalogManagerReplicaSet::getChunks(const BSONObj& query, return findStatus.getStatus(); } - for (const BSONObj& obj : findStatus.getValue()) { + const auto chunkDocsOpTimePair = findStatus.getValue(); + for (const BSONObj& obj : chunkDocsOpTimePair.value) { auto chunkRes = ChunkType::fromBSON(obj); if (!chunkRes.isOK()) { chunks->clear(); @@ -733,6 +756,10 @@ Status CatalogManagerReplicaSet::getChunks(const BSONObj& query, chunks->push_back(chunkRes.getValue()); } + if (opTime) { + *opTime = chunkDocsOpTimePair.opTime; + } + return Status::OK(); } @@ -754,7 +781,7 @@ Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collect if (!findStatus.isOK()) { return findStatus.getStatus(); } - for (const BSONObj& obj : findStatus.getValue()) { + for (const BSONObj& obj : findStatus.getValue().value) { auto tagRes = TagsType::fromBSON(obj); if (!tagRes.isOK()) { tags->clear(); @@ -786,7 +813,7 @@ StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(const std::string& c return findStatus.getStatus(); } - const auto& docs = findStatus.getValue(); + const auto& docs = findStatus.getValue().value; if (docs.empty()) { return string{}; } @@ -819,7 +846,7 @@ Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) { return findStatus.getStatus(); } - for (const BSONObj& doc : findStatus.getValue()) { + for (const BSONObj& doc : findStatus.getValue().value) { auto shardRes = ShardType::fromBSON(doc); if (!shardRes.isOK()) { shards->clear(); @@ -932,7 +959,7 @@ Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, Data return findStatus.getStatus(); } - const auto& docs = findStatus.getValue(); + const auto& docs = findStatus.getValue().value; if (docs.empty()) { return Status::OK(); } @@ -977,7 +1004,7 @@ StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() { return findStatus.getStatus(); } - const auto& docs = findStatus.getValue(); + const auto& docs = findStatus.getValue().value; int count = 0; if (!docs.empty()) { @@ -1097,7 +1124,7 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() { return findStatus.getStatus(); } - auto queryResults = findStatus.getValue(); + auto queryResults = findStatus.getValue().value; if (queryResults.size() > 1) { return {ErrorCodes::RemoteValidationError, @@ -1178,7 +1205,7 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRe return response.response; } -StatusWith<std::vector<BSONObj>> CatalogManagerReplicaSet::_exhaustiveFindOnConfig( +StatusWith<OpTimePair<vector<BSONObj>>> CatalogManagerReplicaSet::_exhaustiveFindOnConfig( const HostAndPort& host, const NamespaceString& nss, const BSONObj& query, @@ -1198,15 +1225,15 @@ StatusWith<std::vector<BSONObj>> CatalogManagerReplicaSet::_exhaustiveFindOnConf _updateLastSeenConfigOpTime(response.opTime); - return std::move(response.docs); + return OpTimePair<vector<BSONObj>>(std::move(response.docs), response.opTime); } -repl::OpTime CatalogManagerReplicaSet::_getConfigOpTime() { +OpTime CatalogManagerReplicaSet::_getConfigOpTime() { stdx::lock_guard<stdx::mutex> lk(_mutex); return _configOpTime; } -void CatalogManagerReplicaSet::_updateLastSeenConfigOpTime(const repl::OpTime& optime) { +void CatalogManagerReplicaSet::_updateLastSeenConfigOpTime(const OpTime& optime) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_configOpTime < optime) { diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 70b28c98374..d5681ad7940 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -62,12 +62,13 @@ public: StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, const std::string& name) override; - StatusWith<DatabaseType> getDatabase(const std::string& dbName) override; + StatusWith<OpTimePair<DatabaseType>> getDatabase(const std::string& dbName) override; - StatusWith<CollectionType> getCollection(const std::string& collNs) override; + StatusWith<OpTimePair<CollectionType>> getCollection(const std::string& collNs) override; Status getCollections(const std::string* dbName, - std::vector<CollectionType>* collections) override; + std::vector<CollectionType>* collections, + repl::OpTime* optime) override; Status dropCollection(OperationContext* txn, const NamespaceString& ns) override; @@ -77,7 +78,8 @@ public: Status getChunks(const BSONObj& query, const BSONObj& sort, boost::optional<int> limit, - std::vector<ChunkType>* chunks) override; + std::vector<ChunkType>* chunks, + repl::OpTime* opTime) override; Status getTagsForCollection(const std::string& collectionNs, std::vector<TagsType>* tags) override; @@ -144,11 +146,12 @@ private: StatusWith<BSONObj> _runCommandOnConfigWithNotMasterRetries(const std::string& dbName, BSONObj cmdObj); - StatusWith<std::vector<BSONObj>> _exhaustiveFindOnConfig(const HostAndPort& host, - const NamespaceString& nss, - const BSONObj& query, - const BSONObj& sort, - boost::optional<long long> limit); + StatusWith<OpTimePair<std::vector<BSONObj>>> _exhaustiveFindOnConfig( + const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional<long long> limit); /** * Appends a read committed read concern to the request object. diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp index 1dc3b907536..c02fb31cd34 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp @@ -66,6 +66,7 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using executor::TaskExecutor; using rpc::ReplSetMetadata; +using repl::OpTime; using std::string; using std::vector; using stdx::chrono::milliseconds; @@ -82,11 +83,15 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) { expectedColl.setUpdatedAt(Date_t()); expectedColl.setEpoch(OID::gen()); + const OpTime newOpTime(Timestamp(7, 6), 5); + auto future = launchAsync([this, &expectedColl] { return assertGet(catalogManager()->getCollection(expectedColl.getNs().ns())); }); - onFindCommand([this, &expectedColl](const RemoteCommandRequest& request) { + onFindWithMetadataCommand([this, &expectedColl, newOpTime]( + const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); @@ -102,12 +107,17 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) { checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); - return vector<BSONObj>{expectedColl.toBSON()}; + ReplSetMetadata metadata(10, newOpTime, OpTime(), 100, 30); + BSONObjBuilder builder; + metadata.writeToMetadata(&builder); + + return std::make_tuple(vector<BSONObj>{expectedColl.toBSON()}, builder.obj()); }); // Now wait for the getCollection call to return - const auto& actualColl = future.timed_get(kFutureTimeout); - ASSERT_EQ(expectedColl.toBSON(), actualColl.toBSON()); + const auto collOpTimePair = future.timed_get(kFutureTimeout); + ASSERT_EQ(newOpTime, collOpTimePair.opTime); + ASSERT_EQ(expectedColl.toBSON(), collOpTimePair.value.toBSON()); } TEST_F(CatalogManagerReplSetTest, GetCollectionNotExisting) { @@ -132,11 +142,13 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) { expectedDb.setPrimary("shard0000"); expectedDb.setSharded(true); + const OpTime newOpTime(Timestamp(7, 6), 5); + auto future = launchAsync([this, &expectedDb] { return assertGet(catalogManager()->getDatabase(expectedDb.getName())); }); - onFindCommand([this, &expectedDb](const RemoteCommandRequest& request) { + onFindWithMetadataCommand([this, &expectedDb, newOpTime](const RemoteCommandRequest& request) { const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS); @@ -151,11 +163,16 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) { checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); - return vector<BSONObj>{expectedDb.toBSON()}; + ReplSetMetadata metadata(10, newOpTime, OpTime(), 100, 30); + BSONObjBuilder builder; + metadata.writeToMetadata(&builder); + + return std::make_tuple(vector<BSONObj>{expectedDb.toBSON()}, builder.obj()); }); - const auto& actualDb = future.timed_get(kFutureTimeout); - ASSERT_EQ(expectedDb.toBSON(), actualDb.toBSON()); + const auto dbOpTimePair = future.timed_get(kFutureTimeout); + ASSERT_EQ(newOpTime, dbOpTimePair.opTime); + ASSERT_EQ(expectedDb.toBSON(), dbOpTimePair.value.toBSON()); } TEST_F(CatalogManagerReplSetTest, GetDatabaseNotExisting) { @@ -385,17 +402,22 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) { << ChunkType::DEPRECATED_lastmod() << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong())))); - auto future = launchAsync([this, &chunksQuery] { + const OpTime newOpTime(Timestamp(7, 6), 5); + + auto future = launchAsync([this, &chunksQuery, newOpTime] { vector<ChunkType> chunks; + OpTime opTime; - ASSERT_OK( - catalogManager()->getChunks(chunksQuery, BSON(ChunkType::version() << -1), 1, &chunks)); + ASSERT_OK(catalogManager()->getChunks( + chunksQuery, BSON(ChunkType::version() << -1), 1, &chunks, &opTime)); ASSERT_EQ(2U, chunks.size()); + ASSERT_EQ(newOpTime, opTime); return chunks; }); - onFindCommand([this, &chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) { + onFindWithMetadataCommand([this, &chunksQuery, chunkA, chunkB, newOpTime]( + const RemoteCommandRequest& request) { ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); @@ -410,7 +432,11 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) { checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); - return vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()}; + ReplSetMetadata metadata(10, newOpTime, OpTime(), 100, 30); + BSONObjBuilder builder; + metadata.writeToMetadata(&builder); + + return std::make_tuple(vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()}, builder.obj()); }); const auto& chunks = future.timed_get(kFutureTimeout); @@ -431,7 +457,8 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSNoSortNoLimit) { auto future = launchAsync([this, &chunksQuery] { vector<ChunkType> chunks; - ASSERT_OK(catalogManager()->getChunks(chunksQuery, BSONObj(), boost::none, &chunks)); + ASSERT_OK( + catalogManager()->getChunks(chunksQuery, BSONObj(), boost::none, &chunks, nullptr)); ASSERT_EQ(0U, chunks.size()); return chunks; @@ -470,7 +497,8 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSInvalidChunk) { auto future = launchAsync([this, &chunksQuery] { vector<ChunkType> chunks; - Status status = catalogManager()->getChunks(chunksQuery, BSONObj(), boost::none, &chunks); + Status status = + catalogManager()->getChunks(chunksQuery, BSONObj(), boost::none, &chunks, nullptr); ASSERT_EQUALS(ErrorCodes::FailedToParse, status); ASSERT_EQ(0U, chunks.size()); @@ -856,16 +884,22 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) { coll3.setKeyPattern(KeyPattern{BSON("_id" << 1)}); ASSERT_OK(coll3.validate()); - auto future = launchAsync([this] { + const OpTime newOpTime(Timestamp(7, 6), 5); + + auto future = launchAsync([this, newOpTime] { vector<CollectionType> collections; - const auto status = catalogManager()->getCollections(nullptr, &collections); + OpTime opTime; + const auto status = catalogManager()->getCollections(nullptr, &collections, &opTime); ASSERT_OK(status); + ASSERT_EQ(newOpTime, opTime); + return collections; }); - onFindCommand([this, coll1, coll2, coll3](const RemoteCommandRequest& request) { + onFindWithMetadataCommand([this, coll1, coll2, coll3, newOpTime]( + const RemoteCommandRequest& request) { ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); @@ -879,7 +913,12 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) { checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); - return vector<BSONObj>{coll1.toBSON(), coll2.toBSON(), coll3.toBSON()}; + ReplSetMetadata metadata(10, newOpTime, OpTime(), 100, 30); + BSONObjBuilder builder; + metadata.writeToMetadata(&builder); + + return std::make_tuple(vector<BSONObj>{coll1.toBSON(), coll2.toBSON(), coll3.toBSON()}, + builder.obj()); }); const auto& actualColls = future.timed_get(kFutureTimeout); @@ -910,7 +949,7 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsWithDb) { string dbName = "test"; vector<CollectionType> collections; - const auto status = catalogManager()->getCollections(&dbName, &collections); + const auto status = catalogManager()->getCollections(&dbName, &collections, nullptr); ASSERT_OK(status); return collections; @@ -949,7 +988,7 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsInvalidCollectionType) { string dbName = "test"; vector<CollectionType> collections; - const auto status = catalogManager()->getCollections(&dbName, &collections); + const auto status = catalogManager()->getCollections(&dbName, &collections, nullptr); ASSERT_EQ(ErrorCodes::FailedToParse, status); ASSERT_EQ(0U, collections.size()); @@ -2088,7 +2127,7 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExistsNoShards) { TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) { configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); - repl::OpTime lastOpTime; + OpTime lastOpTime; for (int x = 0; x < 3; x++) { auto future = launchAsync([this] { BSONObjBuilder responseBuilder; @@ -2096,7 +2135,7 @@ TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) { catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); }); - const repl::OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5); + const OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5); onCommandWithMetadata([this, &newOpTime, &lastOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS("test", request.dbname); @@ -2106,7 +2145,7 @@ TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm()); - ReplSetMetadata metadata(12, newOpTime, repl::OpTime(), 100, 3); + ReplSetMetadata metadata(10, newOpTime, repl::OpTime(), 100, 30); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2129,8 +2168,8 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); }); - repl::OpTime highestOpTime; - const repl::OpTime newOpTime(Timestamp(7, 6), 5); + OpTime highestOpTime; + const OpTime newOpTime(Timestamp(7, 6), 5); onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS("test", request.dbname); @@ -2140,7 +2179,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(12, newOpTime, repl::OpTime(), 100, 3); + ReplSetMetadata metadata(10, newOpTime, repl::OpTime(), 100, 30); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2157,7 +2196,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); }); - const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + const OpTime oldOpTime(Timestamp(3, 10), 5); onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS("test", request.dbname); @@ -2167,7 +2206,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(12, oldOpTime, repl::OpTime(), 100, 3); + ReplSetMetadata metadata(10, oldOpTime, repl::OpTime(), 100, 30); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2190,7 +2229,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(12, oldOpTime, repl::OpTime(), 100, 3); + ReplSetMetadata metadata(10, oldOpTime, repl::OpTime(), 100, 30); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2206,15 +2245,15 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) { auto future1 = launchAsync( [this] { ASSERT_OK(catalogManager()->getGlobalSettings("chunksize").getStatus()); }); - repl::OpTime highestOpTime; - const repl::OpTime newOpTime(Timestamp(7, 6), 5); + OpTime highestOpTime; + const OpTime newOpTime(Timestamp(7, 6), 5); onFindWithMetadataCommand( [this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(12, newOpTime, repl::OpTime(), 100, 3); + ReplSetMetadata metadata(10, newOpTime, repl::OpTime(), 100, 30); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2235,7 +2274,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) { ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); }); - const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + const OpTime oldOpTime(Timestamp(3, 10), 5); onCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS("test", request.dbname); @@ -2260,8 +2299,8 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) { ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); }); - repl::OpTime highestOpTime; - const repl::OpTime newOpTime(Timestamp(7, 6), 5); + OpTime highestOpTime; + const OpTime newOpTime(Timestamp(7, 6), 5); onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS("test", request.dbname); @@ -2271,7 +2310,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(12, newOpTime, repl::OpTime(), 100, 3); + ReplSetMetadata metadata(10, newOpTime, repl::OpTime(), 100, 30); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2286,7 +2325,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) { auto future2 = launchAsync( [this] { ASSERT_OK(catalogManager()->getGlobalSettings("chunksize").getStatus()); }); - const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + const OpTime oldOpTime(Timestamp(3, 10), 5); onFindCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 10ebdc95a20..da6cbc196df 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -587,7 +587,7 @@ bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const { return false; } - shouldBalance = status.getValue().getAllowBalance(); + shouldBalance = status.getValue().value.getAllowBalance(); } log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into " diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 638935ddbf5..e98b6259ccf 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -261,9 +261,13 @@ bool ChunkManager::_load(OperationContext* txn, // Get the diff query required auto diffQuery = differ.configDiffQuery(); + repl::OpTime opTime; std::vector<ChunkType> chunks; - uassertStatusOK( - grid.catalogManager(txn)->getChunks(diffQuery.query, diffQuery.sort, boost::none, &chunks)); + uassertStatusOK(grid.catalogManager(txn)->getChunks( + diffQuery.query, diffQuery.sort, boost::none, &chunks, &opTime)); + + invariant(opTime >= _configOpTime); + _configOpTime = opTime; int diffsApplied = differ.calculateConfigDiff(chunks); if (diffsApplied > 0) { @@ -281,6 +285,8 @@ bool ChunkManager::_load(OperationContext* txn, } } + _configOpTime = opTime; + return true; } else if (diffsApplied == 0) { // No chunks were found for the ns @@ -292,6 +298,7 @@ bool ChunkManager::_load(OperationContext* txn, shardVersions->clear(); _version = ChunkVersion(0, 0, OID()); + _configOpTime = opTime; return true; } else { // diffsApplied < 0 @@ -811,4 +818,8 @@ int ChunkManager::getCurrentDesiredChunkSize() const { return splitThreshold; } +repl::OpTime ChunkManager::getConfigOpTime() const { + return _configOpTime; +} + } // namespace mongo diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 0ca94237920..c640bf26ed9 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -32,9 +32,10 @@ #include <string> #include <vector> -#include "mongo/util/concurrency/ticketholder.h" +#include "mongo/db/repl/optime.h" #include "mongo/s/chunk.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/util/concurrency/ticketholder.h" namespace mongo { @@ -243,6 +244,11 @@ public: std::shared_ptr<ChunkManager> reload(OperationContext* txn, bool force = true) const; // doesn't modify self! + /** + * Returns the opTime of config server the last time chunks were loaded. + */ + repl::OpTime getConfigOpTime() const; + private: // returns true if load was consistent bool _load(OperationContext* txn, @@ -273,6 +279,9 @@ private: // Max version across all chunks ChunkVersion _version; + // OpTime of config server the last time chunks were loaded. + repl::OpTime _configOpTime; + // // Split Heuristic info // diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp index fc958920407..75af3d2ed9d 100644 --- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp @@ -135,7 +135,8 @@ public: Status status = catalogManager->getChunks(BSON(ChunkType::shard(s->getId())), BSONObj(), boost::none, // return all - &chunks); + &chunks, + nullptr); if (!status.isOK()) { return appendCommandStatus(result, status); } diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index f173bfe05bd..e76f6feeb2b 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -62,7 +62,10 @@ using std::string; using std::unique_ptr; using std::vector; -CollectionInfo::CollectionInfo(OperationContext* txn, const CollectionType& coll) { +CollectionInfo::CollectionInfo(OperationContext* txn, + const CollectionType& coll, + repl::OpTime opTime) + : _configOpTime(std::move(opTime)) { _dropped = coll.getDropped(); shard(txn, new ChunkManager(coll)); @@ -82,6 +85,8 @@ void CollectionInfo::shard(OperationContext* txn, ChunkManager* manager) { // Do this *first* so we're invisible to everyone else manager->loadExistingRanges(txn, nullptr); + const auto cmOpTime = manager->getConfigOpTime(); + // // Collections with no chunks are unsharded, no matter what the collections entry says // This helps prevent errors when dropping in a different process @@ -134,7 +139,8 @@ void CollectionInfo::save(OperationContext* txn, const string& ns) { _dirty = false; } -DBConfig::DBConfig(std::string name, const DatabaseType& dbt) : _name(name) { +DBConfig::DBConfig(std::string name, const DatabaseType& dbt, repl::OpTime configOpTime) + : _name(name), _configOpTime(std::move(configOpTime)) { invariant(_name == dbt.getName()); _primaryId = dbt.getPrimary(); _shardingEnabled = dbt.getSharded(); @@ -215,11 +221,13 @@ bool DBConfig::removeSharding(OperationContext* txn, const string& ns) { return true; } -// Handles weird logic related to getting *either* a chunk manager *or* the collection primary shard +// Handles weird logic related to getting *either* a chunk manager *or* the collection primary +// shard void DBConfig::getChunkManagerOrPrimary(const string& ns, std::shared_ptr<ChunkManager>& manager, std::shared_ptr<Shard>& primary) { - // The logic here is basically that at any time, our collection can become sharded or unsharded + // The logic here is basically that at any time, our collection can become sharded or + // unsharded // via a command. If we're not sharded, we want to send data to the primary, if sharded, we // want to send data to the correct chunks, and we can't check both w/o the lock. @@ -311,8 +319,12 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn, // currently vector<ChunkType> newestChunk; if (oldVersion.isSet() && !forceReload) { - uassertStatusOK(grid.catalogManager(txn)->getChunks( - BSON(ChunkType::ns(ns)), BSON(ChunkType::DEPRECATED_lastmod() << -1), 1, &newestChunk)); + uassertStatusOK( + grid.catalogManager(txn)->getChunks(BSON(ChunkType::ns(ns)), + BSON(ChunkType::DEPRECATED_lastmod() << -1), + 1, + &newestChunk, + nullptr)); if (!newestChunk.empty()) { invariant(newestChunk.size() == 1); @@ -404,6 +416,10 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn, // end legacy behavior if (shouldReset) { + const auto cmOpTime = tempChunkManager->getConfigOpTime(); + invariant(cmOpTime >= _configOpTime); + invariant(cmOpTime >= ci.getCM()->getConfigOpTime()); + ci.resetCM(tempChunkManager.release()); } @@ -427,7 +443,7 @@ bool DBConfig::load(OperationContext* txn) { } bool DBConfig::_load(OperationContext* txn) { - StatusWith<DatabaseType> status = grid.catalogManager(txn)->getDatabase(_name); + auto status = grid.catalogManager(txn)->getDatabase(_name); if (status == ErrorCodes::DatabaseNotFound) { return false; } @@ -435,24 +451,38 @@ bool DBConfig::_load(OperationContext* txn) { // All other errors are connectivity, etc so throw an exception. uassertStatusOK(status.getStatus()); - DatabaseType dbt = status.getValue(); + const auto& dbOpTimePair = status.getValue(); + const auto& dbt = dbOpTimePair.value; invariant(_name == dbt.getName()); _primaryId = dbt.getPrimary(); _shardingEnabled = dbt.getSharded(); + invariant(dbOpTimePair.opTime >= _configOpTime); + _configOpTime = dbOpTimePair.opTime; + // Load all collections vector<CollectionType> collections; - uassertStatusOK(grid.catalogManager(txn)->getCollections(&_name, &collections)); + repl::OpTime configOpTimeWhenLoadingColl; + uassertStatusOK(grid.catalogManager(txn) + ->getCollections(&_name, &collections, &configOpTimeWhenLoadingColl)); int numCollsErased = 0; int numCollsSharded = 0; + invariant(configOpTimeWhenLoadingColl >= _configOpTime); + for (const auto& coll : collections) { + auto collIter = _collections.find(coll.getNs().ns()); + if (collIter != _collections.end()) { + invariant(configOpTimeWhenLoadingColl >= collIter->second.getConfigOpTime()); + } + if (coll.getDropped()) { _collections.erase(coll.getNs().ns()); numCollsErased++; } else { - _collections[coll.getNs().ns()] = CollectionInfo(txn, coll); + _collections[coll.getNs().ns()] = + CollectionInfo(txn, coll, configOpTimeWhenLoadingColl); numCollsSharded++; } } diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index a779c8ee70c..6c5d362f2b3 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -31,6 +31,7 @@ #include <set> #include "mongo/db/jsobj.h" +#include "mongo/db/repl/optime.h" #include "mongo/s/client/shard.h" #include "mongo/util/concurrency/mutex.h" @@ -48,7 +49,7 @@ struct CollectionInfo { _dropped = false; } - CollectionInfo(OperationContext* txn, const CollectionType& in); + CollectionInfo(OperationContext* txn, const CollectionType& in, repl::OpTime); ~CollectionInfo(); bool isSharded() const { @@ -85,12 +86,17 @@ struct CollectionInfo { return _key; } + repl::OpTime getConfigOpTime() const { + return _configOpTime; + } + private: BSONObj _key; bool _unique; std::shared_ptr<ChunkManager> _cm; bool _dirty; bool _dropped; + repl::OpTime _configOpTime; }; /** @@ -98,7 +104,7 @@ private: */ class DBConfig { public: - DBConfig(std::string name, const DatabaseType& dbt); + DBConfig(std::string name, const DatabaseType& dbt, repl::OpTime configOpTime); ~DBConfig(); /** @@ -192,6 +198,9 @@ protected: stdx::mutex _lock; CollectionInfoMap _collections; + // OpTime of config server when the database definition was loaded. + repl::OpTime _configOpTime; + // Ensures that only one thread at a time loads collection configuration data from // the config server stdx::mutex _hitConfigServerLock; diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 8039123945a..4866b84653a 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -954,7 +954,8 @@ public: ->getChunks(BSON(ChunkType::ns(ns)), BSON(ChunkType::DEPRECATED_lastmod() << -1), 1, - &newestChunk); + &newestChunk, + nullptr); uassertStatusOK(status); ChunkVersion checkVersion; diff --git a/src/mongo/s/optime_pair.h b/src/mongo/s/optime_pair.h new file mode 100644 index 00000000000..8f843855a8e --- /dev/null +++ b/src/mongo/s/optime_pair.h @@ -0,0 +1,49 @@ +/** +* Copyright (C) 2015 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +#include "mongo/db/repl/optime.h" + +namespace mongo { + +/** + * Basic structure that contains a value and an opTime. + */ +template <typename T> +struct OpTimePair { +public: + OpTimePair() = default; + explicit OpTimePair(T val) : value(std::move(val)) {} + OpTimePair(T val, repl::OpTime ts) : value(std::move(val)), opTime(std::move(ts)) {} + + T value; + repl::OpTime opTime; +}; + +} // namespace mongo |