/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/metadata_manager.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/bson/util/builder.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
// MetadataManager maintains pointers to CollectionMetadata objects in a member list named
// _metadata. Each CollectionMetadata contains an immutable _chunksMap of chunks assigned to this
// shard, along with details related to its own lifecycle in a member _tracker.
//
// The current chunk mapping, used by queries starting up, is at _metadata.back(). Each query,
// when it starts up, requests and holds a ScopedCollectionMetadata object, and destroys it on
// termination. Each ScopedCollectionMetadata keeps a shared_ptr to its CollectionMetadata chunk
// mapping, and to the MetadataManager itself. CollectionMetadata mappings also keep a record of
// chunk ranges that may be deleted when it is determined that the range can no longer be in use.
//
// ScopedCollectionMetadata's destructor decrements the CollectionMetadata's usageCounter.
// Whenever a usageCounter drops to zero, we check whether any now-unused CollectionMetadata
// elements can be popped off the front of _metadata. We need to keep the unused elements in the
// middle (as seen below) because they may schedule deletions of chunks depended on by older
// mappings.
//
// New chunk mappings are pushed onto the back of _metadata. Subsequently started queries use the
// new mapping while still-running queries continue using the older "snapshot" mappings. We treat
// _metadata.back()'s usage count differently from the snapshots because it can't reliably be
// compared to zero; a new query may increment it at any time.
//
// (Note that the collection may be dropped or become unsharded, and even get made and sharded
// again, between construction and destruction of a ScopedCollectionMetadata).
//
// MetadataManager also contains a CollectionRangeDeleter _rangesToClean that queues orphan ranges
// being deleted in a background thread, and a mapping _receivingChunks of the ranges being migrated
// in, to avoid deleting them. Each range deletion is paired with a notification object triggered
// when the deletion is completed or abandoned.
//
// ____________________________
// (s): std::shared_ptr<> Clients:| ScopedCollectionMetadata |
// _________________________ +----(s) manager metadata (s)------------------+
// | CollectionShardingState | | |____________________________| | |
// | _metadataManager (s) | +-------(s) manager metadata (s)--------------+ |
// |____________________|____| | |____________________________| | | |
// ____________________v________ +------------(s) manager metadata (s)-----+ | |
// | MetadataManager | | |____________________________| | | |
// | |<--+ | | |
// | | ___________________________ (1 use) | | |
// | getActiveMetadata(): /---------->| CollectionMetadata |<---------+ | |
// | back(): [(s),------/ | | _________________________|_ | |
// | (s),-------------------->| CollectionMetadata | (0 uses) | |
// | _metadata: (s)]------\ | | | _________________________|_ | |
// | \-------------->| CollectionMetadata | | |
// | _receivingChunks | | | | | (2 uses) | |
// | _rangesToClean: | | | | _tracker: |<---------+ |
// | _________________________ | | | | _______________________ |<-----------+
// | | CollectionRangeDeleter | | | | | | Tracker | |
// | | | | | | | | | |
// | | _orphans [range,notif, | | | | | | usageCounter | |
// | | range,notif, | | | | | | orphans [range,notif, | |
// | | ... ] | | | | | | range,notif, | |
// | | | | | | | | ... ] | |
// | |_________________________| | |_| | |_______________________| |
// |_____________________________| | | _chunksMap |
// |_| _chunkVersion |
// | ... |
// |___________________________|
//
// Note that _metadata as shown here has its front() at the bottom, back() at the top. As usual,
// new entries are pushed onto the back, popped off the front.
namespace mongo {
// Fail point checked at the top of the background range-deletion task scheduled by
// scheduleCleanup(); while it is set, that task pauses before deleting anything.
MONGO_FP_DECLARE(suspendRangeDeletion);
// Local shorthands for the executor types used when scheduling background range deletion.
using TaskExecutor = executor::TaskExecutor;
using CallbackArgs = TaskExecutor::CallbackArgs;
/**
 * Creates the manager for collection 'nss'.
 *
 * 'sc' is retained as _serviceContext. 'executor' is the task executor later passed to
 * scheduleCleanup() to run background range deletions. _receivingChunks starts as an empty
 * BSONObj-keyed map and _rangesToClean is value-initialized. _metadata is left default-constructed;
 * refreshActiveMetadata() relies on it starting out empty for the "never sharded" case.
 */
MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss, TaskExecutor* executor)
: _nss(std::move(nss)),
_serviceContext(sc),
_receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap()),
_executor(executor),
_rangesToClean() {}
/**
 * Abandons every pending range deletion (those already queued in _rangesToClean and those still
 * deferred on metadata snapshots) and releases the manager's references to its metadata, all
 * while holding _managerLock.
 */
MetadataManager::~MetadataManager() {
stdx::lock_guard scopedLock(_managerLock);
_clearAllCleanups(scopedLock);
// Move the list into a local so its shared_ptrs are dropped here. The local is destroyed before
// scopedLock (reverse declaration order), i.e. while the lock is still held. Snapshots still pinned
// by a ScopedCollectionMetadata survive through that object's own shared_ptr.
auto metadata = std::move(_metadata);
}
/**
 * Convenience overload: abandons every pending range deletion using the status reserved for a
 * collection that was dropped or became unsharded.
 */
void MetadataManager::_clearAllCleanups(WithLock lock) {
    const Status droppedOrUnsharded{
        ErrorCodes::InterruptedDueToReplStateChange,
        str::stream() << "Range deletions in " << _nss.ns()
                      << " abandoned because collection was dropped or became unsharded"};
    _clearAllCleanups(lock, droppedOrUnsharded);
}
/**
 * Abandons every pending range deletion with 'status'.
 *
 * Orphan ranges still deferred on metadata snapshots are first moved into _rangesToClean, so the
 * single _rangesToClean.clear(status) call below disposes of them together with the ranges that
 * were already queued. (Presumably clear() signals each range's notification with 'status'; the
 * deleter's implementation is not visible here.)
 */
void MetadataManager::_clearAllCleanups(WithLock, Status status) {
    for (auto snapshot = _metadata.begin(), last = _metadata.end(); snapshot != last; ++snapshot) {
        auto& deferredOrphans = (*snapshot)->_tracker.orphans;
        // The scheduling time add() returns is irrelevant: the whole queue is cleared just below.
        std::ignore = _rangesToClean.add(std::move(deferredOrphans));
    }
    _rangesToClean.clear(status);
}
/**
 * Returns a ScopedCollectionMetadata that pins the active (newest) chunk mapping on behalf of the
 * caller, or an empty one when the collection has no metadata (not sharded). 'self' is the
 * shared_ptr to this manager that the returned object keeps alive.
 */
ScopedCollectionMetadata MetadataManager::getActiveMetadata(std::shared_ptr self) {
    stdx::lock_guard scopedLock(_managerLock);
    if (_metadata.empty()) {
        return ScopedCollectionMetadata();
    }
    return ScopedCollectionMetadata(scopedLock, std::move(self), _metadata.back());
}
/**
 * Returns how many metadata snapshots are retained besides the active mapping, i.e. older chunk
 * mappings kept alive for queries that may still use them.
 *
 * Returns 0 when the collection has no metadata at all. Previously, `size() - 1` on an empty list
 * wrapped around to SIZE_MAX.
 */
size_t MetadataManager::numberOfMetadataSnapshots() {
    stdx::lock_guard scopedLock(_managerLock);
    if (_metadata.empty()) {
        return 0;
    }
    return _metadata.size() - 1;
}
void MetadataManager::refreshActiveMetadata(std::unique_ptr remoteMetadata) {
stdx::lock_guard scopedLock(_managerLock);
// Collection was never sharded in the first place. This check is necessary in order to avoid
// extraneous logging in the not-a-shard case, because all call sites always try to get the
// collection sharding information regardless of whether the node is sharded or not.
if (!remoteMetadata && _metadata.empty()) {
invariant(_receivingChunks.empty());
invariant(_rangesToClean.isEmpty());
return;
}
// Collection is becoming unsharded
if (!remoteMetadata) {
log() << "Marking collection " << _nss.ns() << " with " << _metadata.back()->toStringBasic()
<< " as no longer sharded";
_receivingChunks.clear();
_clearAllCleanups(scopedLock);
_metadata.clear();
return;
}
// We should never be setting unsharded metadata
invariant(!remoteMetadata->getCollVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
invariant(!remoteMetadata->getShardVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
// Collection is becoming sharded
if (_metadata.empty()) {
log() << "Marking collection " << _nss.ns() << " as sharded with "
<< remoteMetadata->toStringBasic();
invariant(_receivingChunks.empty());
invariant(_rangesToClean.isEmpty());
_setActiveMetadata(scopedLock, std::move(remoteMetadata));
return;
}
auto* activeMetadata = _metadata.back().get();
// If the metadata being installed has a different epoch from ours, this means the collection
// was dropped and recreated, so we must entirely reset the metadata state
if (activeMetadata->getCollVersion().epoch() != remoteMetadata->getCollVersion().epoch()) {
log() << "Overwriting metadata for collection " << _nss.ns() << " from "
<< activeMetadata->toStringBasic() << " to " << remoteMetadata->toStringBasic()
<< " due to epoch change";
_receivingChunks.clear();
_setActiveMetadata(scopedLock, std::move(remoteMetadata));
_clearAllCleanups(scopedLock);
return;
}
// We already have newer version
if (activeMetadata->getCollVersion() >= remoteMetadata->getCollVersion()) {
LOG(1) << "Ignoring update of active metadata " << activeMetadata->toStringBasic()
<< " with an older " << remoteMetadata->toStringBasic();
return;
}
log() << "Updating collection metadata for " << _nss.ns() << " from "
<< activeMetadata->toStringBasic() << " to " << remoteMetadata->toStringBasic();
// Resolve any receiving chunks, which might have completed by now.
// Should be no more than one.
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) {
BSONObj const& min = it->first;
BSONObj const& max = it->second.getMaxKey();
if (!remoteMetadata->rangeOverlapsChunk(ChunkRange(min, max))) {
++it;
continue;
}
// The remote metadata contains a chunk we were earlier in the process of receiving, so
// we deem it successfully received.
LOG(2) << "Verified chunk " << ChunkRange(min, max) << " for collection " << _nss.ns()
<< " has been migrated to this shard earlier";
_receivingChunks.erase(it);
it = _receivingChunks.begin();
}
_setActiveMetadata(scopedLock, std::move(remoteMetadata));
}
/**
 * Makes 'newMetadata' (must be non-null) the active chunk mapping by appending it to the back of
 * _metadata, then retires any older snapshots that no query holds any more.
 */
void MetadataManager::_setActiveMetadata(WithLock lock,
                                         std::unique_ptr newMetadata) {
    invariant(newMetadata);
    _metadata.emplace_back(std::move(newMetadata));
    _retireExpiredMetadata(lock);
}
/**
 * Retires snapshots from the front (oldest end) of _metadata while their usage count is zero.
 *
 * Each retired snapshot's deferred orphan ranges are handed to the range deleter. The walk stops
 * at the first snapshot still in use. The last (active) mapping is never popped, although its
 * orphans are also handed over once its usage count is zero.
 */
void MetadataManager::_retireExpiredMetadata(WithLock lock) {
    if (_metadata.empty()) {
        return;  // The collection was dropped, or went unsharded, before the query was cleaned up.
    }

    while (_metadata.front()->_tracker.usageCounter == 0) {
        // No ScopedCollectionMetadata can see the oldest snapshot now, other than, maybe, the
        // caller.
        auto& oldestTracker = _metadata.front()->_tracker;
        if (!oldestTracker.orphans.empty()) {
            log() << "Queries possibly dependent on " << _nss.ns()
                  << " range(s) finished; scheduling ranges for deletion";

            // It is safe to push orphan ranges from _metadata.back(), even though new queries
            // might start any time, because any request to delete a range it maps is rejected.
            _pushListToClean(lock, std::move(oldestTracker.orphans));
        }

        if (_metadata.size() == 1) {
            break;  // Do not pop the active chunk mapping!
        }
        _metadata.pop_front();
    }
}
// ScopedCollectionMetadata members
// Pins 'metadata' for the lifetime of this object. It keeps shared ownership of both 'metadata'
// and the owning 'manager' and increments the metadata's usage counter; _clear() later decrements
// that counter. The WithLock parameter indicates the caller already holds the manager's
// _managerLock, the same lock _clear() takes before touching usageCounter.
ScopedCollectionMetadata::ScopedCollectionMetadata(WithLock,
std::shared_ptr manager,
std::shared_ptr metadata)
: _metadata(std::move(metadata)), _manager(std::move(manager)) {
invariant(_metadata);
invariant(_manager);
++_metadata->_tracker.usageCounter;
}
/**
 * Returns the key identifying 'doc':
 *  - sharded collection whose shard key includes _id: the shard key fields extracted from 'doc';
 *  - otherwise, if 'doc' has an _id: the shard key fields (if sharded) followed by _id;
 *  - a legacy document without an _id is used as its own key.
 */
BSONObj ScopedCollectionMetadata::extractDocumentKey(BSONObj const& doc) const {
    BSONObj shardKeyPart;
    if (*this) {  // Sharded: start from the shard key fields present in 'doc'.
        auto const& keyPattern = _metadata->_cm->getShardKeyPattern();
        shardKeyPart =
            dotted_path_support::extractElementsBasedOnTemplate(doc, keyPattern.toBSON());
        if (keyPattern.hasId()) {
            return shardKeyPart;  // _id is already part of the shard key.
        }
    }

    auto idElement = doc["_id"_sd];
    if (!idElement) {
        // For legacy documents that lack an _id, use the document itself as its key.
        return doc;
    }
    if (shardKeyPart.isEmpty()) {
        return idElement.wrap();
    }
    return BSONObjBuilder(std::move(shardKeyPart)).append(idElement).obj();
}
/**
 * Releases this object's pin on its metadata, if it holds one (delegates to _clear()).
 */
ScopedCollectionMetadata::~ScopedCollectionMetadata() {
    this->_clear();
}
/**
 * Member access to the pinned metadata; yields nullptr for an empty instance
 * (shared_ptr::get() already returns nullptr when nothing is held).
 */
CollectionMetadata* ScopedCollectionMetadata::operator->() const {
    return _metadata.get();
}
/**
 * Returns the pinned metadata, or nullptr when this instance holds none.
 */
CollectionMetadata* ScopedCollectionMetadata::getMetadata() const {
    return _metadata.get();
}
/**
 * Releases this object's pin on its CollectionMetadata.
 *
 * If this drops the metadata's usage count to zero, the manager is asked to retire snapshots that
 * are no longer needed. This is a no-op for an empty instance (unsharded, or moved-from).
 *
 * Must not be called with the manager's lock held, because it acquires _manager->_managerLock
 * itself.
 */
void ScopedCollectionMetadata::_clear() {
    if (!_manager) {
        return;
    }

    {
        stdx::lock_guard managerLock(_manager->_managerLock);
        invariant(_metadata->_tracker.usageCounter != 0);
        if (--_metadata->_tracker.usageCounter == 0) {
            // MetadataManager doesn't care which usageCounter went to zero. It just retires all
            // that are older than the oldest metadata still in use by queries. (Some start out at
            // zero, some go to zero but can't be expired yet.) Note that new instances of
            // ScopedCollectionMetadata may get attached to _metadata.back(), so its usage count can
            // increase from zero, unlike other reference counts.
            _manager->_retireExpiredMetadata(managerLock);
        }
        _metadata.reset();
    }

    // Drop the manager reference only after managerLock has been released. This object may hold
    // the last reference. ~MetadataManager() locks _managerLock and then destroys the mutex, so
    // resetting while the guard was alive would self-deadlock on the non-recursive mutex and
    // leave the guard unlocking a destroyed mutex.
    _manager.reset();
}
// do not call with MetadataManager locked
// Move constructor: delegates to move assignment. Assignment calls _clear(), which would take the
// manager's lock if this object already held a manager reference. The inline comment below relies
// on the members starting out null (default-constructed shared_ptrs), which makes that _clear()
// call a no-op here.
ScopedCollectionMetadata::ScopedCollectionMetadata(ScopedCollectionMetadata&& other) {
*this = std::move(other); // Rely on being zero-initialized already.
}
// do not call with MetadataManager locked
// Move assignment: releases the pin currently held (via _clear(), which takes the manager's lock)
// and then takes over 'other''s metadata and manager, leaving 'other' empty.
ScopedCollectionMetadata& ScopedCollectionMetadata::operator=(ScopedCollectionMetadata&& other) {
    if (this == &other) {
        return *this;  // Self-move: keep the current pin untouched.
    }

    _clear();
    _metadata = std::move(other._metadata);
    _manager = std::move(other._manager);
    return *this;
}
/**
 * True when this object pins some metadata, i.e. the collection was sharded when it was obtained.
 */
ScopedCollectionMetadata::operator bool() const {
    return static_cast<bool>(_metadata);
}
// Remaining MetadataManager members
/**
 * Appends one [min, max] sub-array to 'bb' for every chunk range currently being migrated in.
 *
 * NOTE(review): unlike the other public accessors, this does not take _managerLock (it is a const
 * member). Presumably callers are expected to hold it; confirm at the call sites.
 */
void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
    for (const auto& receiving : _receivingChunks) {
        BSONArrayBuilder rangeBB(bb.subarrayStart());
        rangeBB.append(receiving.first);
        rangeBB.append(receiving.second.getMaxKey());
        rangeBB.done();
    }
}
/**
 * Reports the manager's state into 'builder'. It writes, in order:
 *  - the range-deletion queue (via _rangesToClean.append());
 *  - a "pendingChunks" array of ranges being migrated in;
 *  - when the collection has metadata, an "activeMetadataRanges" array of the active mapping's
 *    chunks.
 */
void MetadataManager::append(BSONObjBuilder* builder) {
    stdx::lock_guard scopedLock(_managerLock);

    _rangesToClean.append(builder);

    // Writes each (min key -> chunk info) entry of 'chunks' into 'arr' as a ChunkRange document,
    // then closes the array.
    const auto appendChunkRanges = [](BSONArrayBuilder& arr, const auto& chunks) {
        for (const auto& chunk : chunks) {
            BSONObjBuilder rangeObj;
            ChunkRange(chunk.first, chunk.second.getMaxKey()).append(&rangeObj);
            arr.append(rangeObj.done());
        }
        arr.done();
    };

    BSONArrayBuilder pendingArr(builder->subarrayStart("pendingChunks"));
    appendChunkRanges(pendingArr, _receivingChunks);

    if (_metadata.empty()) {
        return;
    }

    BSONArrayBuilder activeArr(builder->subarrayStart("activeMetadataRanges"));
    appendChunkRanges(activeArr, _metadata.back()->getChunks());
}
namespace {
/**
 * Deletes ranges, in background, until done, normally using a task executor attached to the
 * ShardingState.
 *
 * Each run calls CollectionRangeDeleter::cleanUpNextRange() for ('nss', 'epoch'). The batch limit
 * passed is internalQueryExecYieldIterations, clamped to at least 1. If that call reports a further
 * time, the task reschedules itself for that time.
 *
 * Each time it completes cleaning up a range, it wakes up clients waiting on completion of
 * that range, which may then verify that their range has no more deletions scheduled, and proceed.
 */
void scheduleCleanup(executor::TaskExecutor* executor,
                     NamespaceString nss,
                     OID epoch,
                     Date_t when) {
    LOG(1) << "Scheduling cleanup on " << nss.ns() << " at " << when;

    // Ignore the result because we don't use the callback handle, and the only failure is when
    // shutting down and there is nothing to do.
    std::ignore = executor->scheduleWorkAt(
        when, [ executor, nss = std::move(nss), epoch = std::move(epoch) ](auto& args) {
            // A non-OK status means the callback was canceled (for example during executor
            // shutdown) rather than run on schedule; do not start deleting documents then.
            if (!args.status.isOK()) {
                return;
            }

            MONGO_FAIL_POINT_PAUSE_WHILE_SET(suspendRangeDeletion);

            const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
            Client::initThreadIfNotAlready("Collection Range Deleter");
            auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
            auto opCtx = uniqueOpCtx.get();

            auto next = CollectionRangeDeleter::cleanUpNextRange(opCtx, nss, epoch, maxToDelete);
            if (next) {
                scheduleCleanup(executor, std::move(nss), std::move(epoch), *next);
            }
        });
}
} // namespace
/**
 * Queues an owned copy of 'range' for deletion at 'when' and returns the notification tied to
 * that deletion.
 */
auto MetadataManager::_pushRangeToClean(WithLock lock, ChunkRange const& range, Date_t when)
    -> CleanupNotification {
    std::list ranges;
    ranges.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), when);

    // Copy the notification before handing the list off. Once moved into _pushListToClean() the
    // element belongs to _rangesToClean, and a reference into it would only stay valid if that
    // queue happens to splice list nodes rather than copy or move its entries.
    auto notifn = ranges.back().notification;
    _pushListToClean(lock, std::move(ranges));
    return notifn;
}
/**
 * Moves 'ranges' into the range-deletion queue.
 *
 * When _rangesToClean.add() reports a time at which deletion should run, the background deleter is
 * scheduled for that time under the active mapping's epoch. Requires _metadata to be non-empty in
 * that case.
 */
void MetadataManager::_pushListToClean(WithLock, std::list ranges) {
    if (auto scheduleAt = _rangesToClean.add(std::move(ranges))) {
        scheduleCleanup(_executor, _nss, _metadata.back()->getCollVersion().epoch(), *scheduleAt);
    }
    invariant(ranges.empty());
}
/**
 * Records 'range' as being migrated in. Owned copies of both bounds are stored, so the map does
 * not depend on the caller's BSON buffers.
 */
void MetadataManager::_addToReceiving(WithLock, ChunkRange const& range) {
    auto minKey = range.getMin().getOwned();
    CachedChunkInfo chunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED());
    _receivingChunks.insert(std::make_pair(std::move(minKey), std::move(chunkInfo)));
}
/**
 * Starts receiving 'range' from a donor shard.
 *
 * Fails with RangeOverlapConflict if any in-use chunk mapping still overlaps the range. Otherwise
 * the range is marked as receiving and an immediate deletion of any documents already in it is
 * queued; the returned notification tracks that deletion.
 */
auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotification {
    stdx::unique_lock scopedLock(_managerLock);
    invariant(!_metadata.empty());

    const bool stillInUse = _overlapsInUseChunk(scopedLock, range);
    if (stillInUse) {
        return Status{ErrorCodes::RangeOverlapConflict,
                      "Documents in target range may still be in use on the destination shard."};
    }

    log() << "Scheduling deletion of any documents in " << _nss.ns() << " range "
          << redact(range.toString()) << " before migrating in a chunk covering the range";
    _addToReceiving(scopedLock, range);
    return _pushRangeToClean(scopedLock, range, Date_t{});
}
/**
 * Removes the receiving entry keyed by 'range''s min bound; the entry must exist.
 */
void MetadataManager::_removeFromReceiving(WithLock, ChunkRange const& range) {
    const auto entry = _receivingChunks.find(range.getMin());
    invariant(entry != _receivingChunks.end());
    _receivingChunks.erase(entry);
}
/**
 * Abandons an in-progress migration of 'range' into this shard.
 *
 * The range is removed from the receiving set and any documents already copied are queued for
 * immediate deletion.
 */
void MetadataManager::forgetReceive(ChunkRange const& range) {
    stdx::lock_guard scopedLock(_managerLock);
    invariant(!_metadata.empty());

    // This is potentially a partially received chunk, which needs to be cleaned up. We know none
    // of these documents are in use, so they can go straight to the deletion queue.
    // Redact the range like every other range log in this file: its bounds are shard key values.
    log() << "Abandoning in-migration of " << _nss.ns() << " range " << redact(range.toString())
          << "; scheduling deletion of any documents already copied";

    invariant(!_overlapsInUseChunk(scopedLock, range));
    _removeFromReceiving(scopedLock, range);
    // No caller waits on this deletion, so its notification is abandoned right away.
    _pushRangeToClean(scopedLock, range, Date_t{}).abandon();
}
/**
 * Requests deletion of the documents in 'range' at 'whenToDelete' (Date_t{} meaning immediately).
 *
 * Rejected with RangeOverlapConflict if the active mapping owns part of the range, or if part of
 * it is being migrated in. If an older snapshot still in use overlaps the range, the deletion is
 * deferred on the newest such snapshot until the queries that may depend on it finish. Otherwise
 * it is queued right away.
 */
auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete)
    -> CleanupNotification {
    stdx::unique_lock scopedLock(_managerLock);
    invariant(!_metadata.empty());

    auto* const active = _metadata.back().get();
    auto* const newestInUse = _newestOverlappingMetadata(scopedLock, range);

    if (newestInUse == active) {
        return Status{ErrorCodes::RangeOverlapConflict,
                      str::stream() << "Requested deletion range overlaps a live shard chunk"};
    }

    if (rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax())) {
        return Status{ErrorCodes::RangeOverlapConflict,
                      str::stream() << "Requested deletion range overlaps a chunk being"
                                       " migrated in"};
    }

    if (newestInUse == nullptr) {
        // No running queries can depend on it, so queue it for deletion immediately.
        const auto whenStr = (whenToDelete == Date_t{}) ? "immediate"_sd : "deferred"_sd;
        log() << "Scheduling " << whenStr << " deletion of " << _nss.ns() << " range "
              << redact(range.toString());
        return _pushRangeToClean(scopedLock, range, whenToDelete);
    }

    log() << "Deletion of " << _nss.ns() << " range " << redact(range.toString())
          << " will be scheduled after all possibly dependent queries finish";

    // Defer on the newest snapshot that still needs the range rather than on the active mapping,
    // which might live a long time. The deletion is released once that snapshot is retired.
    auto& deferred = newestInUse->_tracker.orphans;
    deferred.emplace_back(
        Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), whenToDelete});
    return deferred.back().notification;
}
/**
 * Returns a pin on every chunk mapping that overlaps 'range' and may still be in use.
 *
 * The active mapping comes first if it overlaps; it is always treated as in use. It is followed,
 * newest first, by overlapping snapshots whose usage count is non-zero. 'self' is presumably the
 * shared_ptr owning this manager; each returned object keeps a copy of it.
 */
auto MetadataManager::overlappingMetadata(std::shared_ptr const& self,
                                          ChunkRange const& range)
    -> std::vector {
    stdx::lock_guard scopedLock(_managerLock);
    // Checked only after acquiring the lock: _metadata is guarded by _managerLock.
    invariant(!_metadata.empty());

    std::vector result;
    result.reserve(_metadata.size());

    auto it = _metadata.crbegin();  // start with the current active chunk mapping
    if ((*it)->rangeOverlapsChunk(range)) {
        // We ignore the refcount of the active mapping; effectively, we assume it is in use.
        result.push_back(ScopedCollectionMetadata(scopedLock, self, *it));
    }

    ++it;  // step to snapshots
    for (auto end = _metadata.crend(); it != end; ++it) {
        // We want all the overlapping snapshot mappings still possibly in use by a query.
        if ((*it)->_tracker.usageCounter > 0 && (*it)->rangeOverlapsChunk(range)) {
            result.push_back(ScopedCollectionMetadata(scopedLock, self, *it));
        }
    }

    return result;
}
size_t MetadataManager::numberOfRangesToCleanStillInUse() {
stdx::lock_guard scopedLock(_managerLock);
size_t count = 0;
for (auto& metadata : _metadata) {
count += metadata->_tracker.orphans.size();
}
return count;
}
/**
 * Returns the number of ranges currently held in the range-deletion queue (_rangesToClean).
 */
size_t MetadataManager::numberOfRangesToClean() {
    stdx::unique_lock scopedLock(_managerLock);
    const auto queued = _rangesToClean.size();
    return queued;
}
/**
 * Returns the notification of a pending deletion that overlaps 'range', if any.
 *
 * Deletions still deferred on metadata snapshots are searched first, then the range-deletion
 * queue. Requires the collection to have metadata.
 */
auto MetadataManager::trackOrphanedDataCleanup(ChunkRange const& range)
    -> boost::optional {
    stdx::unique_lock scopedLock(_managerLock);
    if (auto deferredDeletion = _overlapsInUseCleanups(scopedLock, range)) {
        return deferredDeletion;
    }
    return _rangesToClean.overlaps(range);
}
/**
 * Returns the newest chunk mapping that overlaps 'range' and may still be in use, or nullptr.
 *
 * The active mapping (back of _metadata) counts regardless of its usage count. Older snapshots
 * count only while their usage count is non-zero.
 */
auto MetadataManager::_newestOverlappingMetadata(WithLock, ChunkRange const& range) const
    -> CollectionMetadata* {
    invariant(!_metadata.empty());

    auto candidate = _metadata.rbegin();
    if ((*candidate)->rangeOverlapsChunk(range)) {
        return candidate->get();
    }

    for (++candidate; candidate != _metadata.rend(); ++candidate) {
        const auto& snapshot = *candidate;
        if (snapshot->_tracker.usageCounter != 0 && snapshot->rangeOverlapsChunk(range)) {
            return snapshot.get();
        }
    }
    return nullptr;
}
/**
 * True if some chunk mapping that may still be in use overlaps 'range'.
 */
bool MetadataManager::_overlapsInUseChunk(WithLock lk, ChunkRange const& range) const {
    return _newestOverlappingMetadata(lk, range) != nullptr;
}
/**
 * Returns the notification of a deletion deferred on some metadata snapshot whose range overlaps
 * 'range', or boost::none.
 *
 * Snapshots are searched newest first, and within each the most recently added orphan first.
 */
auto MetadataManager::_overlapsInUseCleanups(WithLock, ChunkRange const& range) const
    -> boost::optional {
    invariant(!_metadata.empty());

    for (auto snapshot = _metadata.crbegin(); snapshot != _metadata.crend(); ++snapshot) {
        const auto& deferred = (*snapshot)->_tracker.orphans;
        for (auto deletion = deferred.crbegin(); deletion != deferred.crend(); ++deletion) {
            if (deletion->range.overlapWith(range)) {
                return deletion->notification;
            }
        }
    }
    return boost::none;
}
/**
 * Returns the next orphan range at or after 'from' according to the active mapping, taking the
 * ranges being migrated in into account. Requires the collection to have metadata.
 */
boost::optional MetadataManager::getNextOrphanRange(BSONObj const& from) {
    stdx::unique_lock scopedLock(_managerLock);
    invariant(!_metadata.empty());
    auto const& active = _metadata.back();
    return active->getNextOrphanRange(_receivingChunks, from);
}
} // namespace mongo