/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/migration_source_manager.h"

#include <algorithm>
#include <memory>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/record_id.h"
#include "mongo/logger/ramlog.h"
#include "mongo/s/chunk.h"
#include "mongo/s/d_state.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/log.h"
namespace mongo {
using std::list;
using std::string;
using std::unique_ptr;
namespace {
// Secondary log sink: migration-related messages are also echoed to the
// in-memory "migrate" RamLog so they can be inspected via serverStatus/web UI.
Tee* migrateLog = RamLog::get("migrate");
/**
* Used to receive invalidation notifications.
*
* XXX: move to the exec/ directory.
*/
class DeleteNotificationStage final : public PlanStage {
public:
    DeleteNotificationStage(MigrationSourceManager* migrationSourceManager)
        : PlanStage("NOTIFY_DELETE", nullptr), _migrationSourceManager(migrationSourceManager) {}

    // Forwards deletion invalidations to the migration source manager so it can
    // drop the deleted record's id from the set of locations still to be cloned.
    void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) override {
        if (type == INVALIDATION_DELETION) {
            _migrationSourceManager->aboutToDelete(dl);
        }
    }

    // This stage only exists to receive invalidations; it is never executed as
    // part of a query plan, so the execution interface must never be called.
    StageState work(WorkingSetID* out) override {
        MONGO_UNREACHABLE;
    }

    bool isEOF() final {
        MONGO_UNREACHABLE;
    }

    unique_ptr<PlanStageStats> getStats() final {
        MONGO_UNREACHABLE;
    }

    SpecificStats* getSpecificStats() const final {
        MONGO_UNREACHABLE;
    }

    StageType stageType() const final {
        return STAGE_NOTIFY_DELETE;
    }

private:
    // Not owned. The manager outlives this stage for the duration of the migration.
    MigrationSourceManager* const _migrationSourceManager;
};
bool isInRange(const BSONObj& obj,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern) {
ShardKeyPattern shardKey(shardKeyPattern);
BSONObj k = shardKey.extractShardKeyFromDoc(obj);
return k.woCompare(min) >= 0 && k.woCompare(max) < 0;
}
} // namespace
/**
* Used to commit work for LogOpForSharding. Used to keep track of changes in documents that are
* part of a chunk being migrated.
*/
class LogOpForShardingHandler final : public RecoveryUnit::Change {
public:
/**
* Invariant: idObj should belong to a document that is part of the active chunk being migrated
*/
LogOpForShardingHandler(MigrationSourceManager* migrateSourceManager,
const BSONObj& idObj,
const char op)
: _migrationSourceManager(migrateSourceManager), _idObj(idObj.getOwned()), _op(op) {}
void commit() override {
switch (_op) {
case 'd': {
stdx::lock_guard sl(_migrationSourceManager->_mutex);
_migrationSourceManager->_deleted.push_back(_idObj);
_migrationSourceManager->_memoryUsed += _idObj.firstElement().size() + 5;
break;
}
case 'i':
case 'u': {
stdx::lock_guard sl(_migrationSourceManager->_mutex);
_migrationSourceManager->_reload.push_back(_idObj);
_migrationSourceManager->_memoryUsed += _idObj.firstElement().size() + 5;
break;
}
default:
MONGO_UNREACHABLE;
}
}
void rollback() override {}
private:
MigrationSourceManager* const _migrationSourceManager;
const BSONObj _idObj;
const char _op;
};
// Defaulted special members: all migration state is established by start()
// and torn down by done(), so construction/destruction need no extra work.
MigrationSourceManager::MigrationSourceManager() = default;
MigrationSourceManager::~MigrationSourceManager() = default;
/**
 * Activates tracking for a migration of the chunk [min, max) of collection 'ns'.
 * Returns false if a migration is already in progress (only one may be active at
 * a time). On success, all transfer bookkeeping starts from a clean slate.
 */
bool MigrationSourceManager::start(OperationContext* txn,
                                   const std::string& ns,
                                   const BSONObj& min,
                                   const BSONObj& max,
                                   const BSONObj& shardKeyPattern) {
    invariant(!min.isEmpty());
    invariant(!max.isEmpty());
    invariant(!ns.empty());

    // Get global shared to synchronize with logOp. Also see comments in the class
    // members declaration for more details.
    Lock::GlobalRead globalShared(txn->lockState());
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    if (_active) {
        return false;
    }

    _nss = NamespaceString(ns);
    _min = min;
    _max = max;
    _shardKeyPattern = shardKeyPattern;

    // done() must have fully cleaned up after any previous migration.
    invariant(_deleted.size() == 0);
    invariant(_reload.size() == 0);
    invariant(_memoryUsed == 0);

    _active = true;

    stdx::lock_guard<stdx::mutex> tLock(_cloneLocsMutex);
    invariant(_cloneLocs.size() == 0);

    return true;
}
/**
 * Deactivates the current migration and clears all transfer state. Also leaves
 * the critical section (waking any waiters) and releases the cached clone
 * locations.
 */
void MigrationSourceManager::done(OperationContext* txn) {
    log() << "MigrateFromStatus::done About to acquire global lock to exit critical section";

    // Get global shared to synchronize with logOp. Also see comments in the class
    // members declaration for more details.
    Lock::GlobalRead globalShared(txn->lockState());
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    _active = false;
    _deleteNotifyExec.reset(NULL);
    _inCriticalSection = false;
    _inCriticalSectionCV.notify_all();

    _deleted.clear();
    _reload.clear();
    _memoryUsed = 0;

    stdx::lock_guard<stdx::mutex> cloneLock(_cloneLocsMutex);
    _cloneLocs.clear();
}
/**
 * Op-log hook invoked for writes on this shard. If a migration is active and the
 * write touches the chunk being moved, queues the document's _id (via a
 * RecoveryUnit::Change, so it takes effect only on commit) for later transfer by
 * transferMods(). Writes outside the chunk's range and non-CRUD ops are ignored.
 */
void MigrationSourceManager::logOp(OperationContext* txn,
                                   const char* opstr,
                                   const char* ns,
                                   const BSONObj& obj,
                                   BSONObj* patt,
                                   bool notInActiveChunk) {
    ensureShardVersionOKOrThrow(txn, ns);
    const char op = opstr[0];
    if (notInActiveChunk) {
        // Ignore writes that came from the migration process like cleanup so they
        // won't be transferred to the recipient shard. Also ignore ops from
        // _migrateClone and _transferMods since it is impossible to move a chunk
        // to self.
        return;
    }
    dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
    if (!_active)
        return;
    if (_nss != ns)
        return;
    // no need to log if this is not an insertion, an update, or an actual deletion
    // note: opstr 'db' isn't a deletion but a mention that a database exists
    // (for replication machinery mostly).
    if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b'))
        return;
    // The _id comes from the update's query pattern when present, otherwise from
    // the document itself.
    BSONElement ide;
    if (patt)
        ide = patt->getField("_id");
    else
        ide = obj["_id"];
    if (ide.eoo()) {
        warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj << migrateLog;
        return;
    }
    // Inserts outside the migrating chunk's range are irrelevant to the transfer.
    if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) {
        return;
    }
    BSONObj idObj(ide.wrap());
    if (op == 'u') {
        // For updates, fetch the post-image to check whether the document (still)
        // belongs to the migrating chunk.
        BSONObj fullDoc;
        OldClientContext ctx(txn, _nss.ns(), false);
        if (!Helpers::findById(txn, ctx.db(), _nss.ns().c_str(), idObj, fullDoc)) {
            warning() << "logOpForSharding couldn't find: " << idObj << " even though should have"
                      << migrateLog;
            dassert(false); // TODO: Abort the migration.
            return;
        }
        if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
            return;
        }
    }
    // Note: can't check if delete is in active chunk since the document is gone!
    txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
}
/**
 * Drains the queued deletions and reloads (gathered by logOp) into 'b' under the
 * "deleted" and "reload" fields, plus a total "size" in bytes. Returns false with
 * 'errmsg' set when no migration is active.
 */
bool MigrationSourceManager::transferMods(OperationContext* txn,
                                          string& errmsg,
                                          BSONObjBuilder& b) {
    long long size = 0;

    {
        AutoGetCollectionForRead ctx(txn, _getNS());

        stdx::lock_guard<stdx::mutex> sl(_mutex);
        if (!_active) {
            errmsg = "no active migration!";
            return false;
        }

        // TODO: fix SERVER-16540 race
        _xfer(txn, _nss.ns(), ctx.getDb(), &_deleted, b, "deleted", size, false);
        _xfer(txn, _nss.ns(), ctx.getDb(), &_reload, b, "reload", size, true);
    }

    b.append("size", size);

    return true;
}
/**
 * Walks the chunk's range via the shard-key index and caches the record ids of
 * all documents in it (_cloneLocs) for the subsequent clone() phase. Also
 * installs the delete-notification executor so deletions during the migration
 * are reflected in the cache. Returns false (with 'errmsg'/'result' populated)
 * if the collection or a shard-key-prefixed index is missing, or the chunk
 * contains too many documents to move.
 */
bool MigrationSourceManager::storeCurrentLocs(OperationContext* txn,
                                              long long maxChunkSize,
                                              string& errmsg,
                                              BSONObjBuilder& result) {
    AutoGetCollection autoColl(txn, _getNS(), MODE_IS);

    Collection* collection = autoColl.getCollection();
    if (!collection) {
        errmsg = "ns not found, should be impossible";
        return false;
    }

    // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
    // multi-key index prefixed by shard key cannot be multikey over the shard key fields.
    IndexDescriptor* idx =
        collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
                                                                 _shardKeyPattern,
                                                                 false);  // requireSingleKey

    if (idx == NULL) {
        errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern
                               << " in storeCurrentLocs for " << _nss.toString();
        return false;
    }

    // Assume both min and max non-empty, append MinKey's to make them fit chosen index
    BSONObj min;
    BSONObj max;
    KeyPattern kp(idx->keyPattern());

    {
        // It's alright not to lock _mutex all the way through based on the assumption that this is
        // only called by the main thread that drives the migration and only it can start and stop
        // the current migration.
        stdx::lock_guard<stdx::mutex> sl(_mutex);
        invariant(_deleteNotifyExec.get() == NULL);

        unique_ptr<WorkingSet> ws = stdx::make_unique<WorkingSet>();
        unique_ptr<DeleteNotificationStage> dns = stdx::make_unique<DeleteNotificationStage>(this);

        // Takes ownership of 'ws' and 'dns'.
        auto statusWithPlanExecutor = PlanExecutor::make(
            txn, std::move(ws), std::move(dns), collection, PlanExecutor::YIELD_MANUAL);
        invariant(statusWithPlanExecutor.isOK());

        _deleteNotifyExec = std::move(statusWithPlanExecutor.getValue());
        _deleteNotifyExec->registerExec();

        min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false));
        max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false));
    }

    unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
                                                             collection,
                                                             idx,
                                                             min,
                                                             max,
                                                             false,  // endKeyInclusive
                                                             PlanExecutor::YIELD_MANUAL));

    // We can afford to yield here because any change to the base data that we might miss is already
    // being queued and will migrate in the 'transferMods' stage.
    exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);

    // Use the average object size to estimate how many objects a full chunk would carry do that
    // while traversing the chunk's range using the sharding index, below there's a fair amount of
    // slack before we determine a chunk is too large because object sizes will vary.
    unsigned long long maxRecsWhenFull;
    long long avgRecSize;

    const long long totalRecs = collection->numRecords(txn);
    if (totalRecs > 0) {
        avgRecSize = collection->dataSize(txn) / totalRecs;
        maxRecsWhenFull = maxChunkSize / avgRecSize;
        maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1),
                                   130 * maxRecsWhenFull / 100 /* slack */);
    } else {
        avgRecSize = 0;
        maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1;
    }

    // Do a full traversal of the chunk and don't stop even if we think it is a large chunk we want
    // the number of records to better report, in that case
    bool isLargeChunk = false;
    unsigned long long recCount = 0;

    RecordId recordId;
    while (PlanExecutor::ADVANCED == exec->getNext(NULL, &recordId)) {
        if (!isLargeChunk) {
            stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
            _cloneLocs.insert(recordId);
        }

        if (++recCount > maxRecsWhenFull) {
            isLargeChunk = true;
            // Continue on despite knowing that it will fail, just to get the correct value for
            // recCount
        }
    }

    exec.reset();

    if (isLargeChunk) {
        stdx::lock_guard<stdx::mutex> sl(_mutex);
        warning() << "cannot move chunk: the maximum number of documents for a chunk is "
                  << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize
                  << " , average document size is " << avgRecSize << ". Found " << recCount
                  << " documents in chunk "
                  << " ns: " << _nss << " " << _min << " -> " << _max << migrateLog;

        result.appendBool("chunkTooBig", true);
        result.appendNumber("estimatedChunkSize", (long long)(recCount * avgRecSize));
        errmsg = "chunk too big to move";
        return false;
    }

    log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog;

    txn->recoveryUnit()->abandonSnapshot();
    return true;
}
/**
 * Copies as many documents (looked up by the record ids cached in
 * storeCurrentLocs) as fit in one response into 'result' under "objects".
 * Consumed ids are erased from _cloneLocs, so callers invoke this repeatedly
 * until the set drains. Returns false with 'errmsg' set if the migration is no
 * longer active or the collection has disappeared.
 */
bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) {
    ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS);

    int allocSize = 0;

    {
        AutoGetCollection autoColl(txn, _getNS(), MODE_IS);

        stdx::lock_guard<stdx::mutex> sl(_mutex);
        if (!_active) {
            errmsg = "not active";
            return false;
        }

        Collection* collection = autoColl.getCollection();
        if (!collection) {
            errmsg = str::stream() << "collection " << _nss.toString() << " does not exist";
            return false;
        }

        // Pre-size the response builder: average document size plus a small
        // per-element overhead, for each remaining location.
        allocSize = std::min(
            BSONObjMaxUserSize,
            static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining()));
    }

    bool isBufferFilled = false;
    BSONArrayBuilder clonedDocsArrayBuilder(allocSize);
    while (!isBufferFilled) {
        AutoGetCollection autoColl(txn, _getNS(), MODE_IS);

        stdx::lock_guard<stdx::mutex> sl(_mutex);
        if (!_active) {
            errmsg = "not active";
            return false;
        }

        // TODO: fix SERVER-16540 race

        Collection* collection = autoColl.getCollection();
        if (!collection) {
            errmsg = str::stream() << "collection " << _nss.toString() << " does not exist";
            return false;
        }

        stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);

        std::set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin();
        for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) {
            if (tracker.intervalHasElapsed())  // should I yield?
                break;

            RecordId recordId = *cloneLocsIter;
            Snapshotted<BSONObj> doc;
            if (!collection->findDoc(txn, recordId, &doc)) {
                // doc was deleted
                continue;
            }

            // Use the builder size instead of accumulating 'doc's size so that we take
            // into consideration the overhead of BSONArray indices, and *always*
            // append one doc.
            if (clonedDocsArrayBuilder.arrSize() != 0 &&
                (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024) >
                    BSONObjMaxUserSize) {
                isBufferFilled = true;  // break out of outer while loop
                break;
            }

            clonedDocsArrayBuilder.append(doc.value());
        }

        // Drop everything we just processed (copied or found deleted).
        _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter);

        // Note: must be holding _cloneLocsMutex, don't move this inside while condition!
        if (_cloneLocs.empty()) {
            break;
        }
    }

    result.appendArray("objects", clonedDocsArrayBuilder.arr());
    return true;
}
void MigrationSourceManager::aboutToDelete(const RecordId& dl) {
// Even though above we call findDoc to check for existance that check only works for non-mmapv1
// engines, and this is needed for mmapv1.
stdx::lock_guard lk(_cloneLocsMutex);
_cloneLocs.erase(dl);
}
std::size_t MigrationSourceManager::cloneLocsRemaining() const {
stdx::lock_guard lk(_cloneLocsMutex);
return _cloneLocs.size();
}
long long MigrationSourceManager::mbUsed() const {
stdx::lock_guard lk(_mutex);
return _memoryUsed / (1024 * 1024);
}
bool MigrationSourceManager::getInCriticalSection() const {
stdx::lock_guard lk(_mutex);
return _inCriticalSection;
}
void MigrationSourceManager::setInCriticalSection(bool inCritSec) {
stdx::lock_guard lk(_mutex);
_inCriticalSection = inCritSec;
_inCriticalSectionCV.notify_all();
}
bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait) {
const auto deadline = stdx::chrono::system_clock::now() + Seconds(maxSecondsToWait);
stdx::unique_lock lk(_mutex);
while (_inCriticalSection) {
log() << "Waiting for " << maxSecondsToWait
<< " seconds for the migration critical section to end";
if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) {
return false;
}
}
return true;
}
bool MigrationSourceManager::isActive() const {
stdx::lock_guard lk(_mutex);
return _active;
}
void MigrationSourceManager::_xfer(OperationContext* txn,
const string& ns,
Database* db,
std::list* docIdList,
BSONObjBuilder& builder,
const char* fieldName,
long long& size,
bool explode) {
const long long maxSize = 1024 * 1024;
if (docIdList->size() == 0 || size > maxSize) {
return;
}
BSONArrayBuilder arr(builder.subarrayStart(fieldName));
list::iterator docIdIter = docIdList->begin();
while (docIdIter != docIdList->end() && size < maxSize) {
BSONObj idDoc = *docIdIter;
if (explode) {
BSONObj fullDoc;
if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) {
arr.append(fullDoc);
size += fullDoc.objsize();
}
} else {
arr.append(idDoc);
size += idDoc.objsize();
}
docIdIter = docIdList->erase(docIdIter);
}
arr.done();
}
// Returns a copy of the namespace under migration (guarded read of _nss).
NamespaceString MigrationSourceManager::_getNS() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _nss;
}
} // namespace mongo