/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/metadata_loader.h"
#include <vector>
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/unique_message.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/util/log.h"
namespace mongo {
using std::make_pair;
using std::map;
using std::pair;
using std::string;
namespace {
/**
* This is an adapter so we can use config diffs - mongos and mongod do them slightly
* differently.
*
* The mongod adapter here tracks only a single shard, and stores ranges by (min, max).
*/
class SCMConfigDiffTracker : public ConfigDiffTracker {
public:
SCMConfigDiffTracker(const std::string& ns,
RangeMap* currMap,
ChunkVersion* maxVersion,
MaxChunkVersionMap* maxShardVersions,
const ShardId& currShard)
: ConfigDiffTracker(ns, currMap, maxVersion, maxShardVersions),
_currShard(currShard) {}
virtual bool isTracked(const ChunkType& chunk) const {
return chunk.getShard() == _currShard;
}
virtual pair rangeFor(OperationContext* txn,
const ChunkType& chunk) const {
return make_pair(chunk.getMin(), CachedChunkInfo(chunk.getMax(), chunk.getVersion()));
}
virtual ShardId shardFor(OperationContext* txn, const ShardId& name) const {
return name;
}
virtual string nameFrom(const string& shard) const {
return shard;
}
private:
const ShardId _currShard;
};
} // namespace
/**
 * Fills 'metadata' for collection 'ns' on 'shard' in two steps: first the collection-level
 * fields via _initCollection(), then the chunk ranges via _initChunks().
 *
 * 'oldMetadata' may be null; it is only forwarded to _initChunks().
 *
 * Returns the failing Status of _initCollection() without attempting to load chunks, otherwise
 * whatever _initChunks() returns.
 */
Status MetadataLoader::makeCollectionMetadata(OperationContext* txn,
                                              ShardingCatalogClient* catalogClient,
                                              const string& ns,
                                              const string& shard,
                                              const CollectionMetadata* oldMetadata,
                                              CollectionMetadata* metadata) {
    const Status collectionStatus = _initCollection(txn, catalogClient, ns, shard, metadata);
    if (collectionStatus.isOK()) {
        // Collection-level fields are in place; the chunk load decides the final outcome.
        return _initChunks(txn, catalogClient, ns, shard, oldMetadata, metadata);
    }

    return collectionStatus;
}
/**
 * Initializes the collection-level fields of 'metadata' from the catalog entry for 'ns':
 * the shard key pattern (and the fields derived from it by fillKeyPatternFields()), and both the
 * shard and collection versions, which are reset to 0|0 with the collection's epoch.
 *
 * 'shard' is accepted for signature symmetry but is not read here.
 *
 * Returns the catalog lookup's error Status if the lookup fails, NamespaceNotFound if the
 * catalog entry is marked as dropped, and OK otherwise.
 */
Status MetadataLoader::_initCollection(OperationContext* txn,
                                       ShardingCatalogClient* catalogClient,
                                       const string& ns,
                                       const string& shard,
                                       CollectionMetadata* metadata) {
    // Look up the catalog entry describing 'ns'.
    auto swCollection = catalogClient->getCollection(txn, ns);
    if (!swCollection.isOK()) {
        return swCollection.getStatus();
    }

    const auto& collectionEntry = swCollection.getValue().value;

    // A dropped flag is conclusive, but its absence is not: the collection may have been dropped
    // and recreated since, which this check cannot detect.
    if (collectionEntry.getDropped()) {
        return Status(ErrorCodes::NamespaceNotFound,
                      str::stream() << "Could not load metadata because collection " << ns
                                    << " was dropped");
    }

    metadata->_keyPattern = collectionEntry.getKeyPattern().toBSON();
    metadata->fillKeyPatternFields();

    // Both versions start at 0|0 in the collection's current epoch.
    const ChunkVersion startingVersion(0, 0, collectionEntry.getEpoch());
    metadata->_shardVersion = startingVersion;
    metadata->_collVersion = startingVersion;

    return Status::OK();
}
/**
 * Loads the chunks of collection 'ns' that belong to 'shard' into 'metadata', which must already
 * carry the collection version/epoch set by _initCollection().
 *
 * When 'oldMetadata' is non-null and has the same epoch, its chunk map and versions seed the new
 * metadata so that only chunks newer than the old collection version need to be fetched
 * (incremental reload). Otherwise every chunk is fetched (full reload).
 *
 * The fetched chunks are also handed to _writeNewChunksIfPrimary() before the diff is applied.
 *
 * Returns:
 *  - OK when at least one chunk diff was applied; 'metadata' then has its shard version and
 *    ranges filled in.
 *  - NamespaceNotFound when a full reload finds no chunks (treated as a drop).
 *  - RemoteChangeDetected when an incremental reload finds no chunks, or when the differ reports
 *    invalid chunks (negative result).
 *  - The failing Status of the chunk fetch or of _writeNewChunksIfPrimary().
 *  - The Status form of any DBException thrown while fetching or diffing.
 */
Status MetadataLoader::_initChunks(OperationContext* txn,
                                   ShardingCatalogClient* catalogClient,
                                   const string& ns,
                                   const string& shard,
                                   const CollectionMetadata* oldMetadata,
                                   CollectionMetadata* metadata) {
    const OID epoch = metadata->getCollVersion().epoch();

    // Per-shard max version map handed to the differ; seeded with this shard's current version.
    SCMConfigDiffTracker::MaxChunkVersionMap versionMap;
    versionMap[shard] = metadata->_shardVersion;

    bool fullReload = true;

    // Check to see if we should use the old version or not.
    if (oldMetadata) {
        // If our epochs are compatible, it's useful to use the old metadata for diffs: this leads
        // to a performance gain because not all the chunks must be reloaded, just the ones this
        // shard has not seen -- they will have higher versions than present in oldMetadata.
        if (oldMetadata->getCollVersion().hasEqualEpoch(epoch)) {
            fullReload = false;
            invariant(oldMetadata->isValid());

            versionMap[shard] = oldMetadata->_shardVersion;
            metadata->_collVersion = oldMetadata->_collVersion;

            // TODO: This could be made more efficient if copying not required, but
            // not as frequently reloaded as in mongos.
            metadata->_chunksMap = oldMetadata->_chunksMap;

            LOG(2) << "loading new chunks for collection " << ns
                   << " using old metadata w/ version " << oldMetadata->getShardVersion() << " and "
                   << metadata->_chunksMap.size() << " chunks";
        } else {
            log() << "reloading collection metadata for " << ns << " with new epoch "
                  << epoch.toString() << ", the current epoch is "
                  << oldMetadata->getCollVersion().epoch().toString();
        }
    }

    // Exposes the new metadata's range map and version to the "differ" which would ultimately be
    // responsible for filling them up
    SCMConfigDiffTracker differ(
        ns, &metadata->_chunksMap, &metadata->_collVersion, &versionMap, shard);

    try {
        // Built inside the try block so that any exception from constructing the namespace is
        // still converted to a Status below, exactly as before.
        const NamespaceString nss(ns);

        const auto diffQuery =
            SCMConfigDiffTracker::createConfigDiffQuery(nss, metadata->_collVersion);

        std::vector<ChunkType> chunks;
        Status status = catalogClient->getChunks(txn,
                                                 diffQuery.query,
                                                 diffQuery.sort,
                                                 boost::none,
                                                 &chunks,
                                                 nullptr,
                                                 repl::ReadConcernLevel::kMajorityReadConcern);
        if (!status.isOK()) {
            return status;
        }

        // If we are the primary, or a standalone, persist new chunks locally.
        status = _writeNewChunksIfPrimary(txn, nss, chunks, metadata->_collVersion.epoch());
        if (!status.isOK()) {
            return status;
        }

        //
        // The diff tracker should always find at least one chunk (the highest chunk we saw
        // last time). If not, something has changed on the config server (potentially between
        // when we read the collection data and when we read the chunks data).
        //
        int diffsApplied = differ.calculateConfigDiff(txn, chunks);
        if (diffsApplied > 0) {
            // Chunks found, return ok
            LOG(2) << "loaded " << diffsApplied << " chunks into new metadata for " << ns
                   << " with version " << metadata->_collVersion;

            // If the last chunk was moved off of this shard, the shardVersion should be reset to
            // zero (if we did not conduct a full reload and oldMetadata was present,
            // versionMap[shard] was previously set to the oldMetadata's shardVersion for
            // performance gains).
            if (!fullReload && metadata->_chunksMap.empty()) {
                versionMap[shard] = ChunkVersion(0, 0, epoch);
            }

            metadata->_shardVersion = versionMap[shard];
            metadata->fillRanges();

            invariant(metadata->isValid());
            return Status::OK();
        } else if (diffsApplied == 0) {
            // No chunks found, the collection is dropping or we're confused
            // If this is a full reload, assume it is a drop for backwards compatibility
            // TODO: drop the config.collections entry *before* the chunks and eliminate this
            // ambiguity
            return {fullReload ? ErrorCodes::NamespaceNotFound : ErrorCodes::RemoteChangeDetected,
                    str::stream() << "No chunks found when reloading " << ns
                                  << ", previous version was "
                                  << metadata->_collVersion.toString()
                                  << (fullReload ? ", this is a drop" : "")};
        } else {
            // Invalid chunks found, our epoch may have changed because we dropped/recreated the
            // collection
            return {ErrorCodes::RemoteChangeDetected,
                    str::stream() << "Invalid chunks found when reloading " << ns
                                  << ", previous version was "
                                  << metadata->_collVersion.toString()
                                  << ", this should be rare"};
        }
    } catch (const DBException& ex) {
        return ex.toStatus();
    }
}
/**
 * Persists 'chunks' for collection 'nss' into the local chunk metadata collection, whose
 * namespace is ChunkType::ConfigNS + "." + nss.ns().
 *
 * Nothing is written, and OK is returned, unless this node is a shard server
 * (ClusterRole::ShardServer) and the replication coordinator reports that it can accept writes.
 *
 * For each chunk, in order:
 *  - If the chunk's epoch differs from 'currEpoch', the whole chunk metadata collection is
 *    dropped and RemoteChangeDetected is returned (a NamespaceNotFound from the drop is
 *    tolerated; any other drop error is returned instead).
 *  - Otherwise every stored chunk whose min lies in [chunk.min, chunk.max) is deleted and the
 *    chunk is inserted.
 *
 * Returns OK once every chunk has been written, the first failing command Status otherwise, or
 * the Status form of any DBException thrown along the way. Chunks written before a failure are
 * not rolled back by this function.
 */
Status MetadataLoader::_writeNewChunksIfPrimary(OperationContext* txn,
                                                const NamespaceString& nss,
                                                const std::vector<ChunkType>& chunks,
                                                const OID& currEpoch) {
    NamespaceString chunkMetadataNss(ChunkType::ConfigNS + "." + nss.ns());

    // Only do the write(s) if this is a shard server that can currently accept writes (a primary
    // or standalone). Otherwise, return OK.
    // NOTE(review): the full namespace is passed where the callee's name suggests a database
    // name is expected - confirm this is intended.
    if (serverGlobalParams.clusterRole != ClusterRole::ShardServer ||
        !repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase_UNSAFE(
            txn, chunkMetadataNss.ns())) {
        return Status::OK();
    }

    try {
        DBDirectClient client(txn);

        // Every command below targets the same database; compute its name once.
        const std::string chunkMetadataDb = chunkMetadataNss.db().toString();

        /**
         * Here are examples of the operations that can happen on the config server to update
         * the config.chunks collection. 'chunks' only includes the chunks that result from the
         * operations, which can be read from the config server, not any that were removed, so
         * we must delete any chunks that overlap with the new 'chunks'.
         *
         * CollectionVersion = 10.3
         *
         * moveChunk
         * {_id: 3, max: 5, version: 10.1} --> {_id: 3, max: 5, version: 11.0}
         *
         * splitChunk
         * {_id: 3, max: 9, version 10.3} --> {_id: 3, max: 5, version 10.4}
         *                                    {_id: 5, max: 8, version 10.5}
         *                                    {_id: 8, max: 9, version 10.6}
         *
         * mergeChunk
         * {_id: 10, max: 14, version 4.3} --> {_id: 10, max: 22, version 10.4}
         * {_id: 14, max: 19, version 7.1}
         * {_id: 19, max: 22, version 2.0}
         *
         */
        for (const auto& chunk : chunks) {
            // Check for a different epoch.
            if (!chunk.getVersion().hasEqualEpoch(currEpoch)) {
                // This means the collection was dropped and recreated. Drop the chunk metadata
                // and return.
                rpc::UniqueReply commandResponse =
                    client.runCommandWithMetadata(chunkMetadataDb,
                                                  "drop",
                                                  rpc::makeEmptyMetadata(),
                                                  BSON("drop" << chunkMetadataNss.coll()));
                Status status = getStatusFromCommandResult(commandResponse->getCommandReply());

                // A NamespaceNotFound error is okay because it's possible that we find a new epoch
                // twice in a row before ever inserting documents.
                if (!status.isOK() && status.code() != ErrorCodes::NamespaceNotFound) {
                    return status;
                }

                return Status{ErrorCodes::RemoteChangeDetected,
                              str::stream() << "Invalid chunks found when reloading '"
                                            << nss.toString()
                                            << "'. Previous collection epoch was '"
                                            << currEpoch.toString()
                                            << "', but unexpectedly found a new epoch '"
                                            << chunk.getVersion().epoch().toString()
                                            << "'. Collection was dropped and recreated."};
            }

            // Delete any overlapping chunk ranges. Overlapping chunks will have a min value
            // ("_id") in the half-open interval [chunk.min, chunk.max).
            //
            // query: { "_id" : {"$gte": chunk.min, "$lt": chunk.max}}
            auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>());
            deleteDocs->setQuery(BSON(ChunkType::minShardID << BSON(
                                          "$gte" << chunk.getMin() << "$lt" << chunk.getMax())));
            // A limit of 0 asks for every matching document to be removed, not just the first.
            deleteDocs->setLimit(0);

            auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>());
            deleteRequest->addToDeletes(deleteDocs.release());

            BatchedCommandRequest batchedDeleteRequest(deleteRequest.release());
            batchedDeleteRequest.setNS(chunkMetadataNss);
            const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON();

            rpc::UniqueReply deleteCommandResponse =
                client.runCommandWithMetadata(chunkMetadataDb,
                                              deleteCmdObj.firstElementFieldName(),
                                              rpc::makeEmptyMetadata(),
                                              deleteCmdObj);
            auto deleteStatus =
                getStatusFromCommandResult(deleteCommandResponse->getCommandReply());
            if (!deleteStatus.isOK()) {
                return deleteStatus;
            }

            // Now the document can be expected to cleanly insert without overlap.
            auto insert(stdx::make_unique<BatchedInsertRequest>());
            insert->addToDocuments(chunk.toShardBSON());

            BatchedCommandRequest insertRequest(insert.release());
            insertRequest.setNS(chunkMetadataNss);
            const BSONObj insertCmdObj = insertRequest.toBSON();

            rpc::UniqueReply commandResponse =
                client.runCommandWithMetadata(chunkMetadataDb,
                                              insertCmdObj.firstElementFieldName(),
                                              rpc::makeEmptyMetadata(),
                                              insertCmdObj);
            auto insertStatus = getStatusFromCommandResult(commandResponse->getCommandReply());
            if (!insertStatus.isOK()) {
                return insertStatus;
            }
        }

        return Status::OK();
    } catch (const DBException& ex) {
        return ex.toStatus();
    }
}
} // namespace mongo