/** * Copyright (C) 2013 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include #include "mongo/client/connpool.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/chunk.h" #include "mongo/s/config.h" #include "mongo/s/d_state.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { using std::endl; using std::string; using mongoutils::str::stream; static Status runApplyOpsCmd(const std::vector&, const ChunkVersion&, const ChunkVersion&); static BSONObj buildMergeLogEntry(const std::vector&, const ChunkVersion&, const ChunkVersion&); static bool isEmptyChunk(const ChunkType&); bool mergeChunks(OperationContext* txn, const NamespaceString& nss, const BSONObj& minKey, const BSONObj& maxKey, const OID& epoch, string* errMsg) { // // Get sharding state up-to-date // ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), *errMsg); if (!configLoc.isValid()) { warning() << *errMsg << endl; return false; } // // Get the distributed lock // string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to " << maxKey; auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(nss.ns(), whyMessage); if (!scopedDistLock.isOK()) { *errMsg = stream() << "could not acquire collection lock for " << nss.ns() << " to merge chunks in [" << minKey << "," << maxKey << ")" << causedBy(scopedDistLock.getStatus()); warning() << *errMsg << endl; return false; } // // We now have the collection lock, refresh metadata to latest version and sanity check // ChunkVersion shardVersion; Status status = shardingState.refreshMetadataNow(txn, nss.ns(), &shardVersion); if (!status.isOK()) { *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for " << nss.ns() << causedBy(status.reason()); warning() << *errMsg << endl; return false; } if (epoch.isSet() && shardVersion.epoch() != epoch) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed" << " since merge was sent" << "(sent epoch : " << epoch.toString() << ", current epoch : " << shardVersion.epoch().toString() << ")"; warning() << *errMsg << endl; return false; } CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns()); if (!metadata || metadata->getKeyPattern().isEmpty()) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " is not sharded"; warning() << *errMsg << endl; return false; } dassert(metadata->getShardVersion().equals(shardVersion)); if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) { *errMsg = stream() << "could not merge chunks, the range " << rangeToString(minKey, maxKey) << " is not valid" << " for collection " << nss.ns() << " with key pattern " << metadata->getKeyPattern(); warning() << *errMsg << endl; return false; } // // Get merged chunk information // ChunkVersion mergeVersion = metadata->getCollVersion(); mergeVersion.incMinor(); std::vector chunksToMerge; ChunkType itChunk; itChunk.setMin(minKey); itChunk.setMax(minKey); itChunk.setNS(nss.ns()); itChunk.setShard(shardingState.getShardName()); while (itChunk.getMax().woCompare(maxKey) < 0 && metadata->getNextChunk(itChunk.getMax(), &itChunk)) { chunksToMerge.push_back(itChunk); } if (chunksToMerge.empty()) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " range starting at " << minKey << " and ending at " << maxKey << " does not belong to shard " << shardingState.getShardName(); warning() << *errMsg << endl; return false; } // // Validate the range starts and ends at chunks and has no holes, error if not valid // BSONObj firstDocMin = chunksToMerge.front().getMin(); BSONObj firstDocMax = chunksToMerge.front().getMax(); // minKey is inclusive bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey); if (!minKeyInRange) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " range starting at " << minKey << " does not belong to shard " << shardingState.getShardName(); warning() << *errMsg << endl; return false; } BSONObj lastDocMin = chunksToMerge.back().getMin(); BSONObj lastDocMax = chunksToMerge.back().getMax(); // maxKey is exclusive bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0; if (!maxKeyInRange) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " range ending at " << maxKey << " does not belong to shard " << shardingState.getShardName(); warning() << *errMsg << endl; return false; } bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0; bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0; if (!validRangeStartKey || !validRangeEndKey) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " does not contain a chunk " << (!validRangeStartKey ? "starting at " + minKey.toString() : "") << (!validRangeStartKey && !validRangeEndKey ? " or " : "") << (!validRangeEndKey ? "ending at " + maxKey.toString() : ""); warning() << *errMsg << endl; return false; } if (chunksToMerge.size() == 1) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " already contains chunk for " << rangeToString(minKey, maxKey); warning() << *errMsg << endl; return false; } // Look for hole in range for (size_t i = 1; i < chunksToMerge.size(); ++i) { if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has a hole in the range " << rangeToString(minKey, maxKey) << " at " << rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin()); warning() << *errMsg << endl; return false; } } // // Run apply ops command // Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion); if (!applyOpsStatus.isOK()) { warning() << applyOpsStatus; return false; } // // Install merged chunk metadata // { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock writeLk(txn->lockState(), nss.db(), MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_X); shardingState.mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion); } // // Log change // BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion); grid.catalogManager()->logChange(txn, "merge", nss.ns(), mergeLogEntry); return true; } // // Utilities for building BSONObjs for applyOps and change logging // BSONObj buildMergeLogEntry(const std::vector& chunksToMerge, const ChunkVersion& currShardVersion, const ChunkVersion& newMergedVersion) { BSONObjBuilder logDetailB; BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged")); for (const ChunkType& chunkToMerge : chunksToMerge) { mergedB.append(chunkToMerge.toBSON()); } mergedB.done(); currShardVersion.addToBSON(logDetailB, "prevShardVersion"); newMergedVersion.addToBSON(logDetailB, "mergedVersion"); return logDetailB.obj(); } BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) { BSONObjBuilder opB; // Op basics opB.append("op", "u"); opB.appendBool("b", false); // no upserting opB.append("ns", ChunkType::ConfigNS); // New object opB.append("o", mergedChunk.toBSON()); // Query object opB.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); return opB.obj(); } BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) { BSONObjBuilder opB; // Op basics opB.append("op", "d"); // delete opB.append("ns", ChunkType::ConfigNS); opB.append("o", BSON(ChunkType::name(chunkToRemove.getName()))); return opB.obj(); } BSONArray buildOpPrecond(const string& ns, const string& shardName, const ChunkVersion& shardVersion) { BSONArrayBuilder preCond; BSONObjBuilder condB; condB.append("ns", ChunkType::ConfigNS); condB.append("q", BSON("query" << BSON(ChunkType::ns(ns)) << "orderby" << BSON(ChunkType::DEPRECATED_lastmod() << -1))); { BSONObjBuilder resB(condB.subobjStart("res")); shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod()); resB.done(); } preCond.append(condB.obj()); return preCond.arr(); } Status runApplyOpsCmd(const std::vector& chunksToMerge, const ChunkVersion& currShardVersion, const ChunkVersion& newMergedVersion) { BSONArrayBuilder updatesB; // The chunk we'll be "expanding" is the first chunk const ChunkType& firstChunk = chunksToMerge.front(); // Fill in details not tracked by metadata ChunkType mergedChunk(firstChunk); mergedChunk.setName(Chunk::genID(firstChunk.getNS(), firstChunk.getMin())); mergedChunk.setMax(chunksToMerge.back().getMax()); mergedChunk.setVersion(newMergedVersion); updatesB.append(buildOpMergeChunk(mergedChunk)); // Don't remove chunk we're expanding for (size_t i = 1; i < chunksToMerge.size(); ++i) { ChunkType chunkToMerge(chunksToMerge[i]); chunkToMerge.setName(Chunk::genID(chunkToMerge.getNS(), chunkToMerge.getMin())); updatesB.append(buildOpRemoveChunk(chunkToMerge)); } BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion); return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond); } }