/**
 * Copyright (C) 2013 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/chunk_manager_targeter.h"

#include <boost/thread/tss.hpp>

#include "mongo/s/chunk.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/sharding_raii.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {

using std::shared_ptr;
using str::stream;
using std::map;
using std::set;
using std::string;
using std::vector;

namespace {

enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };

enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };

const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));

// To match legacy reload behavior, we have to backoff on config reload per-thread
// TODO: Centralize this behavior better by refactoring config reload in mongos
boost::thread_specific_ptr<Backoff> perThreadBackoff;
const int maxWaitMillis = 500;

/**
 * There are two styles of update expressions:
 *
 * Replacement style: coll.update({ x : 1 }, { y : 2 })
 * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } })
 */
UpdateType getUpdateExprType(const BSONObj& updateExpr) {
    // Empty update is replacement-style, by default
    if (updateExpr.isEmpty()) {
        return UpdateType_Replacement;
    }

    UpdateType updateType = UpdateType_Unknown;

    BSONObjIterator it(updateExpr);
    while (it.more()) {
        BSONElement next = it.next();

        if (next.fieldName()[0] == '$') {
            if (updateType == UpdateType_Unknown) {
                updateType = UpdateType_OpStyle;
            } else if (updateType == UpdateType_Replacement) {
                return UpdateType_Unknown;
            }
        } else {
            if (updateType == UpdateType_Unknown) {
                updateType = UpdateType_Replacement;
            } else if (updateType == UpdateType_OpStyle) {
                return UpdateType_Unknown;
            }
        }
    }

    return updateType;
}

/**
 * This returns "does the query have an _id field" and "is the _id field querying for a direct
 * value like _id : 3 and not _id : { $gt : 3 }"
 *
 * Ex: { _id : 1 } => true
 *     { foo : <anything>, _id : 1 } => true
 *     { _id : { $lt : 30 } } => false
 *     { foo : <anything> } => false
 */
bool isExactIdQuery(OperationContext* txn, const BSONObj& query) {
    StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(txn, query);
    if (!status.isOK()) {
        return false;
    }

    return !status.getValue()["_id"].eoo();
}

void refreshBackoff() {
    if (!perThreadBackoff.get()) {
        perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2));
    }

    perThreadBackoff.get()->nextSleepMillis();
}

//
// Utilities to compare shard versions
//

/**
 * Returns the relationship of two shard versions. Shard versions of a collection that has not
 * been dropped and recreated and where there is at least one chunk on a shard are comparable,
 * otherwise the result is ambiguous.
 */
CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
                                   const ChunkVersion& shardVersionB) {
    // Collection may have been dropped
    if (!shardVersionA.hasEqualEpoch(shardVersionB)) {
        return CompareResult_Unknown;
    }

    // Zero shard versions are only comparable to themselves
    if (!shardVersionA.isSet() || !shardVersionB.isSet()) {
        // If both are zero...
        if (!shardVersionA.isSet() && !shardVersionB.isSet()) {
            return CompareResult_GTE;
        }

        return CompareResult_Unknown;
    }

    if (shardVersionA < shardVersionB) {
        return CompareResult_LT;
    } else {
        return CompareResult_GTE;
    }
}

ChunkVersion getShardVersion(StringData shardName,
                             const ChunkManager* manager,
                             const Shard* primary) {
    dassert(!(manager && primary));
    dassert(manager || primary);

    if (primary) {
        return ChunkVersion::UNSHARDED();
    }

    return manager->getVersion(shardName.toString());
}

/**
 * Returns the relationship between two maps of shard versions. As above, these maps are often
 * comparable when the collection has not been dropped and there is at least one chunk on the
 * shards. If any versions in the maps are not comparable, the result is _Unknown.
 *
 * If any versions in the first map (cached) are _LT the versions in the second map (remote),
 * the first (cached) versions are _LT the second (remote) versions.
 *
 * Note that the signature here is weird since our cached map of chunk versions is stored in a
 * ChunkManager or is implicit in the primary shard of the collection.
 */
CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager,
                                      const Shard* cachedPrimary,
                                      const map<string, ChunkVersion>& remoteShardVersions) {
    CompareResult finalResult = CompareResult_GTE;

    for (map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin();
         it != remoteShardVersions.end();
         ++it) {
        // Get the remote and cached version for the next shard
        const string& shardName = it->first;
        const ChunkVersion& remoteShardVersion = it->second;

        ChunkVersion cachedShardVersion;

        try {
            // Throws b/c shard constructor throws
            cachedShardVersion = getShardVersion(shardName, cachedChunkManager, cachedPrimary);
        } catch (const DBException& ex) {
            warning() << "could not lookup shard " << shardName
                      << " in local cache, shard metadata may have changed"
                      << " or be unavailable" << causedBy(ex);

            return CompareResult_Unknown;
        }

        // Compare the remote and cached versions
        CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion);
        if (result == CompareResult_Unknown)
            return result;
        if (result == CompareResult_LT)
            finalResult = CompareResult_LT;

        // Note that we keep going after _LT b/c there could be more _Unknowns.
    }

    return finalResult;
}

/**
 * Whether or not the manager/primary pair is different from the other manager/primary pair.
 */
bool isMetadataDifferent(const shared_ptr<ChunkManager>& managerA,
                         const shared_ptr<Shard>& primaryA,
                         const shared_ptr<ChunkManager>& managerB,
                         const shared_ptr<Shard>& primaryB) {
    if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
        (!primaryA && primaryB))
        return true;

    if (managerA) {
        return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion());
    }

    dassert(NULL != primaryA.get());
    return primaryA->getId() != primaryB->getId();
}

/**
 * Whether or not the manager/primary pair was changed or refreshed from a previous version
 * of the metadata.
 */
bool wasMetadataRefreshed(const shared_ptr<ChunkManager>& managerA,
                          const shared_ptr<Shard>& primaryA,
                          const shared_ptr<ChunkManager>& managerB,
                          const shared_ptr<Shard>& primaryB) {
    if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
        return true;

    if (managerA) {
        dassert(managerB.get());  // otherwise metadata would be different
        return managerA->getSequenceNumber() != managerB->getSequenceNumber();
    }

    return false;
}

}  // namespace

ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterStats* stats)
    : _nss(nss), _needsTargetingRefresh(false), _stats(stats) {}

Status ChunkManagerTargeter::init(OperationContext* txn) {
    auto dbStatus = ScopedShardDatabase::getOrCreate(txn, _nss.db());
    if (!dbStatus.isOK()) {
        return dbStatus.getStatus();
    }

    auto scopedDb = std::move(dbStatus.getValue());
    scopedDb.db()->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);

    return Status::OK();
}

const NamespaceString& ChunkManagerTargeter::getNS() const {
    return _nss;
}

Status ChunkManagerTargeter::targetInsert(OperationContext* txn,
                                          const BSONObj& doc,
                                          ShardEndpoint** endpoint) const {
    BSONObj shardKey;

    if (_manager) {
        //
        // Sharded collections have the following requirements for targeting:
        //
        // Inserts must contain the exact shard key.
        //

        shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc);

        // Check shard key exists
        if (shardKey.isEmpty()) {
            return Status(ErrorCodes::ShardKeyNotFound,
                          stream() << "document " << doc
                                   << " does not contain shard key for pattern "
                                   << _manager->getShardKeyPattern().toString());
        }

        // Check shard key size on insert
        Status status = ShardKeyPattern::checkShardKeySize(shardKey);
        if (!status.isOK())
            return status;
    }

    // Target the shard key or database primary
    if (!shardKey.isEmpty()) {
        return targetShardKey(txn, shardKey, doc.objsize(), endpoint);
    } else {
        if (!_primary) {
            return Status(ErrorCodes::NamespaceNotFound,
                          str::stream() << "could not target insert in collection " << getNS().ns()
                                        << "; no metadata found");
        }

        *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED());
        return Status::OK();
    }
}

Status ChunkManagerTargeter::targetUpdate(OperationContext* txn,
                                          const BatchedUpdateDocument& updateDoc,
                                          vector<ShardEndpoint*>* endpoints) const {
    //
    // Update targeting may use either the query or the update. This is to support save-style
    // updates, of the form:
    //
    // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true })
    //
    // Because drivers do not know the shard key, they can't pull the shard key automatically
    // into the query doc, and to correctly support upsert we must target a single shard.
    //
    // The rule is simple - If the update is replacement style (no '$set'), we target using the
    // update. If the update is op style, we target using the query.
    //
    // If we have the exact shard key in either the query or replacement doc, we target using
    // that extracted key.
    //

    BSONObj query = updateDoc.getQuery();
    BSONObj updateExpr = updateDoc.getUpdateExpr();

    UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr());

    if (updateType == UpdateType_Unknown) {
        return Status(ErrorCodes::UnsupportedFormat,
                      stream() << "update document " << updateExpr
                               << " has mixed $operator and non-$operator style fields");
    }

    BSONObj shardKey;

    if (_manager) {
        //
        // Sharded collections have the following further requirements for targeting:
        //
        // Upserts must be targeted exactly by shard key.
        // Non-multi updates must be targeted exactly by shard key *or* exact _id.
        //

        // Get the shard key
        if (updateType == UpdateType_OpStyle) {
            // Target using the query
            StatusWith<BSONObj> status =
                _manager->getShardKeyPattern().extractShardKeyFromQuery(txn, query);

            // Bad query
            if (!status.isOK())
                return status.getStatus();

            shardKey = status.getValue();
        } else {
            // Target using the replacement document
            shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
        }

        //
        // Extra sharded update validation
        //

        if (updateDoc.getUpsert()) {
            // Sharded upserts *always* need to be exactly targeted by shard key
            if (shardKey.isEmpty()) {
                return Status(ErrorCodes::ShardKeyNotFound,
                              stream() << "upsert " << updateDoc.toBSON()
                                       << " does not contain shard key for pattern "
                                       << _manager->getShardKeyPattern().toString());
            }

            // Also check shard key size on upsert
            Status status = ShardKeyPattern::checkShardKeySize(shardKey);
            if (!status.isOK())
                return status;
        }

        // Validate that single (non-multi) sharded updates are targeted by shard key or _id
        if (!updateDoc.getMulti() && shardKey.isEmpty() &&
            !isExactIdQuery(txn, updateDoc.getQuery())) {
            return Status(ErrorCodes::ShardKeyNotFound,
                          stream() << "update " << updateDoc.toBSON()
                                   << " does not contain _id or shard key for pattern "
                                   << _manager->getShardKeyPattern().toString());
        }
    }

    // Target the shard key, query, or replacement doc
    if (!shardKey.isEmpty()) {
        // We can't rely on our query targeting to be exact
        ShardEndpoint* endpoint = NULL;
        Status result =
            targetShardKey(txn, shardKey, (query.objsize() + updateExpr.objsize()), &endpoint);
        endpoints->push_back(endpoint);
        return result;
    } else if (updateType == UpdateType_OpStyle) {
        return targetQuery(txn, query, endpoints);
    } else {
        return targetDoc(txn, updateExpr, endpoints);
    }
}

Status ChunkManagerTargeter::targetDelete(OperationContext* txn,
                                          const BatchedDeleteDocument& deleteDoc,
                                          vector<ShardEndpoint*>* endpoints) const {
    BSONObj shardKey;

    if (_manager) {
        //
        // Sharded collections have the following further requirements for targeting:
        //
        // Limit-1 deletes must be targeted exactly by shard key *or* exact _id
        //

        // Get the shard key
        StatusWith<BSONObj> status =
            _manager->getShardKeyPattern().extractShardKeyFromQuery(txn, deleteDoc.getQuery());

        // Bad query
        if (!status.isOK())
            return status.getStatus();

        shardKey = status.getValue();

        // Validate that single (limit-1) sharded deletes are targeted by shard key or _id
        if (deleteDoc.getLimit() == 1 && shardKey.isEmpty() &&
            !isExactIdQuery(txn, deleteDoc.getQuery())) {
            return Status(ErrorCodes::ShardKeyNotFound,
                          stream() << "delete " << deleteDoc.toBSON()
                                   << " does not contain _id or shard key for pattern "
                                   << _manager->getShardKeyPattern().toString());
        }
    }

    // Target the shard key or delete query
    if (!shardKey.isEmpty()) {
        // We can't rely on our query targeting to be exact
        ShardEndpoint* endpoint = NULL;
        Status result = targetShardKey(txn, shardKey, 0, &endpoint);
        endpoints->push_back(endpoint);
        return result;
    } else {
        return targetQuery(txn, deleteDoc.getQuery(), endpoints);
    }
}
Status ChunkManagerTargeter::targetDoc(OperationContext* txn,
                                       const BSONObj& doc,
                                       vector<ShardEndpoint*>* endpoints) const {
    // NOTE: This is weird and fragile, but it's the way our language works right now -
    // documents are either A) invalid or B) valid equality queries over themselves.
    return targetQuery(txn, doc, endpoints);
}

Status ChunkManagerTargeter::targetQuery(OperationContext* txn,
                                         const BSONObj& query,
                                         vector<ShardEndpoint*>* endpoints) const {
    if (!_primary && !_manager) {
        return Status(ErrorCodes::NamespaceNotFound,
                      stream() << "could not target query in " << getNS().ns()
                               << "; no metadata found");
    }

    set<ShardId> shardIds;
    if (_manager) {
        try {
            _manager->getShardIdsForQuery(txn, query, &shardIds);
        } catch (const DBException& ex) {
            return ex.toStatus();
        }
    } else {
        shardIds.insert(_primary->getId());
    }

    for (const ShardId& shardId : shardIds) {
        endpoints->push_back(new ShardEndpoint(
            shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
    }

    return Status::OK();
}

Status ChunkManagerTargeter::targetShardKey(OperationContext* txn,
                                            const BSONObj& shardKey,
                                            long long estDataSize,
                                            ShardEndpoint** endpoint) const {
    invariant(NULL != _manager);

    shared_ptr<Chunk> chunk = _manager->findIntersectingChunk(txn, shardKey);

    // Track autosplit stats for sharded collections
    // Note: this is only best effort accounting and is not accurate.
    if (estDataSize > 0) {
        _stats->chunkSizeDelta[chunk->getMin()] += estDataSize;
    }

    *endpoint = new ShardEndpoint(chunk->getShardId(), _manager->getVersion(chunk->getShardId()));

    return Status::OK();
}

Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints) const {
    if (!_primary && !_manager) {
        return Status(ErrorCodes::NamespaceNotFound,
                      str::stream() << "could not target full range of " << getNS().ns()
                                    << "; metadata not found");
    }

    set<ShardId> shardIds;
    if (_manager) {
        _manager->getAllShardIds(&shardIds);
    } else {
        shardIds.insert(_primary->getId());
    }

    for (const ShardId& shardId : shardIds) {
        endpoints->push_back(new ShardEndpoint(
            shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
    }

    return Status::OK();
}

Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints) const {
    if (!_primary && !_manager) {
        return Status(ErrorCodes::NamespaceNotFound,
                      str::stream() << "could not target every shard with versions for "
                                    << getNS().ns() << "; metadata not found");
    }

    vector<ShardId> shardIds;
    grid.shardRegistry()->getAllShardIds(&shardIds);

    for (const ShardId& shardId : shardIds) {
        endpoints->push_back(new ShardEndpoint(
            shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
    }

    return Status::OK();
}

void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
                                             const BSONObj& staleInfo) {
    dassert(!_needsTargetingRefresh);

    ChunkVersion remoteShardVersion;
    if (staleInfo["vWanted"].eoo()) {
        // If we don't have a vWanted sent, assume the version is higher than our current
        // version.
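        // Bumping the major version of our own cached version below means the later
        // compareAllShardVersions() call in refreshIfNeeded() will see the cached version as
        // stale (_LT), which forces a chunk manager refresh.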
        remoteShardVersion = getShardVersion(endpoint.shardName, _manager.get(), _primary.get());
        remoteShardVersion.incMajor();
    } else {
        remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted");
    }

    ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName);
    if (it == _remoteShardVersions.end()) {
        _remoteShardVersions.insert(make_pair(endpoint.shardName, remoteShardVersion));
    } else {
        ChunkVersion& previouslyNotedVersion = it->second;
        if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) {
            if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) {
                previouslyNotedVersion = remoteShardVersion;
            }
        } else {
            // Epoch changed midway while applying the batch so set the version to something
            // unique and non-existent to force a reload when refreshIfNeeded is called.
            previouslyNotedVersion = ChunkVersion::IGNORED();
        }
    }
}

void ChunkManagerTargeter::noteCouldNotTarget() {
    dassert(_remoteShardVersions.empty());
    _needsTargetingRefresh = true;
}

Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* txn, bool* wasChanged) {
    bool dummy;
    if (!wasChanged) {
        wasChanged = &dummy;
    }

    *wasChanged = false;

    //
    // Did we have any stale config or targeting errors at all?
    //

    if (!_needsTargetingRefresh && _remoteShardVersions.empty()) {
        return Status::OK();
    }

    //
    // Get the latest metadata information from the cache if there were issues
    //

    shared_ptr<ChunkManager> lastManager = _manager;
    shared_ptr<Shard> lastPrimary = _primary;

    auto dbStatus = ScopedShardDatabase::getOrCreate(txn, _nss.db());
    if (!dbStatus.isOK()) {
        return dbStatus.getStatus();
    }

    auto scopedDb = std::move(dbStatus.getValue());
    scopedDb.db()->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);

    // We now have the latest metadata from the cache.

    //
    // See if and how we need to do a remote refresh.
    // Either we couldn't target at all, or we have stale versions, but not both.
    //

    dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty()));

    if (_needsTargetingRefresh) {
        // Reset the field
        _needsTargetingRefresh = false;

        // If we couldn't target, we might need to refresh if we haven't remotely refreshed the
        // metadata since we last got it from the cache.
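        // Note that a changed chunk manager sequence number counts as a refresh here even if
        // the collection version itself is unchanged (see wasMetadataRefreshed() above).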
        bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary);

        // If we didn't already refresh the targeting information, refresh it
        if (!alreadyRefreshed) {
            // To match previous behavior, we just need an incremental refresh here
            return refreshNow(txn, RefreshType_RefreshChunkManager);
        }

        *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
        return Status::OK();
    } else if (!_remoteShardVersions.empty()) {
        // If we got stale shard versions from remote shards, we may need to refresh
        // NOTE: Not sure yet if this can happen simultaneously with targeting issues

        CompareResult result =
            compareAllShardVersions(_manager.get(), _primary.get(), _remoteShardVersions);

        // Reset the versions
        _remoteShardVersions.clear();

        if (result == CompareResult_Unknown) {
            // Our current shard versions aren't all comparable to the old versions, maybe drop
            return refreshNow(txn, RefreshType_ReloadDatabase);
        } else if (result == CompareResult_LT) {
            // Our current shard versions are less than the remote versions, but no drop
            return refreshNow(txn, RefreshType_RefreshChunkManager);
        }

        *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
        return Status::OK();
    }

    // unreachable
    dassert(false);
    return Status::OK();
}

Status ChunkManagerTargeter::refreshNow(OperationContext* txn, RefreshType refreshType) {
    auto dbStatus = ScopedShardDatabase::getOrCreate(txn, _nss.db());
    if (!dbStatus.isOK()) {
        return dbStatus.getStatus();
    }

    auto scopedDb = std::move(dbStatus.getValue());

    // Try not to spam the configs
    refreshBackoff();

    // TODO: Improve synchronization and make more explicit
    if (refreshType == RefreshType_RefreshChunkManager) {
        try {
            // Forces a remote check of the collection info, synchronization between threads
            // happens internally
            scopedDb.db()->getChunkManagerIfExists(txn, _nss.ns(), true);
        } catch (const DBException& ex) {
            return Status(ErrorCodes::UnknownError, ex.toString());
        }

        scopedDb.db()->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);
    } else if (refreshType == RefreshType_ReloadDatabase) {
        try {
            // Dumps the db info, reloads it all, synchronization between threads happens
            // internally
            scopedDb.db()->reload(txn);
        } catch (const DBException& ex) {
            return Status(ErrorCodes::UnknownError, ex.toString());
        }

        scopedDb.db()->getChunkManagerOrPrimary(txn, _nss.ns(), _manager, _primary);
    }

    return Status::OK();
}

}  // namespace mongo