diff options
Diffstat (limited to 'src/mongo/s/version_manager.cpp')
-rw-r--r-- | src/mongo/s/version_manager.cpp | 628 |
1 files changed, 309 insertions, 319 deletions
diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index ad96e57b3b7..17c624e541d 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -51,396 +51,386 @@ namespace mongo { - using std::shared_ptr; - using std::endl; - using std::map; - using std::string; - - // Global version manager - VersionManager versionManager; - - /** - * Tracking information, per-connection, of the latest chunk manager iteration or sequence - * number that was used to send a shard version over this connection. - * When the chunk manager is replaced, implying new versions were loaded, the chunk manager - * sequence number is iterated by 1 and connections need to re-send shard versions. - */ - struct ConnectionShardStatus { - - bool hasAnySequenceSet(DBClientBase* conn) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId()); - return seenConnIt != _map.end() && seenConnIt->second.size() > 0; - } +using std::shared_ptr; +using std::endl; +using std::map; +using std::string; - bool getSequence(DBClientBase * conn, - const string& ns, - unsigned long long* sequence) { +// Global version manager +VersionManager versionManager; - stdx::lock_guard<stdx::mutex> lk(_mutex); +/** + * Tracking information, per-connection, of the latest chunk manager iteration or sequence + * number that was used to send a shard version over this connection. + * When the chunk manager is replaced, implying new versions were loaded, the chunk manager + * sequence number is iterated by 1 and connections need to re-send shard versions. + */ +struct ConnectionShardStatus { + bool hasAnySequenceSet(DBClientBase* conn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId()); + return seenConnIt != _map.end() && seenConnIt->second.size() > 0; + } - SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId()); - if (seenConnIt == _map.end()) - return false; + bool getSequence(DBClientBase* conn, const string& ns, unsigned long long* sequence) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - map<string, unsigned long long>::const_iterator seenNSIt = seenConnIt->second.find(ns); - if (seenNSIt == seenConnIt->second.end()) - return false; + SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId()); + if (seenConnIt == _map.end()) + return false; - *sequence = seenNSIt->second; - return true; - } + map<string, unsigned long long>::const_iterator seenNSIt = seenConnIt->second.find(ns); + if (seenNSIt == seenConnIt->second.end()) + return false; - void setSequence( DBClientBase * conn , const string& ns , const unsigned long long& s ) { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _map[conn->getConnectionId()][ns] = s; - } + *sequence = seenNSIt->second; + return true; + } - void reset( DBClientBase * conn ) { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _map.erase( conn->getConnectionId() ); - } + void setSequence(DBClientBase* conn, const string& ns, const unsigned long long& s) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _map[conn->getConnectionId()][ns] = s; + } - // protects _map - stdx::mutex _mutex; + void reset(DBClientBase* conn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _map.erase(conn->getConnectionId()); + } - // a map from a connection into ChunkManager's sequence number for each namespace - typedef map<unsigned long long, map<string,unsigned long long> > SequenceMap; - SequenceMap _map; + // protects _map + stdx::mutex _mutex; - } connectionShardStatus; + // a map from a connection into ChunkManager's sequence number for each namespace + typedef map<unsigned long long, map<string, unsigned long long>> SequenceMap; + SequenceMap _map; - void VersionManager::resetShardVersionCB( DBClientBase * conn ) { - connectionShardStatus.reset( conn ); - } +} connectionShardStatus; - bool VersionManager::isVersionableCB(DBClientBase* conn) { - // We do not version shard connections when issued from mongod - if (!isMongos()) { - return false; - } +void VersionManager::resetShardVersionCB(DBClientBase* conn) { + connectionShardStatus.reset(conn); +} - return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; +bool VersionManager::isVersionableCB(DBClientBase* conn) { + // We do not version shard connections when issued from mongod + if (!isMongos()) { + return false; } - DBClientBase* getVersionable(DBClientBase* conn) { - switch (conn->type()) { + return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; +} + +DBClientBase* getVersionable(DBClientBase* conn) { + switch (conn->type()) { case ConnectionString::INVALID: - massert(15904, str::stream() << "cannot set version on invalid connection " - << conn->toString(), false); + massert(15904, + str::stream() << "cannot set version on invalid connection " + << conn->toString(), + false); return nullptr; case ConnectionString::MASTER: return conn; case ConnectionString::SYNC: - massert(15906, str::stream() << "cannot set version or shard on sync connection " - << conn->toString(), false); + massert(15906, + str::stream() << "cannot set version or shard on sync connection " + << conn->toString(), + false); return nullptr; case ConnectionString::CUSTOM: - massert(16334, str::stream() << "cannot set version or shard on custom connection " - << conn->toString(), false); + massert(16334, + str::stream() << "cannot set version or shard on custom connection " + << conn->toString(), + false); return nullptr; case ConnectionString::SET: DBClientReplicaSet* set = (DBClientReplicaSet*)conn; return &(set->masterConn()); - } - - MONGO_UNREACHABLE; } - bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { - const NamespaceString nss(ns); + MONGO_UNREACHABLE; +} - // This will force the database catalog entry to be reloaded - grid.catalogCache()->invalidate(nss.db().toString()); +bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { + const NamespaceString nss(ns); - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); - if (!status.isOK()) { - return false; - } + // This will force the database catalog entry to be reloaded + grid.catalogCache()->invalidate(nss.db().toString()); - shared_ptr<DBConfig> conf = status.getValue(); - - // If we don't have a collection, don't refresh the chunk manager - if (nsGetCollection(ns).size() == 0) { - return false; - } - - ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true); - if (!manager) { - return false; - } + auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + if (!status.isOK()) { + return false; + } - return true; + shared_ptr<DBConfig> conf = status.getValue(); + // If we don't have a collection, don't refresh the chunk manager + if (nsGetCollection(ns).size() == 0) { + return false; } - /** - * Special internal logic to run reduced version handshake for empty namespace operations to - * shards. - * - * Eventually this should go completely away, but for now many commands rely on unversioned but - * mongos-specific behavior on mongod (auditing and replication information in commands) - */ - static bool initShardVersionEmptyNS(DBClientBase * conn_in) { - - bool ok; - BSONObj result; - DBClientBase* conn = NULL; - try { - // May throw if replica set primary is down - conn = getVersionable( conn_in ); - dassert( conn ); // errors thrown above - - // Check to see if we've already initialized this connection - if (connectionShardStatus.hasAnySequenceSet(conn)) - return false; - - // Check to see if this is actually a shard and not a single config server - // NOTE: Config servers are registered only by the name "config" in the shard cache, not - // by host, so lookup by host will fail unless the host is also a shard. - const auto shard = grid.shardRegistry()->getShard(conn->getServerAddress()); - if (!shard) { - return false; - } - - LOG(1) << "initializing shard connection to " << shard->toString() << endl; - - ok = setShardVersion(*conn, - "", - grid.catalogManager()->connectionString().toString(), - ChunkVersion(), - NULL, - true, - result); - } - catch( const DBException& ) { - - // NOTE: Replica sets may fail to initShardVersion because future calls relying on - // correct versioning must later call checkShardVersion on the primary. - // Secondary queries and commands may not call checkShardVersion, but secondary ops - // aren't versioned at all. - if ( conn_in->type() != ConnectionString::SET ) { - throw; - } + ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true); + if (!manager) { + return false; + } - // NOTE: Only old-style cluster operations will talk via DBClientReplicaSets - using - // checkShardVersion is required (which includes initShardVersion information) if these - // connections are used. + return true; +} - OCCASIONALLY { - warning() << "failed to initialize new replica set connection version, " - << "will initialize on first use" << endl; - } +/** + * Special internal logic to run reduced version handshake for empty namespace operations to + * shards. + * + * Eventually this should go completely away, but for now many commands rely on unversioned but + * mongos-specific behavior on mongod (auditing and replication information in commands) + */ +static bool initShardVersionEmptyNS(DBClientBase* conn_in) { + bool ok; + BSONObj result; + DBClientBase* conn = NULL; + try { + // May throw if replica set primary is down + conn = getVersionable(conn_in); + dassert(conn); // errors thrown above + + // Check to see if we've already initialized this connection + if (connectionShardStatus.hasAnySequenceSet(conn)) + return false; + // Check to see if this is actually a shard and not a single config server + // NOTE: Config servers are registered only by the name "config" in the shard cache, not + // by host, so lookup by host will fail unless the host is also a shard. + const auto shard = grid.shardRegistry()->getShard(conn->getServerAddress()); + if (!shard) { return false; } - // Record the connection wire version if sent in the response, initShardVersion is a - // handshake for mongos->mongod connections. - if ( !result["minWireVersion"].eoo() ) { + LOG(1) << "initializing shard connection to " << shard->toString() << endl; + + ok = setShardVersion(*conn, + "", + grid.catalogManager()->connectionString().toString(), + ChunkVersion(), + NULL, + true, + result); + } catch (const DBException&) { + // NOTE: Replica sets may fail to initShardVersion because future calls relying on + // correct versioning must later call checkShardVersion on the primary. + // Secondary queries and commands may not call checkShardVersion, but secondary ops + // aren't versioned at all. + if (conn_in->type() != ConnectionString::SET) { + throw; + } + + // NOTE: Only old-style cluster operations will talk via DBClientReplicaSets - using + // checkShardVersion is required (which includes initShardVersion information) if these + // connections are used. - int minWireVersion = result["minWireVersion"].numberInt(); - int maxWireVersion = result["maxWireVersion"].numberInt(); - conn->setWireVersions( minWireVersion, maxWireVersion ); + OCCASIONALLY { + warning() << "failed to initialize new replica set connection version, " + << "will initialize on first use" << endl; } - LOG(3) << "initial sharding result : " << result << endl; + return false; + } - connectionShardStatus.setSequence(conn, "", 0); - return ok; + // Record the connection wire version if sent in the response, initShardVersion is a + // handshake for mongos->mongod connections. + if (!result["minWireVersion"].eoo()) { + int minWireVersion = result["minWireVersion"].numberInt(); + int maxWireVersion = result["maxWireVersion"].numberInt(); + conn->setWireVersions(minWireVersion, maxWireVersion); } - /** - * Updates the remote cached version on the remote shard host (primary, in the case of replica - * sets) if needed with a fully-qualified shard version for the given namespace: - * config server(s) + shard name + shard version - * - * If no remote cached version has ever been set, an initial shard version is sent. - * - * If the namespace is empty and no version has ever been sent, the config server + shard name - * is sent to the remote shard host to initialize the connection as coming from mongos. - * NOTE: This initialization is *best-effort only*. Operations which wish to correctly version - * must send the namespace. - * - * Config servers are special and are not (unless otherwise a shard) kept up to date with this - * protocol. This is safe so long as config servers only contain unversioned collections. - * - * It is an error to call checkShardVersion with an unversionable connection (isVersionableCB). - * - * @return true if we contacted the remote host - */ - bool checkShardVersion(DBClientBase* conn_in, - const string& ns, - ChunkManagerPtr refManager, - bool authoritative, - int tryNumber) { - - // TODO: cache, optimize, etc... - - // Empty namespaces are special - we require initialization but not versioning - if (ns.size() == 0) { - return initShardVersionEmptyNS(conn_in); - } + LOG(3) << "initial sharding result : " << result << endl; - auto status = grid.catalogCache()->getDatabase(nsToDatabase(ns)); - if (!status.isOK()) { - return false; - } + connectionShardStatus.setSequence(conn, "", 0); + return ok; +} - shared_ptr<DBConfig> conf = status.getValue(); +/** + * Updates the remote cached version on the remote shard host (primary, in the case of replica + * sets) if needed with a fully-qualified shard version for the given namespace: + * config server(s) + shard name + shard version + * + * If no remote cached version has ever been set, an initial shard version is sent. + * + * If the namespace is empty and no version has ever been sent, the config server + shard name + * is sent to the remote shard host to initialize the connection as coming from mongos. + * NOTE: This initialization is *best-effort only*. Operations which wish to correctly version + * must send the namespace. + * + * Config servers are special and are not (unless otherwise a shard) kept up to date with this + * protocol. This is safe so long as config servers only contain unversioned collections. + * + * It is an error to call checkShardVersion with an unversionable connection (isVersionableCB). + * + * @return true if we contacted the remote host + */ +bool checkShardVersion(DBClientBase* conn_in, + const string& ns, + ChunkManagerPtr refManager, + bool authoritative, + int tryNumber) { + // TODO: cache, optimize, etc... + + // Empty namespaces are special - we require initialization but not versioning + if (ns.size() == 0) { + return initShardVersionEmptyNS(conn_in); + } - DBClientBase* conn = getVersionable( conn_in ); - verify(conn); // errors thrown above + auto status = grid.catalogCache()->getDatabase(nsToDatabase(ns)); + if (!status.isOK()) { + return false; + } - unsigned long long officialSequenceNumber = 0; + shared_ptr<DBConfig> conf = status.getValue(); - ShardPtr primary; - ChunkManagerPtr manager; - if (authoritative) - conf->getChunkManagerIfExists(ns, true); + DBClientBase* conn = getVersionable(conn_in); + verify(conn); // errors thrown above - conf->getChunkManagerOrPrimary(ns, manager, primary); + unsigned long long officialSequenceNumber = 0; - if (manager) { - officialSequenceNumber = manager->getSequenceNumber(); - } + ShardPtr primary; + ChunkManagerPtr manager; + if (authoritative) + conf->getChunkManagerIfExists(ns, true); - const auto shard = grid.shardRegistry()->getShard(conn->getServerAddress()); - uassert(ErrorCodes::ShardNotFound, - str::stream() << conn->getServerAddress() << " is not recognized as a shard", - shard); - - // Check this manager against the reference manager - if (manager) { - - if (refManager && !refManager->compatibleWith(*manager, shard->getId())) { - const ChunkVersion refVersion(refManager->getVersion(shard->getId())); - const ChunkVersion currentVersion(manager->getVersion(shard->getId())); - - string msg(str::stream() << "manager (" - << currentVersion.toString() - << " : " << manager->getSequenceNumber() << ") " - << "not compatible with reference manager (" - << refVersion.toString() - << " : " << refManager->getSequenceNumber() << ") " - << "on shard " << shard->getId() - << " (" << shard->getConnString().toString() << ")"); - - throw SendStaleConfigException(ns, - msg, - refVersion, - currentVersion); - } - } - else if (refManager) { - - string msg( str::stream() << "not sharded (" - << ( (manager.get() == 0) ? string( "<none>" ) : - str::stream() << manager->getSequenceNumber() ) - << ") but has reference manager (" - << refManager->getSequenceNumber() << ") " - << "on conn " << conn->getServerAddress() << " (" - << conn_in->getServerAddress() << ")" ); - - throw SendStaleConfigException(ns, - msg, - refManager->getVersion(shard->getId()), - ChunkVersion::UNSHARDED()); - } + conf->getChunkManagerOrPrimary(ns, manager, primary); - // Do not send setShardVersion to collections on the config servers - this causes problems - // when config servers are also shards and get SSV with conflicting names. - // TODO: Make config servers regular shards - if (primary && primary->getId() == "config") { - return false; - } + if (manager) { + officialSequenceNumber = manager->getSequenceNumber(); + } - // Has the ChunkManager been reloaded since the last time we updated the shard version over - // this connection? If we've never updated the shard version, do so now. - unsigned long long sequenceNumber = 0; - if (connectionShardStatus.getSequence(conn, ns, &sequenceNumber)) { - if (sequenceNumber == officialSequenceNumber) { - return false; - } + const auto shard = grid.shardRegistry()->getShard(conn->getServerAddress()); + uassert(ErrorCodes::ShardNotFound, + str::stream() << conn->getServerAddress() << " is not recognized as a shard", + shard); + + // Check this manager against the reference manager + if (manager) { + if (refManager && !refManager->compatibleWith(*manager, shard->getId())) { + const ChunkVersion refVersion(refManager->getVersion(shard->getId())); + const ChunkVersion currentVersion(manager->getVersion(shard->getId())); + + string msg(str::stream() + << "manager (" << currentVersion.toString() << " : " + << manager->getSequenceNumber() << ") " + << "not compatible with reference manager (" << refVersion.toString() + << " : " << refManager->getSequenceNumber() << ") " + << "on shard " << shard->getId() << " (" << shard->getConnString().toString() + << ")"); + + throw SendStaleConfigException(ns, msg, refVersion, currentVersion); } + } else if (refManager) { + string msg(str::stream() << "not sharded (" + << ((manager.get() == 0) ? string("<none>") : str::stream() + << manager->getSequenceNumber()) + << ") but has reference manager (" + << refManager->getSequenceNumber() << ") " + << "on conn " << conn->getServerAddress() << " (" + << conn_in->getServerAddress() << ")"); + + throw SendStaleConfigException( + ns, msg, refManager->getVersion(shard->getId()), ChunkVersion::UNSHARDED()); + } - ChunkVersion version = ChunkVersion(0, 0, OID()); - if (manager) { - version = manager->getVersion(shard->getId()); - } + // Do not send setShardVersion to collections on the config servers - this causes problems + // when config servers are also shards and get SSV with conflicting names. + // TODO: Make config servers regular shards + if (primary && primary->getId() == "config") { + return false; + } - LOG(1) << "setting shard version of " << version << " for " << ns << " on shard " - << shard->toString(); + // Has the ChunkManager been reloaded since the last time we updated the shard version over + // this connection? If we've never updated the shard version, do so now. + unsigned long long sequenceNumber = 0; + if (connectionShardStatus.getSequence(conn, ns, &sequenceNumber)) { + if (sequenceNumber == officialSequenceNumber) { + return false; + } + } - LOG(3) << "last version sent with chunk manager iteration " << sequenceNumber - << ", current chunk manager iteration is " << officialSequenceNumber; + ChunkVersion version = ChunkVersion(0, 0, OID()); + if (manager) { + version = manager->getVersion(shard->getId()); + } - BSONObj result; - if (setShardVersion(*conn, - ns, - grid.catalogManager()->connectionString().toString(), - version, - manager.get(), - authoritative, - result)) { + LOG(1) << "setting shard version of " << version << " for " << ns << " on shard " + << shard->toString(); + + LOG(3) << "last version sent with chunk manager iteration " << sequenceNumber + << ", current chunk manager iteration is " << officialSequenceNumber; + + BSONObj result; + if (setShardVersion(*conn, + ns, + grid.catalogManager()->connectionString().toString(), + version, + manager.get(), + authoritative, + result)) { + LOG(1) << " setShardVersion success: " << result; + connectionShardStatus.setSequence(conn, ns, officialSequenceNumber); + return true; + } - LOG(1) << " setShardVersion success: " << result; - connectionShardStatus.setSequence( conn , ns , officialSequenceNumber ); - return true; - } + LOG(1) << " setShardVersion failed!\n" << result << endl; - LOG(1) << " setShardVersion failed!\n" << result << endl; + if (result["need_authoritative"].trueValue()) + massert(10428, "need_authoritative set but in authoritative mode already", !authoritative); - if ( result["need_authoritative"].trueValue() ) - massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative ); + if (!authoritative) { + // use the original connection and get a fresh versionable connection + // since conn can be invalidated (or worse, freed) after the failure + checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1); + return true; + } - if ( ! authoritative ) { - // use the original connection and get a fresh versionable connection - // since conn can be invalidated (or worse, freed) after the failure - checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1); - return true; - } - - if ( result["reloadConfig"].trueValue() ) { - if( result["version"].timestampTime() == Date_t() ){ - - warning() << "reloading full configuration for " << conf->name() - << ", connection state indicates significant version changes"; - - // reload db - conf->reload(); - } - else { - // reload config - conf->getChunkManager( ns , true ); - } - } + if (result["reloadConfig"].trueValue()) { + if (result["version"].timestampTime() == Date_t()) { + warning() << "reloading full configuration for " << conf->name() + << ", connection state indicates significant version changes"; - const int maxNumTries = 7; - if ( tryNumber < maxNumTries ) { - LOG( tryNumber < ( maxNumTries / 2 ) ? 1 : 0 ) - << "going to retry checkShardVersion shard: " << shard->toString() << " " << result; - sleepmillis( 10 * tryNumber ); - // use the original connection and get a fresh versionable connection - // since conn can be invalidated (or worse, freed) after the failure - checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1); - return true; + // reload db + conf->reload(); + } else { + // reload config + conf->getChunkManager(ns, true); } - - string errmsg = str::stream() << "setShardVersion failed shard: " << shard->toString() - << " " << result; - log() << " " << errmsg << endl; - massert( 10429 , errmsg , 0 ); - return true; } - bool VersionManager::checkShardVersionCB( DBClientBase* conn_in , const string& ns , bool authoritative , int tryNumber ) { - return checkShardVersion( conn_in, ns, ChunkManagerPtr(), authoritative, tryNumber ); + const int maxNumTries = 7; + if (tryNumber < maxNumTries) { + LOG(tryNumber < (maxNumTries / 2) ? 1 : 0) + << "going to retry checkShardVersion shard: " << shard->toString() << " " << result; + sleepmillis(10 * tryNumber); + // use the original connection and get a fresh versionable connection + // since conn can be invalidated (or worse, freed) after the failure + checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1); + return true; } - bool VersionManager::checkShardVersionCB( ShardConnection* conn_in , bool authoritative , int tryNumber ) { - return checkShardVersion( conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber ); - } + string errmsg = str::stream() << "setShardVersion failed shard: " << shard->toString() << " " + << result; + log() << " " << errmsg << endl; + massert(10429, errmsg, 0); + return true; +} + +bool VersionManager::checkShardVersionCB(DBClientBase* conn_in, + const string& ns, + bool authoritative, + int tryNumber) { + return checkShardVersion(conn_in, ns, ChunkManagerPtr(), authoritative, tryNumber); +} + +bool VersionManager::checkShardVersionCB(ShardConnection* conn_in, + bool authoritative, + int tryNumber) { + return checkShardVersion( + conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber); +} } // namespace mongo |