// @file chunk_diff_impl.hpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include "mongo/logger/log_severity.h"
#include "mongo/logger/logger.h"
#include "mongo/logger/logstream_builder.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/type_chunk.h"
#include "mongo/util/concurrency/thread_name.h"
namespace mongo {
template
bool ConfigDiffTracker::isOverlapping(const BSONObj& min, const BSONObj& max) {
RangeOverlap overlap = overlappingRange(min, max);
return overlap.first != overlap.second;
}
template
void ConfigDiffTracker::removeOverlapping(const BSONObj& min,
const BSONObj& max) {
verifyAttached();
RangeOverlap overlap = overlappingRange(min, max);
_currMap->erase(overlap.first, overlap.second);
}
template
typename ConfigDiffTracker::RangeOverlap
ConfigDiffTracker::overlappingRange(const BSONObj& min, const BSONObj& max) {
verifyAttached();
typename RangeMap::iterator low;
typename RangeMap::iterator high;
if (isMinKeyIndexed()) {
// Returns the first chunk with a min key that is >= min - implies the
// previous chunk cannot overlap min
low = _currMap->lower_bound(min);
// Returns the first chunk with a min key that is >= max - implies the
// chunk does not overlap max
high = _currMap->lower_bound(max);
} else {
// Returns the first chunk with a max key that is > min - implies that
// the chunk overlaps min
low = _currMap->upper_bound(min);
// Returns the first chunk with a max key that is > max - implies that
// the next chunk cannot not overlap max
high = _currMap->upper_bound(max);
}
return RangeOverlap(low, high);
}
template
int ConfigDiffTracker::calculateConfigDiff(const std::string& config) {
verifyAttached();
// Get the diff query required
Query diffQuery = configDiffQuery();
ScopedDbConnection conn(config, 30.0);
try {
// Open a cursor for the diff chunks
std::auto_ptr cursor =
conn->query(ChunkType::ConfigNS, diffQuery, 0, 0, 0, 0, (DEBUG_BUILD ? 2 : 1000000));
verify(cursor.get());
int diff = calculateConfigDiff(*cursor.get());
conn.done();
return diff;
} catch (DBException& e) {
// Should only happen on connection errors
e.addContext(str::stream() << "could not calculate config difference for ns " << _ns
<< " on " << config);
throw;
}
}
template
int ConfigDiffTracker::calculateConfigDiff(
DBClientCursorInterface& diffCursor) {
verifyAttached();
// Apply the chunk changes to the ranges and versions
//
// Overall idea here is to work in two steps :
// 1. For all the new chunks we find, increment the maximum version per-shard and
// per-collection, and remove any conflicting chunks from the ranges
// 2. For all the new chunks we're interested in (all of them for mongos, just chunks on the
// shard for mongod) add them to the ranges
//
std::vector newTracked;
// Store epoch now so it doesn't change when we change max
OID currEpoch = _maxVersion->epoch();
_validDiffs = 0;
while (diffCursor.more()) {
BSONObj diffChunkDoc = diffCursor.next();
ChunkVersion chunkVersion =
ChunkVersion::fromBSON(diffChunkDoc, ChunkType::DEPRECATED_lastmod());
if (diffChunkDoc[ChunkType::min()].type() != Object ||
diffChunkDoc[ChunkType::max()].type() != Object ||
diffChunkDoc[ChunkType::shard()].type() != String) {
using namespace logger;
LogstreamBuilder(
globalLogDomain(), getThreadName(), LogSeverity::Warning(), LogComponent::kSharding)
<< "got invalid chunk document " << diffChunkDoc
<< " when trying to load differing chunks" << std::endl;
continue;
}
if (!chunkVersion.isSet() || !chunkVersion.hasEqualEpoch(currEpoch)) {
using namespace logger;
LogstreamBuilder(
globalLogDomain(), getThreadName(), LogSeverity::Warning(), LogComponent::kSharding)
<< "got invalid chunk version " << chunkVersion << " in document " << diffChunkDoc
<< " when trying to load differing chunks at version "
<< ChunkVersion(_maxVersion->majorVersion(), _maxVersion->minorVersion(), currEpoch)
<< std::endl;
// Don't keep loading, since we know we'll be broken here
return -1;
}
_validDiffs++;
// Get max changed version and chunk version
if (chunkVersion > *_maxVersion)
*_maxVersion = chunkVersion;
// Chunk version changes
ShardType shard = shardFor(diffChunkDoc[ChunkType::shard()].String());
typename std::map::iterator shardVersionIt =
_maxShardVersions->find(shard);
if (shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion) {
(*_maxShardVersions)[shard] = chunkVersion;
}
// See if we need to remove any chunks we are currently tracking b/c of this chunk's changes
removeOverlapping(diffChunkDoc[ChunkType::min()].Obj(),
diffChunkDoc[ChunkType::max()].Obj());
// Figure out which of the new chunks we need to track
// Important - we need to actually own this doc, in case the cursor decides to getMore or
// unbuffer
if (isTracked(diffChunkDoc))
newTracked.push_back(diffChunkDoc.getOwned());
}
using namespace logger;
if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(3))) {
LogstreamBuilder(
globalLogDomain(), getThreadName(), LogSeverity::Debug(3), LogComponent::kSharding)
<< "found " << _validDiffs << " new chunks for collection " << _ns << " (tracking "
<< newTracked.size() << "), new version is " << *_maxVersion << std::endl;
}
for (std::vector::iterator it = newTracked.begin(); it != newTracked.end(); it++) {
BSONObj chunkDoc = *it;
// Important - we need to make sure we actually own the min and max here
BSONObj min = chunkDoc[ChunkType::min()].Obj().getOwned();
BSONObj max = chunkDoc[ChunkType::max()].Obj().getOwned();
// Invariant enforced by sharding
// It's possible to read inconsistent state b/c of getMore() and yielding, so we want
// to detect as early as possible.
// TODO: This checks for overlap, we also should check for holes here iff we're tracking
// all chunks
if (isOverlapping(min, max))
return -1;
_currMap->insert(rangeFor(chunkDoc, min, max));
}
return _validDiffs;
}
template
Query ConfigDiffTracker::configDiffQuery() const {
verifyAttached();
//
// Basic idea behind the query is to find all the chunks $gte the current max version.
// Currently, any splits and merges will increment the current max version.
//
BSONObjBuilder queryB;
queryB.append(ChunkType::ns(), _ns);
BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod()));
tsBuilder.appendTimestamp("$gte", _maxVersion->toLong());
tsBuilder.done();
BSONObj query = queryB.obj();
//
// NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, TO HANDLE
// CURSOR YIELDING BETWEEN CHUNKS BEING MIGRATED.
//
// This ensures that changes to chunk version (which will always be higher) will always
// come *after* our current position in the chunk cursor.
//
Query queryObj(query);
queryObj.sort(BSON("lastmod" << 1));
using namespace logger;
if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(2))) {
LogstreamBuilder(
globalLogDomain(), getThreadName(), LogSeverity::Debug(2), LogComponent::kSharding)
<< "major version query from " << *_maxVersion << " and over "
<< _maxShardVersions->size() << " shards is " << queryObj << std::endl;
}
return queryObj;
}
} // namespace mongo