// @file version_manager.cpp
/**
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/version_manager.h"
#include "mongo/s/chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard.h"
#include "mongo/s/stale_exception.h" // for SendStaleConfigException
#include "mongo/util/log.h"
namespace mongo {
using std::endl;
using std::map;
using std::string;
// Global version manager
VersionManager versionManager;
/**
* Tracking information, per-connection, of the latest chunk manager iteration or sequence
* number that was used to send a shard version over this connection.
* When the chunk manager is replaced, implying new versions were loaded, the chunk manager
* sequence number is iterated by 1 and connections need to re-send shard versions.
*/
struct ConnectionShardStatus {
ConnectionShardStatus() : _mutex("ConnectionShardStatus") {}
bool hasAnySequenceSet(DBClientBase* conn) {
scoped_lock lk(_mutex);
SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId());
return seenConnIt != _map.end() && seenConnIt->second.size() > 0;
}
bool getSequence(DBClientBase* conn, const string& ns, unsigned long long* sequence) {
scoped_lock lk(_mutex);
SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId());
if (seenConnIt == _map.end())
return false;
map::const_iterator seenNSIt = seenConnIt->second.find(ns);
if (seenNSIt == seenConnIt->second.end())
return false;
*sequence = seenNSIt->second;
return true;
}
void setSequence(DBClientBase* conn, const string& ns, const unsigned long long& s) {
scoped_lock lk(_mutex);
_map[conn->getConnectionId()][ns] = s;
}
void reset(DBClientBase* conn) {
scoped_lock lk(_mutex);
_map.erase(conn->getConnectionId());
}
// protects _map
mongo::mutex _mutex;
// a map from a connection into ChunkManager's sequence number for each namespace
typedef map> SequenceMap;
SequenceMap _map;
} connectionShardStatus;
void VersionManager::resetShardVersionCB(DBClientBase* conn) {
connectionShardStatus.reset(conn);
}
bool VersionManager::isVersionableCB(DBClientBase* conn) {
return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET;
}
DBClientBase* getVersionable(DBClientBase* conn) {
switch (conn->type()) {
case ConnectionString::INVALID:
massert(15904,
str::stream() << "cannot set version on invalid connection "
<< conn->toString(),
false);
return NULL;
case ConnectionString::MASTER:
return conn;
case ConnectionString::PAIR:
massert(15905,
str::stream() << "cannot set version or shard on pair connection "
<< conn->toString(),
false);
return NULL;
case ConnectionString::SYNC:
massert(15906,
str::stream() << "cannot set version or shard on sync connection "
<< conn->toString(),
false);
return NULL;
case ConnectionString::CUSTOM:
massert(16334,
str::stream() << "cannot set version or shard on custom connection "
<< conn->toString(),
false);
return NULL;
case ConnectionString::SET:
DBClientReplicaSet* set = (DBClientReplicaSet*)conn;
return &(set->masterConn());
}
verify(false);
return NULL;
}
bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) {
DBConfigPtr conf = grid.getDBConfig(ns);
if (!conf)
return false;
conf->reload();
// If we don't have a collection, don't refresh the chunk manager
if (nsGetCollection(ns).size() == 0)
return false;
ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true);
if (!manager)
return false;
return true;
}
/**
* Special internal logic to run reduced version handshake for empty namespace operations to
* shards.
*
* Eventually this should go completely away, but for now many commands rely on unversioned but
* mongos-specific behavior on mongod (auditing and replication information in commands)
*/
static bool initShardVersionEmptyNS(DBClientBase* conn_in) {
bool ok;
BSONObj result;
DBClientBase* conn = NULL;
try {
// May throw if replica set primary is down
conn = getVersionable(conn_in);
dassert(conn); // errors thrown above
// Check to see if we've already initialized this connection
if (connectionShardStatus.hasAnySequenceSet(conn))
return false;
// Check to see if this is actually a shard and not a single config server
// NOTE: Config servers are registered only by the name "config" in the shard cache, not
// by host, so lookup by host will fail unless the host is also a shard.
Shard shard = Shard::findIfExists(conn->getServerAddress());
if (!shard.ok())
return false;
LOG(1) << "initializing shard connection to " << shard.toString() << endl;
ok = setShardVersion(*conn, "", ChunkVersion(), ChunkManagerPtr(), true, result);
} catch (const DBException&) {
// NOTE: Replica sets may fail to initShardVersion because future calls relying on
// correct versioning must later call checkShardVersion on the primary.
// Secondary queries and commands may not call checkShardVersion, but secondary ops
// aren't versioned at all.
if (conn_in->type() != ConnectionString::SET) {
throw;
}
// NOTE: Only old-style cluster operations will talk via DBClientReplicaSets - using
// checkShardVersion is required (which includes initShardVersion information) if these
// connections are used.
OCCASIONALLY {
warning() << "failed to initialize new replica set connection version, "
<< "will initialize on first use" << endl;
}
return false;
}
// Record the connection wire version if sent in the response, initShardVersion is a
// handshake for mongos->mongod connections.
if (!result["minWireVersion"].eoo()) {
int minWireVersion = result["minWireVersion"].numberInt();
int maxWireVersion = result["maxWireVersion"].numberInt();
conn->setWireVersions(minWireVersion, maxWireVersion);
}
LOG(3) << "initial sharding result : " << result << endl;
connectionShardStatus.setSequence(conn, "", 0);
return ok;
}
/**
* Updates the remote cached version on the remote shard host (primary, in the case of replica
* sets) if needed with a fully-qualified shard version for the given namespace:
* config server(s) + shard name + shard version
*
* If no remote cached version has ever been set, an initial shard version is sent.
*
* If the namespace is empty and no version has ever been sent, the config server + shard name
* is sent to the remote shard host to initialize the connection as coming from mongos.
* NOTE: This initialization is *best-effort only*. Operations which wish to correctly version
* must send the namespace.
*
* Config servers are special and are not (unless otherwise a shard) kept up to date with this
* protocol. This is safe so long as config servers only contain unversioned collections.
*
* It is an error to call checkShardVersion with an unversionable connection (isVersionableCB).
*
* @return true if we contacted the remote host
*/
bool checkShardVersion(DBClientBase* conn_in,
const string& ns,
ChunkManagerPtr refManager,
bool authoritative,
int tryNumber) {
// TODO: cache, optimize, etc...
// Empty namespaces are special - we require initialization but not versioning
if (ns.size() == 0) {
return initShardVersionEmptyNS(conn_in);
}
DBConfigPtr conf = grid.getDBConfig(ns);
if (!conf)
return false;
DBClientBase* conn = getVersionable(conn_in);
verify(conn); // errors thrown above
unsigned long long officialSequenceNumber = 0;
ShardPtr primary;
ChunkManagerPtr manager;
if (authoritative)
conf->getChunkManagerIfExists(ns, true);
conf->getChunkManagerOrPrimary(ns, manager, primary);
if (manager)
officialSequenceNumber = manager->getSequenceNumber();
// Check this manager against the reference manager
if (manager) {
Shard shard = Shard::make(conn->getServerAddress());
if (refManager && !refManager->compatibleWith(*manager, shard.getName())) {
const ChunkVersion refVersion(refManager->getVersion(shard.getName()));
const ChunkVersion currentVersion(manager->getVersion(shard.getName()));
string msg(str::stream()
<< "manager (" << currentVersion.toString() << " : "
<< manager->getSequenceNumber() << ") "
<< "not compatible with reference manager (" << refVersion.toString()
<< " : " << refManager->getSequenceNumber() << ") "
<< "on shard " << shard.getName() << " (" << shard.getAddress().toString()
<< ")");
throw SendStaleConfigException(ns, msg, refVersion, currentVersion);
}
} else if (refManager) {
Shard shard = Shard::make(conn->getServerAddress());
string msg(str::stream() << "not sharded ("
<< ((manager.get() == 0) ? string("") : str::stream()
<< manager->getSequenceNumber())
<< ") but has reference manager ("
<< refManager->getSequenceNumber() << ") "
<< "on conn " << conn->getServerAddress() << " ("
<< conn_in->getServerAddress() << ")");
throw SendStaleConfigException(
ns, msg, refManager->getVersion(shard.getName()), ChunkVersion::UNSHARDED());
}
// Do not send setShardVersion to collections on the config servers - this causes problems
// when config servers are also shards and get SSV with conflicting names.
// TODO: Make config servers regular shards
if (primary && primary->getName() == "config") {
return false;
}
// Has the ChunkManager been reloaded since the last time we updated the shard version over
// this connection? If we've never updated the shard version, do so now.
unsigned long long sequenceNumber = 0;
if (connectionShardStatus.getSequence(conn, ns, &sequenceNumber)) {
if (sequenceNumber == officialSequenceNumber) {
return false;
}
}
// Now that we're sure we're sending SSV and not to a single config server, get the shard
Shard shard = Shard::make(conn->getServerAddress());
ChunkVersion version = ChunkVersion(0, 0, OID());
if (manager)
version = manager->getVersion(shard.getName());
LOG(1) << "setting shard version of " << version << " for " << ns << " on shard "
<< shard.toString();
LOG(3) << "last version sent with chunk manager iteration " << sequenceNumber
<< ", current chunk manager iteration is " << officialSequenceNumber;
BSONObj result;
if (setShardVersion(*conn, ns, version, manager, authoritative, result)) {
// success!
LOG(1) << " setShardVersion success: " << result << endl;
connectionShardStatus.setSequence(conn, ns, officialSequenceNumber);
return true;
}
LOG(1) << " setShardVersion failed!\n" << result << endl;
if (result["need_authoritative"].trueValue())
massert(10428, "need_authoritative set but in authoritative mode already", !authoritative);
if (!authoritative) {
// use the original connection and get a fresh versionable connection
// since conn can be invalidated (or worse, freed) after the failure
checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1);
return true;
}
if (result["reloadConfig"].trueValue()) {
if (result["version"].timestampTime() == 0) {
warning() << "reloading full configuration for " << conf->getName()
<< ", connection state indicates significant version changes" << endl;
// reload db
conf->reload();
} else {
// reload config
conf->getChunkManager(ns, true);
}
}
const int maxNumTries = 7;
if (tryNumber < maxNumTries) {
LOG(tryNumber < (maxNumTries / 2) ? 1 : 0)
<< "going to retry checkShardVersion shard: " << shard.toString() << " " << result;
sleepmillis(10 * tryNumber);
// use the original connection and get a fresh versionable connection
// since conn can be invalidated (or worse, freed) after the failure
checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1);
return true;
}
string errmsg = str::stream() << "setShardVersion failed shard: " << shard.toString() << " "
<< result;
log() << " " << errmsg << endl;
massert(10429, errmsg, 0);
return true;
}
bool VersionManager::checkShardVersionCB(DBClientBase* conn_in,
const string& ns,
bool authoritative,
int tryNumber) {
return checkShardVersion(conn_in, ns, ChunkManagerPtr(), authoritative, tryNumber);
}
bool VersionManager::checkShardVersionCB(ShardConnection* conn_in,
bool authoritative,
int tryNumber) {
return checkShardVersion(
conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber);
}
} // namespace mongo