// @file version_manager.cpp /** * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/version_manager.h" #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/shard.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/util/log.h" namespace mongo { using std::endl; using std::map; using std::string; // Global version manager VersionManager versionManager; /** * Tracking information, per-connection, of the latest chunk manager iteration or sequence * number that was used to send a shard version over this connection. * When the chunk manager is replaced, implying new versions were loaded, the chunk manager * sequence number is iterated by 1 and connections need to re-send shard versions. */ struct ConnectionShardStatus { ConnectionShardStatus() : _mutex("ConnectionShardStatus") {} bool hasAnySequenceSet(DBClientBase* conn) { scoped_lock lk(_mutex); SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId()); return seenConnIt != _map.end() && seenConnIt->second.size() > 0; } bool getSequence(DBClientBase* conn, const string& ns, unsigned long long* sequence) { scoped_lock lk(_mutex); SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId()); if (seenConnIt == _map.end()) return false; map::const_iterator seenNSIt = seenConnIt->second.find(ns); if (seenNSIt == seenConnIt->second.end()) return false; *sequence = seenNSIt->second; return true; } void setSequence(DBClientBase* conn, const string& ns, const unsigned long long& s) { scoped_lock lk(_mutex); _map[conn->getConnectionId()][ns] = s; } void reset(DBClientBase* conn) { scoped_lock lk(_mutex); _map.erase(conn->getConnectionId()); } // protects _map mongo::mutex _mutex; // a map from a connection into ChunkManager's sequence number for each namespace typedef map> SequenceMap; SequenceMap _map; } connectionShardStatus; void VersionManager::resetShardVersionCB(DBClientBase* conn) { connectionShardStatus.reset(conn); } bool VersionManager::isVersionableCB(DBClientBase* conn) { return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; } DBClientBase* getVersionable(DBClientBase* conn) { switch (conn->type()) { case ConnectionString::INVALID: massert(15904, str::stream() << "cannot set version on invalid connection " << conn->toString(), false); return NULL; case ConnectionString::MASTER: return conn; case ConnectionString::PAIR: massert(15905, str::stream() << "cannot set version or shard on pair connection " << conn->toString(), false); return NULL; case ConnectionString::SYNC: massert(15906, str::stream() << "cannot set version or shard on sync connection " << conn->toString(), false); return NULL; case ConnectionString::CUSTOM: massert(16334, str::stream() << "cannot set version or shard on custom connection " << conn->toString(), false); return NULL; case ConnectionString::SET: DBClientReplicaSet* set = (DBClientReplicaSet*)conn; return &(set->masterConn()); } verify(false); return NULL; } bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { DBConfigPtr conf = grid.getDBConfig(ns); if (!conf) return false; conf->reload(); // If we don't have a collection, don't refresh the chunk manager if (nsGetCollection(ns).size() == 0) return false; ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true); if (!manager) return false; return true; } /** * Special internal logic to run reduced version handshake for empty namespace operations to * shards. * * Eventually this should go completely away, but for now many commands rely on unversioned but * mongos-specific behavior on mongod (auditing and replication information in commands) */ static bool initShardVersionEmptyNS(DBClientBase* conn_in) { bool ok; BSONObj result; DBClientBase* conn = NULL; try { // May throw if replica set primary is down conn = getVersionable(conn_in); dassert(conn); // errors thrown above // Check to see if we've already initialized this connection if (connectionShardStatus.hasAnySequenceSet(conn)) return false; // Check to see if this is actually a shard and not a single config server // NOTE: Config servers are registered only by the name "config" in the shard cache, not // by host, so lookup by host will fail unless the host is also a shard. Shard shard = Shard::findIfExists(conn->getServerAddress()); if (!shard.ok()) return false; LOG(1) << "initializing shard connection to " << shard.toString() << endl; ok = setShardVersion(*conn, "", ChunkVersion(), ChunkManagerPtr(), true, result); } catch (const DBException&) { // NOTE: Replica sets may fail to initShardVersion because future calls relying on // correct versioning must later call checkShardVersion on the primary. // Secondary queries and commands may not call checkShardVersion, but secondary ops // aren't versioned at all. if (conn_in->type() != ConnectionString::SET) { throw; } // NOTE: Only old-style cluster operations will talk via DBClientReplicaSets - using // checkShardVersion is required (which includes initShardVersion information) if these // connections are used. OCCASIONALLY { warning() << "failed to initialize new replica set connection version, " << "will initialize on first use" << endl; } return false; } // Record the connection wire version if sent in the response, initShardVersion is a // handshake for mongos->mongod connections. if (!result["minWireVersion"].eoo()) { int minWireVersion = result["minWireVersion"].numberInt(); int maxWireVersion = result["maxWireVersion"].numberInt(); conn->setWireVersions(minWireVersion, maxWireVersion); } LOG(3) << "initial sharding result : " << result << endl; connectionShardStatus.setSequence(conn, "", 0); return ok; } /** * Updates the remote cached version on the remote shard host (primary, in the case of replica * sets) if needed with a fully-qualified shard version for the given namespace: * config server(s) + shard name + shard version * * If no remote cached version has ever been set, an initial shard version is sent. * * If the namespace is empty and no version has ever been sent, the config server + shard name * is sent to the remote shard host to initialize the connection as coming from mongos. * NOTE: This initialization is *best-effort only*. Operations which wish to correctly version * must send the namespace. * * Config servers are special and are not (unless otherwise a shard) kept up to date with this * protocol. This is safe so long as config servers only contain unversioned collections. * * It is an error to call checkShardVersion with an unversionable connection (isVersionableCB). * * @return true if we contacted the remote host */ bool checkShardVersion(DBClientBase* conn_in, const string& ns, ChunkManagerPtr refManager, bool authoritative, int tryNumber) { // TODO: cache, optimize, etc... // Empty namespaces are special - we require initialization but not versioning if (ns.size() == 0) { return initShardVersionEmptyNS(conn_in); } DBConfigPtr conf = grid.getDBConfig(ns); if (!conf) return false; DBClientBase* conn = getVersionable(conn_in); verify(conn); // errors thrown above unsigned long long officialSequenceNumber = 0; ShardPtr primary; ChunkManagerPtr manager; if (authoritative) conf->getChunkManagerIfExists(ns, true); conf->getChunkManagerOrPrimary(ns, manager, primary); if (manager) officialSequenceNumber = manager->getSequenceNumber(); // Check this manager against the reference manager if (manager) { Shard shard = Shard::make(conn->getServerAddress()); if (refManager && !refManager->compatibleWith(*manager, shard.getName())) { const ChunkVersion refVersion(refManager->getVersion(shard.getName())); const ChunkVersion currentVersion(manager->getVersion(shard.getName())); string msg(str::stream() << "manager (" << currentVersion.toString() << " : " << manager->getSequenceNumber() << ") " << "not compatible with reference manager (" << refVersion.toString() << " : " << refManager->getSequenceNumber() << ") " << "on shard " << shard.getName() << " (" << shard.getAddress().toString() << ")"); throw SendStaleConfigException(ns, msg, refVersion, currentVersion); } } else if (refManager) { Shard shard = Shard::make(conn->getServerAddress()); string msg(str::stream() << "not sharded (" << ((manager.get() == 0) ? string("") : str::stream() << manager->getSequenceNumber()) << ") but has reference manager (" << refManager->getSequenceNumber() << ") " << "on conn " << conn->getServerAddress() << " (" << conn_in->getServerAddress() << ")"); throw SendStaleConfigException( ns, msg, refManager->getVersion(shard.getName()), ChunkVersion::UNSHARDED()); } // Do not send setShardVersion to collections on the config servers - this causes problems // when config servers are also shards and get SSV with conflicting names. // TODO: Make config servers regular shards if (primary && primary->getName() == "config") { return false; } // Has the ChunkManager been reloaded since the last time we updated the shard version over // this connection? If we've never updated the shard version, do so now. unsigned long long sequenceNumber = 0; if (connectionShardStatus.getSequence(conn, ns, &sequenceNumber)) { if (sequenceNumber == officialSequenceNumber) { return false; } } // Now that we're sure we're sending SSV and not to a single config server, get the shard Shard shard = Shard::make(conn->getServerAddress()); ChunkVersion version = ChunkVersion(0, 0, OID()); if (manager) version = manager->getVersion(shard.getName()); LOG(1) << "setting shard version of " << version << " for " << ns << " on shard " << shard.toString(); LOG(3) << "last version sent with chunk manager iteration " << sequenceNumber << ", current chunk manager iteration is " << officialSequenceNumber; BSONObj result; if (setShardVersion(*conn, ns, version, manager, authoritative, result)) { // success! LOG(1) << " setShardVersion success: " << result << endl; connectionShardStatus.setSequence(conn, ns, officialSequenceNumber); return true; } LOG(1) << " setShardVersion failed!\n" << result << endl; if (result["need_authoritative"].trueValue()) massert(10428, "need_authoritative set but in authoritative mode already", !authoritative); if (!authoritative) { // use the original connection and get a fresh versionable connection // since conn can be invalidated (or worse, freed) after the failure checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1); return true; } if (result["reloadConfig"].trueValue()) { if (result["version"].timestampTime() == 0) { warning() << "reloading full configuration for " << conf->getName() << ", connection state indicates significant version changes" << endl; // reload db conf->reload(); } else { // reload config conf->getChunkManager(ns, true); } } const int maxNumTries = 7; if (tryNumber < maxNumTries) { LOG(tryNumber < (maxNumTries / 2) ? 1 : 0) << "going to retry checkShardVersion shard: " << shard.toString() << " " << result; sleepmillis(10 * tryNumber); // use the original connection and get a fresh versionable connection // since conn can be invalidated (or worse, freed) after the failure checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1); return true; } string errmsg = str::stream() << "setShardVersion failed shard: " << shard.toString() << " " << result; log() << " " << errmsg << endl; massert(10429, errmsg, 0); return true; } bool VersionManager::checkShardVersionCB(DBClientBase* conn_in, const string& ns, bool authoritative, int tryNumber) { return checkShardVersion(conn_in, ns, ChunkManagerPtr(), authoritative, tryNumber); } bool VersionManager::checkShardVersionCB(ShardConnection* conn_in, bool authoritative, int tryNumber) { return checkShardVersion( conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber); } } // namespace mongo