/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"

#include <memory>
#include <string>
#include <vector>

#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/commands.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using std::string;
using std::unique_ptr;
using std::vector;
namespace {
/**
 * Writes a compact BSON description of 'chunk' into the buffer 'b'.
 *
 * The emitted object always carries the chunk's min and max bounds. The chunk's version is added
 * under the field named by ChunkType::DEPRECATED_lastmod(), but only when the chunk reports that
 * a version is set.
 */
void appendShortVersion(BufBuilder& b, const ChunkType& chunk) {
    BSONObjBuilder chunkSummary(b);

    chunkSummary.append(ChunkType::min(), chunk.getMin());
    chunkSummary.append(ChunkType::max(), chunk.getMax());

    // The version is optional on the chunk, so it is emitted only when present.
    if (chunk.isVersionSet()) {
        chunk.getVersion().addToBSON(chunkSummary, ChunkType::DEPRECATED_lastmod());
    }

    // Finalize the object directly inside the caller-supplied buffer.
    chunkSummary.done();
}
/**
 * Returns true if an index scan over the range of 'chunk' produces exactly one document, and
 * false if it produces zero documents or more than one.
 *
 * The chunk's min and max bounds are extended to the key pattern of 'idx' and converted to index
 * key format before being used as the scan's start and end keys; the end key is not inclusive.
 *
 * @param txn        operation context the scan runs under.
 * @param collection collection that owns 'idx'.
 * @param idx        index whose key pattern is used to extend the chunk bounds and to scan.
 * @param chunk      chunk whose [min, max) range is examined.
 */
bool checkIfSingleDoc(OperationContext* txn,
                      Collection* collection,
                      const IndexDescriptor* idx,
                      const ChunkType* chunk) {
    KeyPattern kp(idx->keyPattern());
    BSONObj newmin = Helpers::toKeyFormat(kp.extendRangeBound(chunk->getMin(), false));
    BSONObj newmax = Helpers::toKeyFormat(kp.extendRangeBound(chunk->getMax(), true));

    std::unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
                                                                  collection,
                                                                  idx,
                                                                  newmin,
                                                                  newmax,
                                                                  false,  // endKeyInclusive
                                                                  PlanExecutor::YIELD_MANUAL));

    // Exactly one document exists when the first getNext() advances and the second hits EOF.
    PlanExecutor::ExecState state;
    BSONObj obj;
    if (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) {
        if (PlanExecutor::IS_EOF == (state = exec->getNext(&obj, nullptr))) {
            return true;
        }
    }

    // This manually-yielded InternalPlanner index scan is not expected to end in an error state:
    // the last state observed must be either ADVANCED (more than one document) or IS_EOF (none).
    invariant(PlanExecutor::ADVANCED == state || PlanExecutor::IS_EOF == state);

    return false;
}
class SplitChunkCommand : public Command {
public:
SplitChunkCommand() : Command("splitChunk") {}
void help(std::stringstream& help) const override {
help << "internal command usage only\n"
"example:\n"
" { splitChunk:\"db.foo\" , keyPattern: {a:1} , min : {a:100} , max: {a:200} { "
"splitKeys : [ {a:150} , ... ]}";
}
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
bool slaveOk() const override {
return false;
}
bool adminOnly() const override {
return true;
}
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forClusterResource(), ActionType::internal)) {
return Status(ErrorCodes::Unauthorized, "Unauthorized");
}
return Status::OK();
}
std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
bool run(OperationContext* txn,
const std::string& dbname,
BSONObj& cmdObj,
int options,
std::string& errmsg,
BSONObjBuilder& result) override {
//
// 1. check whether parameters passed to splitChunk are sound
//
const NamespaceString nss = NamespaceString(parseNs(dbname, cmdObj));
if (!nss.isValid()) {
errmsg = str::stream() << "invalid namespace '" << nss.toString()
<< "' specified for command";
return false;
}
const BSONObj keyPattern = cmdObj["keyPattern"].Obj();
if (keyPattern.isEmpty()) {
errmsg = "need to specify the key pattern the collection is sharded over";
return false;
}
const BSONObj min = cmdObj["min"].Obj();
if (min.isEmpty()) {
errmsg = "need to specify the min key for the chunk";
return false;
}
const BSONObj max = cmdObj["max"].Obj();
if (max.isEmpty()) {
errmsg = "need to specify the max key for the chunk";
return false;
}
const string shardName = cmdObj["from"].str();
if (shardName.empty()) {
errmsg = "need specify server to split chunk at";
return false;
}
const BSONObj splitKeysElem = cmdObj["splitKeys"].Obj();
if (splitKeysElem.isEmpty()) {
errmsg = "need to provide the split points to chunk over";
return false;
}
vector splitKeys;
BSONObjIterator it(splitKeysElem);
while (it.more()) {
splitKeys.push_back(it.next().Obj().getOwned());
}
//
// Get sharding state up-to-date
//
ShardingState* const shardingState = ShardingState::get(txn);
// This could be the first call that enables sharding - make sure we initialize the
// sharding state for this shard.
if (!shardingState->enabled()) {
if (cmdObj["configdb"].type() != String) {
errmsg = "sharding not enabled";
warning() << errmsg;
return false;
}
const string configdb = cmdObj["configdb"].String();
shardingState->initializeFromConfigConnString(txn, configdb);
}
// Initialize our current shard name in the shard state if needed
shardingState->setShardName(shardName);
log() << "received splitChunk request: " << cmdObj;
//
// 2. lock the collection's metadata and get highest version for the current shard
//
const string whyMessage(str::stream() << "splitting chunk [" << min << ", " << max
<< ") in "
<< nss.toString());
auto scopedDistLock = grid.catalogClient(txn)->distLock(
txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
errmsg = str::stream() << "could not acquire collection lock for " << nss.toString()
<< " to split chunk [" << min << "," << max << ")"
<< causedBy(scopedDistLock.getStatus());
warning() << errmsg;
return false;
}
// Always check our version remotely
ChunkVersion shardVersion;
Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
if (!refreshStatus.isOK()) {
errmsg = str::stream() << "splitChunk cannot split chunk "
<< "[" << min << "," << max << ")"
<< causedBy(refreshStatus.reason());
warning() << errmsg;
return false;
}
if (shardVersion.majorVersion() == 0) {
// It makes no sense to split if our version is zero and we have no chunks
errmsg = str::stream() << "splitChunk cannot split chunk "
<< "[" << min << "," << max << ")"
<< " with zero shard version";
warning() << errmsg;
return false;
}
const auto& oss = OperationShardingState::get(txn);
uassert(ErrorCodes::InvalidOptions, "collection version is missing", oss.hasShardVersion());
// Even though the splitChunk command transmits a value in the operation's shardVersion
// field, this value does not actually contain the shard version, but the global collection
// version.
ChunkVersion expectedCollectionVersion = oss.getShardVersion(nss);
if (expectedCollectionVersion.epoch() != shardVersion.epoch()) {
std::string msg = str::stream() << "splitChunk cannot split chunk "
<< "[" << min << "," << max << "), "
<< "collection may have been dropped. "
<< "current epoch: " << shardVersion.epoch()
<< ", cmd epoch: " << expectedCollectionVersion.epoch();
warning() << msg;
throw SendStaleConfigException(
nss.toString(), msg, expectedCollectionVersion, shardVersion);
}
ScopedCollectionMetadata collMetadata;
{
AutoGetCollection autoColl(txn, nss, MODE_IS);
// Get collection metadata
collMetadata = CollectionShardingState::get(txn, nss.ns())->getMetadata();
}
// With nonzero shard version, we must have metadata
invariant(collMetadata);
ChunkVersion collVersion = collMetadata->getCollVersion();
// With nonzero shard version, we must have a coll version >= our shard version
invariant(collVersion >= shardVersion);
ChunkType origChunk;
if (!collMetadata->getNextChunk(min, &origChunk) || origChunk.getMin().woCompare(min) ||
origChunk.getMax().woCompare(max)) {
// Our boundaries are different from those passed in
std::string msg = str::stream() << "splitChunk cannot find chunk "
<< "[" << min << "," << max << ")"
<< " to split, the chunk boundaries may be stale";
warning() << msg;
throw SendStaleConfigException(
nss.toString(), msg, expectedCollectionVersion, shardVersion);
}
log() << "splitChunk accepted at version " << shardVersion;
//
// 3. create the batch of updates to metadata ( the new chunks ) to be applied via
// 'applyOps' command
//
BSONObjBuilder logDetail;
appendShortVersion(logDetail.subobjStart("before"), origChunk);
LOG(1) << "before split on " << origChunk;
OwnedPointerVector newChunks;
ChunkVersion nextChunkVersion = collVersion;
BSONObj startKey = min;
splitKeys.push_back(max); // makes it easier to have 'max' in the next loop. remove later.
BSONArrayBuilder updates;
for (vector::const_iterator it = splitKeys.begin(); it != splitKeys.end(); ++it) {
BSONObj endKey = *it;
if (endKey.woCompare(startKey) == 0) {
errmsg = str::stream() << "split on the lower bound of chunk "
<< "[" << min << ", " << max << ")"
<< " is not allowed";
warning() << errmsg;
return false;
}
// Make sure splits don't create too-big shard keys
Status status = ShardKeyPattern::checkShardKeySize(endKey);
if (!status.isOK()) {
errmsg = status.reason();
warning() << errmsg;
return false;
}
// splits only update the 'minor' portion of version
nextChunkVersion.incMinor();
// build an update operation against the chunks collection of the config database with
// upsert true
BSONObjBuilder op;
op.append("op", "u");
op.appendBool("b", true);
op.append("ns", ChunkType::ConfigNS);
// add the modified (new) chunk information as the update object
BSONObjBuilder n(op.subobjStart("o"));
n.append(ChunkType::name(), ChunkType::genID(nss.ns(), startKey));
nextChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
n.append(ChunkType::ns(), nss.ns());
n.append(ChunkType::min(), startKey);
n.append(ChunkType::max(), endKey);
n.append(ChunkType::shard(), shardName);
n.done();
// add the chunk's _id as the query part of the update statement
BSONObjBuilder q(op.subobjStart("o2"));
q.append(ChunkType::name(), ChunkType::genID(nss.ns(), startKey));
q.done();
updates.append(op.obj());
// remember this chunk info for logging later
unique_ptr chunk(new ChunkType());
chunk->setMin(startKey);
chunk->setMax(endKey);
chunk->setVersion(nextChunkVersion);
newChunks.push_back(chunk.release());
startKey = endKey;
}
splitKeys.pop_back(); // 'max' was used as sentinel
BSONArrayBuilder preCond;
{
BSONObjBuilder b;
b.append("ns", ChunkType::ConfigNS);
b.append("q",
BSON("query" << BSON(ChunkType::ns(nss.ns())) << "orderby"
<< BSON(ChunkType::DEPRECATED_lastmod() << -1)));
{
BSONObjBuilder bb(b.subobjStart("res"));
// TODO: For backwards compatibility, we can't yet require an epoch here
bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), collVersion.toLong());
bb.done();
}
preCond.append(b.obj());
}
// NOTE: The newShardVersion resulting from this split is higher than any other chunk
// version, so it's also implicitly the newCollVersion
ChunkVersion newShardVersion = collVersion;
// Increment the minor version once, splitChunk increments once per split point
// (resulting in the correct final shard/collection version)
//
// TODO: Revisit this interface, it's a bit clunky
newShardVersion.incMinor();
// Ensure that the newly applied chunks would result in a correct metadata state
auto metadataAfterSplit =
uassertStatusOK(collMetadata->cloneSplit(min, max, splitKeys, newShardVersion));
//
// 4. apply the batch of updates to remote and local metadata
//
Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated(
txn, updates.arr(), preCond.arr(), nss.ns(), nextChunkVersion);
if (!applyOpsStatus.isOK()) {
return appendCommandStatus(result, applyOpsStatus);
}
//
// Install chunk metadata with knowledge about newly split chunks in this shard's state
//
{
ScopedTransaction scopedXact(txn, MODE_IX);
AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
auto css = CollectionShardingState::get(txn, nss);
css->refreshMetadata(txn, std::move(metadataAfterSplit));
}
//
// 5. logChanges
//
// single splits are logged different than multisplits
if (newChunks.size() == 2) {
appendShortVersion(logDetail.subobjStart("left"), *newChunks[0]);
appendShortVersion(logDetail.subobjStart("right"), *newChunks[1]);
grid.catalogClient(txn)->logChange(txn, "split", nss.ns(), logDetail.obj());
} else {
BSONObj beforeDetailObj = logDetail.obj();
BSONObj firstDetailObj = beforeDetailObj.getOwned();
const int newChunksSize = newChunks.size();
for (int i = 0; i < newChunksSize; i++) {
BSONObjBuilder chunkDetail;
chunkDetail.appendElements(beforeDetailObj);
chunkDetail.append("number", i + 1);
chunkDetail.append("of", newChunksSize);
appendShortVersion(chunkDetail.subobjStart("chunk"), *newChunks[i]);
grid.catalogClient(txn)->logChange(txn, "multi-split", nss.ns(), chunkDetail.obj());
}
}
dassert(newChunks.size() > 1);
{
// Select chunk to move out for "top chunk optimization".
KeyPattern shardKeyPattern(collMetadata->getKeyPattern());
AutoGetCollection autoColl(txn, nss, MODE_IS);
Collection* const collection = autoColl.getCollection();
if (!collection) {
warning() << "will not perform top-chunk checking since " << nss.toString()
<< " does not exist after splitting";
return true;
}
// Allow multiKey based on the invariant that shard keys must be
// single-valued. Therefore, any multi-key index prefixed by shard
// key cannot be multikey over the shard key fields.
IndexDescriptor* idx =
collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, keyPattern, false);
if (idx == NULL) {
return true;
}
const ChunkType* backChunk = newChunks.vector().back();
const ChunkType* frontChunk = newChunks.vector().front();
if (shardKeyPattern.globalMax().woCompare(backChunk->getMax()) == 0 &&
checkIfSingleDoc(txn, collection, idx, backChunk)) {
result.append("shouldMigrate",
BSON("min" << backChunk->getMin() << "max" << backChunk->getMax()));
} else if (shardKeyPattern.globalMin().woCompare(frontChunk->getMin()) == 0 &&
checkIfSingleDoc(txn, collection, idx, frontChunk)) {
result.append("shouldMigrate",
BSON("min" << frontChunk->getMin() << "max" << frontChunk->getMax()));
}
}
return true;
}
} cmdSplitChunk;
} // namespace
} // namespace mongo