/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/s/cluster_client_internal.h"
#include
#include
#include "mongo/client/connpool.h"
#include "mongo/db/field_parser.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/cluster_write.h"
#include "mongo/s/type_changelog.h"
#include "mongo/s/type_mongos.h"
#include "mongo/s/type_shard.h"
#include "mongo/util/log.h"
#include "mongo/util/stringutils.h"
namespace mongo {
MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kSharding);
using std::string;
using std::vector;
using mongoutils::str::stream;
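// A minimal usage sketch (not part of this translation unit): upgrade-style callers are
// assumed to invoke checkClusterMongoVersions() along these lines before proceeding; the
// host string and target version below are illustrative, not taken from this file.
//
//     string errMsg;
//     ConnectionString configLoc = ConnectionString::parse("cfg1:27019", errMsg); // hypothetical host
//     Status status = checkClusterMongoVersions(configLoc, "2.4.0");
//     if (!status.isOK()) {
//         return status; // abort if any reachable mongos/shard/config server is too old
//     }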
Status checkClusterMongoVersions(const ConnectionString& configLoc,
const string& minMongoVersion)
{
scoped_ptr<ScopedDbConnection> connPtr;
//
// Find mongos pings in config server
//
try {
connPtr.reset(new ScopedDbConnection(configLoc, 30));
ScopedDbConnection& conn = *connPtr;
scoped_ptr<DBClientCursor> cursor(_safeCursor(conn->query(MongosType::ConfigNS,
Query())));
while (cursor->more()) {
BSONObj pingDoc = cursor->next();
MongosType ping;
string errMsg;
// NOTE: We don't care if the ping document is invalid; pings written by legacy mongos versions may not parse, so just warn and skip them
if (!ping.parseBSON(pingDoc, &errMsg)) {
warning() << "could not parse ping document: " << pingDoc << causedBy(errMsg)
<< endl;
continue;
}
string mongoVersion = "2.0";
// Hack to determine older mongos versions from the ping format: pings without a
// "waiting" field are treated as pre-2.2, pings with "waiting" but no explicit
// version as 2.2, and anything newer reports its version directly.
if (ping.isWaitingSet()) mongoVersion = "2.2";
if (ping.isMongoVersionSet() && ping.getMongoVersion() != "") {
mongoVersion = ping.getMongoVersion();
}
Date_t lastPing = ping.getPing();
long long quietIntervalMillis = 0;
Date_t currentJsTime = jsTime();
if (currentJsTime >= lastPing) {
quietIntervalMillis = static_cast<long long>(currentJsTime - lastPing);
}
long long quietIntervalMins = quietIntervalMillis / (60 * 1000);
// We assume that anything that hasn't pinged in 5 minutes is probably down
if (quietIntervalMins >= 5) {
log() << "stale mongos detected " << quietIntervalMins << " minutes ago,"
<< " network location is " << pingDoc["_id"].String()
<< ", not checking version" << endl;
}
else {
if (versionCmp(mongoVersion, minMongoVersion) < 0) {
return Status(ErrorCodes::RemoteValidationError,
stream() << "version " << mongoVersion
<< " detected on mongos at "
<< ping.getName()
<< ", but version >= " << minMongoVersion
<< " required; you must wait 5 minutes "
<< "after shutting down a pre-" << minMongoVersion
<< " mongos");
}
}
}
}
catch (const DBException& e) {
return e.toStatus("could not read mongos pings collection");
}
//
// Load shards from config server
//
vector<HostAndPort> servers;
try {
ScopedDbConnection& conn = *connPtr;
scoped_ptr<DBClientCursor> cursor(_safeCursor(conn->query(ShardType::ConfigNS,
Query())));
while (cursor->more()) {
BSONObj shardDoc = cursor->next();
ShardType shard;
string errMsg;
if (!shard.parseBSON(shardDoc, &errMsg) || !shard.isValid(&errMsg)) {
connPtr->done();
return Status(ErrorCodes::UnsupportedFormat,
stream() << "invalid shard " << shardDoc
<< " read from the config server" << causedBy(errMsg));
}
ConnectionString shardLoc = ConnectionString::parse(shard.getHost(), errMsg);
if (shardLoc.type() == ConnectionString::INVALID) {
connPtr->done();
return Status(ErrorCodes::UnsupportedFormat,
stream() << "invalid shard host " << shard.getHost()
<< " read from the config server" << causedBy(errMsg));
}
vector<HostAndPort> shardServers = shardLoc.getServers();
servers.insert(servers.end(), shardServers.begin(), shardServers.end());
}
}
catch (const DBException& e) {
return e.toStatus("could not read shards collection");
}
connPtr->done();
// Add config servers to list of servers to check version against
vector<HostAndPort> configServers = configLoc.getServers();
servers.insert(servers.end(), configServers.begin(), configServers.end());
//
// We've now got all the shard info from the config server, start contacting the shards
// and config servers and verifying their versions.
//
for (vector<HostAndPort>::iterator serverIt = servers.begin();
serverIt != servers.end(); ++serverIt)
{
// Note: This will *always* be a single-host connection
ConnectionString serverLoc(*serverIt);
dassert(serverLoc.type() == ConnectionString::MASTER ||
serverLoc.type() == ConnectionString::CUSTOM); // for dbtests
log() << "checking that version of host " << serverLoc << " is compatible with "
<< minMongoVersion << endl;
scoped_ptr<ScopedDbConnection> serverConnPtr;
bool resultOk;
BSONObj buildInfo;
try {
serverConnPtr.reset(new ScopedDbConnection(serverLoc, 30));
ScopedDbConnection& serverConn = *serverConnPtr;
resultOk = serverConn->runCommand("admin",
BSON("buildInfo" << 1),
buildInfo);
}
catch (const DBException& e) {
warning() << "could not run buildInfo command on " << serverLoc.toString() << " "
<< causedBy(e) << ". Please ensure that this server is up and at a "
"version >= "
<< minMongoVersion;
continue;
}
// TODO: Make running commands saner such that we can consolidate error handling
if (!resultOk) {
return Status(ErrorCodes::UnknownError,
stream() << DBClientConnection::getLastErrorString(buildInfo)
<< causedBy(buildInfo.toString()));
}
serverConnPtr->done();
verify(buildInfo["version"].type() == String);
string mongoVersion = buildInfo["version"].String();
if (versionCmp(mongoVersion, minMongoVersion) < 0) {
return Status(ErrorCodes::RemoteValidationError,
stream() << "version " << mongoVersion << " detected on mongo "
"server at " << serverLoc.toString() <<
", but version >= " << minMongoVersion << " required");
}
}
return Status::OK();
}
Status _findAllCollections(const ConnectionString& configLoc,
bool optionalEpochs,
OwnedPointerMap<string, CollectionType>* collections)
{
scoped_ptr<ScopedDbConnection> connPtr;
try {
connPtr.reset(new ScopedDbConnection(configLoc, 30));
ScopedDbConnection& conn = *connPtr;
scoped_ptr<DBClientCursor> cursor(_safeCursor(conn->query(CollectionType::ConfigNS,
Query())));
while (cursor->more()) {
BSONObj collDoc = cursor->nextSafe();
// TODO: replace with unique_ptr when available (the OwnedPointerMap then goes away too)
auto_ptr<CollectionType> coll(new CollectionType());
string errMsg;
bool parseOk = coll->parseBSON(collDoc, &errMsg);
// Needed for the v3 to v4 upgrade
bool epochNotSet = !coll->isEpochSet() || !coll->getEpoch().isSet();
if (optionalEpochs && epochNotSet) {
// Set our epoch to something here, just to allow the validity check below to pass
coll->setEpoch(OID::gen());
}
if (!parseOk || !coll->isValid(&errMsg)) {
return Status(ErrorCodes::UnsupportedFormat,
stream() << "invalid collection " << collDoc
<< " read from the config server" << causedBy(errMsg));
}
if (coll->isDroppedSet() && coll->getDropped()) {
continue;
}
if (optionalEpochs && epochNotSet) {
coll->setEpoch(OID());
}
// Get NS before releasing
string ns = coll->getNS();
collections->mutableMap().insert(make_pair(ns, coll.release()));
}
}
catch (const DBException& e) {
return e.toStatus();
}
connPtr->done();
return Status::OK();
}
Status findAllCollections(const ConnectionString& configLoc,
OwnedPointerMap<string, CollectionType>* collections)
{
return _findAllCollections(configLoc, false, collections);
}
Status findAllCollectionsV3(const ConnectionString& configLoc,
OwnedPointerMap<string, CollectionType>* collections)
{
return _findAllCollections(configLoc, true, collections);
}
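// Sketch of how a caller might consume the result (the variable names are illustrative
// assumptions; the map owns the CollectionType pointers, so they must not be deleted):
//
//     OwnedPointerMap<string, CollectionType> collections;
//     Status status = findAllCollections(configLoc, &collections);
//     if (!status.isOK()) return status;
//     for (map<string, CollectionType*>::iterator it = collections.mutableMap().begin();
//          it != collections.mutableMap().end(); ++it) {
//         const CollectionType& coll = *it->second;
//         // ... inspect coll.getNS(), coll.getEpoch(), etc.
//     }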
Status findAllChunks(const ConnectionString& configLoc,
const string& ns,
OwnedPointerVector<ChunkType>* chunks)
{
scoped_ptr<ScopedDbConnection> connPtr;
try {
connPtr.reset(new ScopedDbConnection(configLoc, 30));
ScopedDbConnection& conn = *connPtr;
scoped_ptr<DBClientCursor> cursor(_safeCursor(conn->query(ChunkType::ConfigNS,
BSON(ChunkType::ns(ns)))));
while (cursor->more()) {
BSONObj chunkDoc = cursor->nextSafe();
// TODO: replace with unique_ptr when available
auto_ptr<ChunkType> chunk(new ChunkType());
string errMsg;
if (!chunk->parseBSON(chunkDoc, &errMsg) || !chunk->isValid(&errMsg)) {
connPtr->done();
return Status(ErrorCodes::UnsupportedFormat,
stream() << "invalid chunk " << chunkDoc
<< " read from the config server" << causedBy(errMsg));
}
chunks->mutableVector().push_back(chunk.release());
}
}
catch (const DBException& e) {
return e.toStatus();
}
connPtr->done();
return Status::OK();
}
Status logConfigChange(const ConnectionString& configLoc,
const string& clientHost,
const string& ns,
const string& description,
const BSONObj& details)
{
//
// The code for writing to the changelog collection exists elsewhere - we duplicate here to
// avoid dependency issues.
// TODO: Merge again once config.cpp is cleaned up.
//
string changeID = stream() << getHostNameCached() << "-" << terseCurrentTime() << "-"
<< OID::gen();
ChangelogType changelog;
changelog.setChangeID(changeID);
changelog.setServer(getHostNameCached());
changelog.setClientAddr(clientHost == "" ? "N/A" : clientHost);
changelog.setTime(jsTime());
changelog.setWhat(description);
changelog.setNS(ns);
changelog.setDetails(details);
log() << "about to log new metadata event: " << changelog.toBSON() << endl;
scoped_ptr<ScopedDbConnection> connPtr;
try {
connPtr.reset(new ScopedDbConnection(configLoc, 30));
ScopedDbConnection& conn = *connPtr;
// TODO: better way here
static bool createdCapped = false;
if (!createdCapped) {
try {
conn->createCollection(ChangelogType::ConfigNS, 1024 * 1024 * 10, true);
}
catch (const DBException& e) {
// don't care, someone else may have done this for us
// if there's still a problem, caught in outer try
LOG(1) << "couldn't create the changelog, continuing " << e << endl;
}
createdCapped = true;
}
connPtr->done();
}
catch (const DBException& e) {
// if we got here, it means the config change is only in the log,
// it didn't make it to config.changelog
log() << "not logging config change: " << changeID << causedBy(e) << endl;
return e.toStatus();
}
Status result = clusterInsert( ChangelogType::ConfigNS,
changelog.toBSON(),
WriteConcernOptions::AllConfigs,
NULL );
if ( !result.isOK() ) {
return Status( result.code(), str::stream() << "failed to write to changelog: "
<< result.reason() );
}
return result;
}
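// Hedged usage sketch for logConfigChange(): callers (e.g. upgrade steps) are assumed to
// record their own changelog entries roughly as below; the namespace and details shown
// here are illustrative only.
//
//     Status status = logConfigChange(configLoc,
//                                     "",                              // no client host
//                                     "config.version",                // hypothetical ns
//                                     "upgrade step completed",        // description
//                                     BSON("from" << 4 << "to" << 5)); // details
//     if (!status.isOK()) {
//         warning() << "could not log config change" << causedBy(status.reason());
//     }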
// Helper function for safe writes to non-SCC config servers
void _checkGLE(ScopedDbConnection& conn) {
string error = conn->getLastError();
if (error != "") {
conn.done();
// TODO: Make error handling more consistent, throwing and re-catching makes things much
// simpler to manage
uasserted(16624, str::stream() << "operation failed" << causedBy(error));
}
}
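// Illustrative pattern for _checkGLE (the write and collection below are assumptions, not
// taken from this file): issue a write through the connection, then call _checkGLE so a
// failed write surfaces as an exception for the caller's try/catch block.
//
//     conn->insert(ChangelogType::ConfigNS, changelog.toBSON()); // hypothetical direct write
//     _checkGLE(conn); // throws (after conn.done()) if getLastError reports a failure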
// Helper function for safe cursors
DBClientCursor* _safeCursor(auto_ptr<DBClientCursor> cursor) {
// TODO: Make error handling more consistent, it's annoying that cursors error out by
// throwing exceptions *and* being empty
uassert(16625, str::stream() << "cursor not found, transport error", cursor.get());
return cursor.release();
}
}