/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/metadata_manager.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/bson/util/builder.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
// MetadataManager maintains pointers to CollectionMetadata objects in a member list named
// _metadata. Each CollectionMetadata contains an immutable _chunksMap of chunks assigned to this
// shard, along with details related to its own lifecycle in a member _tracker.
//
// The current chunk mapping, used by queries starting up, is at _metadata.back(). Each query,
// when it starts up, requests and holds a ScopedCollectionMetadata object, and destroys it on
// termination. Each ScopedCollectionMetadata keeps a shared_ptr to its CollectionMetadata chunk
// mapping, and to the MetadataManager itself. CollectionMetadata mappings also keep a record of
// chunk ranges that may be deleted when it is determined that the range can no longer be in use.
//
// ScopedCollectionMetadata's destructor decrements the CollectionMetadata's usageCounter.
// Whenever a usageCounter drops to zero, we check whether any now-unused CollectionMetadata
// elements can be popped off the front of _metadata. We need to keep the unused elements in the
// middle (as seen below) because they may schedule deletions of chunks depended on by older
// mappings.
//
// New chunk mappings are pushed onto the back of _metadata. Subsequently started queries use the
// new mapping while still-running queries continue using the older "snapshot" mappings. We treat
// _metadata.back()'s usage count differently from the snapshots because it can't reliably be
// compared to zero; a new query may increment it at any time.
//
// (Note that the collection may be dropped or become unsharded, and even get made and sharded
// again, between construction and destruction of a ScopedCollectionMetadata).
//
// MetadataManager also contains a CollectionRangeDeleter _rangesToClean that queues orphan ranges
// being deleted in a background thread, and a mapping _receivingChunks of the ranges being migrated
// in, to avoid deleting them. Each range deletion is paired with a notification object triggered
// when the deletion is completed or abandoned.
//
// ____________________________
// (s): std::shared_ptr<> Clients:| ScopedCollectionMetadata |
// _________________________ +----(s) manager metadata (s)------------------+
// | CollectionShardingState | | |____________________________| | |
// | _metadataManager (s) | +-------(s) manager metadata (s)--------------+ |
// |____________________|____| | |____________________________| | | |
// ____________________v________ +------------(s) manager metadata (s)-----+ | |
// | MetadataManager | | |____________________________| | | |
// | |<--+ | | |
// | | ___________________________ (1 use) | | |
// | getActiveMetadata(): /---------->| CollectionMetadata |<---------+ | |
// | back(): [(s),------/ | | _________________________|_ | |
// | (s),-------------------->| CollectionMetadata | (0 uses) | |
// | _metadata: (s)]------\ | | | _________________________|_ | |
// | \-------------->| CollectionMetadata | | |
// | _receivingChunks | | | | | (2 uses) | |
// | _rangesToClean: | | | | _tracker: |<---------+ |
// | _________________________ | | | | _______________________ |<-----------+
// | | CollectionRangeDeleter | | | | | | Tracker | |
// | | | | | | | | | |
// | | _orphans [range,notif, | | | | | | usageCounter | |
// | | range,notif, | | | | | | orphans [range,notif, | |
// | | ... ] | | | | | | range,notif, | |
// | | | | | | | | ... ] | |
// | |_________________________| | |_| | |_______________________| |
// |_____________________________| | | _chunksMap |
// |_| _chunkVersion |
// | ... |
// |___________________________|
//
// Note that _metadata as shown here has its front() at the bottom, back() at the top. As usual,
// new entries are pushed onto the back, popped off the front.
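//
// A rough usage sketch (illustrative only; the accessor names below are assumptions about the
// surrounding sharding code, not necessarily the exact API): a query obtains the active
// filtering metadata through the collection's CollectionShardingState and holds the returned
// ScopedCollectionMetadata for its whole lifetime, which keeps the corresponding _metadata
// entry pinned:
//
//     auto scopedMetadata =
//         CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);  // assumed accessor
//     if (scopedMetadata->keyBelongsToMe(shardKey)) {                    // filter orphans
//         // ... return the document ...
//     }
//
// When scopedMetadata is destroyed, the owning tracker's usageCounter is decremented and any
// now-unreferenced snapshots at the front of _metadata become eligible for retirement.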
namespace mongo {
namespace {
using TaskExecutor = executor::TaskExecutor;
using CallbackArgs = TaskExecutor::CallbackArgs;
MONGO_FAIL_POINT_DEFINE(suspendRangeDeletion);
class UnshardedCollection : public ScopedCollectionMetadata::Impl {
public:
UnshardedCollection() = default;
const CollectionMetadata& get() override {
return _metadata;
}
private:
CollectionMetadata _metadata;
};
const auto kUnshardedCollection = std::make_shared<UnshardedCollection>();
/**
* Deletes ranges, in background, until done, normally using a task executor attached to the
* ShardingState.
*
* Each time it completes cleaning up a range, it wakes up clients waiting on completion of that
* range, which may then verify that their range has no more deletions scheduled, and proceed.
*/
void scheduleCleanup(executor::TaskExecutor* executor,
NamespaceString nss,
OID epoch,
Date_t when) {
LOG(1) << "Scheduling cleanup on " << nss.ns() << " at " << when;
auto swCallbackHandle = executor->scheduleWorkAt(
when, [ executor, nss = std::move(nss), epoch = std::move(epoch) ](auto&) {
Client::initThreadIfNotAlready("Collection Range Deleter");
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(suspendRangeDeletion);
auto next = CollectionRangeDeleter::cleanUpNextRange(opCtx, nss, epoch, maxToDelete);
if (next) {
scheduleCleanup(executor, std::move(nss), std::move(epoch), *next);
}
});
if (!swCallbackHandle.isOK()) {
log() << "Failed to schedule the orphan data cleanup task"
<< causedBy(redact(swCallbackHandle.getStatus()));
}
}
} // namespace
class RangePreserver : public ScopedCollectionMetadata::Impl {
public:
// Must be called while holding the MetadataManager's _managerLock
RangePreserver(WithLock,
std::shared_ptr<MetadataManager> metadataManager,
std::shared_ptr<MetadataManager::CollectionMetadataTracker> metadataTracker)
: _metadataManager(std::move(metadataManager)),
_metadataTracker(std::move(metadataTracker)) {
++_metadataTracker->usageCounter;
}
~RangePreserver() {
stdx::lock_guard<stdx::mutex> managerLock(_metadataManager->_managerLock);
invariant(_metadataTracker->usageCounter != 0);
if (--_metadataTracker->usageCounter == 0) {
// MetadataManager doesn't care which usageCounter went to zero. It just retires all
// that are older than the oldest metadata still in use by queries (some start out at
// zero, some go to zero but can't be expired yet).
//
// Note that new instances of ScopedCollectionMetadata may get attached to
// _metadata.back(), so its usage count can increase from zero, unlike other reference
// counts.
_metadataManager->_retireExpiredMetadata(managerLock);
}
}
const CollectionMetadata& get() override {
return _metadataTracker->metadata;
}
private:
friend boost::optional<ScopedCollectionMetadata> MetadataManager::getActiveMetadata(
std::shared_ptr<MetadataManager>, const boost::optional<LogicalTime>&);
std::shared_ptr<MetadataManager> _metadataManager;
std::shared_ptr<MetadataManager::CollectionMetadataTracker> _metadataTracker;
};
MetadataManager::MetadataManager(ServiceContext* serviceContext,
NamespaceString nss,
TaskExecutor* executor)
: _serviceContext(serviceContext),
_nss(std::move(nss)),
_executor(executor),
_receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<BSONObj>()) {}
MetadataManager::~MetadataManager() {
clearFilteringMetadata();
}
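// Hands every tracker's deferred orphan ranges to _rangesToClean and then clears the range
// deleter with the given status, so callers waiting on those deletions are released rather than
// left hanging. The no-argument overload uses an InterruptedDueToStepDown status noting that the
// collection was dropped or became unsharded. Callers must hold _managerLock.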
void MetadataManager::_clearAllCleanups(WithLock lock) {
_clearAllCleanups(
lock,
{ErrorCodes::InterruptedDueToStepDown,
str::stream() << "Range deletions in " << _nss.ns()
<< " abandoned because collection was dropped or became unsharded"});
}
void MetadataManager::_clearAllCleanups(WithLock, Status status) {
for (auto& tracker : _metadata) {
std::ignore = _rangesToClean.add(std::move(tracker->orphans));
}
_rangesToClean.clear(status);
}
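// Returns a ScopedCollectionMetadata pinning the most recent metadata (via a RangePreserver that
// bumps its usageCounter), or boost::none if no filtering metadata is installed. When
// atClusterTime is provided and the collection is sharded, returns metadata reconstructed from
// the routing history at that timestamp instead; such point-in-time metadata is not
// reference-counted by this manager.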
boost::optional<ScopedCollectionMetadata> MetadataManager::getActiveMetadata(
std::shared_ptr<MetadataManager> self, const boost::optional<LogicalTime>& atClusterTime) {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
if (_metadata.empty()) {
return boost::none;
}
auto activeMetadataTracker = _metadata.back();
const auto& activeMetadata = activeMetadataTracker->metadata;
// We don't keep routing history for unsharded collections, so if the collection is unsharded
// just return the active metadata
if (!atClusterTime || !activeMetadata.isSharded()) {
return ScopedCollectionMetadata(std::make_shared<RangePreserver>(
lg, std::move(self), std::move(activeMetadataTracker)));
}
auto chunkManager = activeMetadata.getChunkManager();
auto chunkManagerAtClusterTime = std::make_shared<ChunkManager>(
chunkManager->getRoutingHistory(), atClusterTime->asTimestamp());
class MetadataAtTimestamp : public ScopedCollectionMetadata::Impl {
public:
MetadataAtTimestamp(CollectionMetadata metadata) : _metadata(std::move(metadata)) {}
const CollectionMetadata& get() override {
return _metadata;
}
private:
CollectionMetadata _metadata;
};
return ScopedCollectionMetadata(std::make_shared<MetadataAtTimestamp>(
CollectionMetadata(chunkManagerAtClusterTime, activeMetadata.shardId())));
}
size_t MetadataManager::numberOfMetadataSnapshots() const {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
if (_metadata.empty())
return 0;
return _metadata.size() - 1;
}
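// Installs freshly fetched metadata as the active chunk mapping. A first install marks the
// collection as sharded; an epoch mismatch (drop and recreate) resets receiving chunks, pending
// cleanups, and all snapshots; a stale or equal version is ignored; otherwise the new mapping is
// pushed onto _metadata and any ranges being received that it already overlaps are deemed
// successfully received.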
void MetadataManager::setFilteringMetadata(CollectionMetadata remoteMetadata) {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
// Collection is becoming sharded
if (_metadata.empty()) {
LOG(0) << "Marking collection " << _nss.ns() << " as " << remoteMetadata.toStringBasic();
invariant(_receivingChunks.empty());
invariant(_rangesToClean.isEmpty());
_setActiveMetadata(lg, std::move(remoteMetadata));
return;
}
const auto& activeMetadata = _metadata.back()->metadata;
// If the metadata being installed has a different epoch from ours, this means the collection
// was dropped and recreated, so we must entirely reset the metadata state
if (activeMetadata.getCollVersion().epoch() != remoteMetadata.getCollVersion().epoch()) {
LOG(0) << "Updating metadata for collection " << _nss.ns() << " from "
<< activeMetadata.toStringBasic() << " to " << remoteMetadata.toStringBasic()
<< " due to epoch change";
_receivingChunks.clear();
_clearAllCleanups(lg);
_metadata.clear();
_setActiveMetadata(lg, std::move(remoteMetadata));
return;
}
// We already have the same or newer version
if (activeMetadata.getCollVersion() >= remoteMetadata.getCollVersion()) {
LOG(1) << "Ignoring update of active metadata " << activeMetadata.toStringBasic()
<< " with an older " << remoteMetadata.toStringBasic();
return;
}
LOG(0) << "Updating metadata for collection " << _nss.ns() << " from "
<< activeMetadata.toStringBasic() << " to " << remoteMetadata.toStringBasic()
<< " due to version change";
// Resolve any receiving chunks, which might have completed by now
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) {
const ChunkRange receivingRange(it->first, it->second);
if (!remoteMetadata.rangeOverlapsChunk(receivingRange)) {
++it;
continue;
}
// The remote metadata contains a chunk we were earlier in the process of receiving, so we
// deem it successfully received
LOG(2) << "Verified chunk " << redact(receivingRange.toString()) << " for collection "
<< _nss.ns() << " has been migrated to this shard earlier";
_receivingChunks.erase(it);
it = _receivingChunks.begin();
}
_setActiveMetadata(lg, std::move(remoteMetadata));
}
void MetadataManager::clearFilteringMetadata() {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
_receivingChunks.clear();
_clearAllCleanups(lg);
_metadata.clear();
}
void MetadataManager::_setActiveMetadata(WithLock wl, CollectionMetadata newMetadata) {
_metadata.emplace_back(std::make_shared<CollectionMetadataTracker>(std::move(newMetadata)));
_retireExpiredMetadata(wl);
}
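// Pops snapshots that no query references off the front of _metadata, handing their deferred
// orphan ranges to the range deleter. Stops at the first snapshot still in use and never pops
// the active (back) entry. Callers must hold _managerLock.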
void MetadataManager::_retireExpiredMetadata(WithLock lock) {
while (_metadata.size() > 1 && !_metadata.front()->usageCounter) {
if (!_metadata.front()->orphans.empty()) {
LOG(0) << "Queries possibly dependent on " << _nss.ns()
<< " range(s) finished; scheduling ranges for deletion";
// It is safe to push orphan ranges from _metadata.back(), even though new queries might
// start any time, because any request to delete a range it maps is rejected.
_pushListToClean(lock, std::move(_metadata.front()->orphans));
}
_metadata.pop_front();
}
}
void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) {
BSONArrayBuilder pendingBB(bb.subarrayStart());
pendingBB.append(it->first);
pendingBB.append(it->second);
pendingBB.done();
}
}
void MetadataManager::append(BSONObjBuilder* builder) const {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
_rangesToClean.append(builder);
BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
for (const auto& entry : _receivingChunks) {
BSONObjBuilder obj;
ChunkRange r = ChunkRange(entry.first, entry.second);
r.append(&obj);
pcArr.append(obj.done());
}
pcArr.done();
if (_metadata.empty()) {
return;
}
BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges"));
for (const auto& entry : _metadata.back()->metadata.getChunks()) {
BSONObjBuilder obj;
ChunkRange r = ChunkRange(entry.first, entry.second);
r.append(&obj);
amrArr.append(obj.done());
}
amrArr.done();
}
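// _pushRangeToClean wraps a single range as a deletion entry and returns its notification;
// _pushListToClean hands the entries to _rangesToClean and, if add() returns a start time,
// schedules a background cleanup task on _executor using the active metadata's collection epoch.
// Callers must hold _managerLock.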
auto MetadataManager::_pushRangeToClean(WithLock lock, ChunkRange const& range, Date_t when)
-> CleanupNotification {
std::list<CollectionRangeDeleter::Deletion> ranges;
ranges.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), when);
auto& notifn = ranges.back().notification;
_pushListToClean(lock, std::move(ranges));
return notifn;
}
void MetadataManager::_pushListToClean(WithLock, std::list<CollectionRangeDeleter::Deletion> ranges) {
auto when = _rangesToClean.add(std::move(ranges));
if (when) {
scheduleCleanup(
_executor, _nss, _metadata.back()->metadata.getCollVersion().epoch(), *when);
}
}
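// Records a chunk range that is about to be migrated in. Fails with RangeOverlapConflict if the
// range overlaps a chunk that running queries may still depend on; otherwise adds it to
// _receivingChunks and schedules immediate deletion of any stale documents already in the range,
// returning the notification for that deletion.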
auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotification {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
invariant(!_metadata.empty());
if (_overlapsInUseChunk(lg, range)) {
return Status{ErrorCodes::RangeOverlapConflict,
"Documents in target range may still be in use on the destination shard."};
}
_receivingChunks.emplace(range.getMin().getOwned(), range.getMax().getOwned());
log() << "Scheduling deletion of any documents in " << _nss.ns() << " range "
<< redact(range.toString()) << " before migrating in a chunk covering the range";
return _pushRangeToClean(lg, range, Date_t{});
}
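// Undoes beginReceive after a failed or aborted migration: removes the range from
// _receivingChunks and schedules deletion of whatever documents were already copied in. The
// notification is abandoned because no caller waits on this cleanup.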
void MetadataManager::forgetReceive(ChunkRange const& range) {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
invariant(!_metadata.empty());
// This is potentially a partially received chunk, which needs to be cleaned up. We know none
// of these documents are in use, so they can go straight to the deletion queue.
log() << "Abandoning in-migration of " << _nss.ns() << " range " << range
<< "; scheduling deletion of any documents already copied";
invariant(!_overlapsInUseChunk(lg, range));
auto it = _receivingChunks.find(range.getMin());
invariant(it != _receivingChunks.end());
_receivingChunks.erase(it);
_pushRangeToClean(lg, range, Date_t{}).abandon();
}
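// Requests deletion of the given orphaned range. Fails with RangeOverlapConflict if the range
// overlaps the active chunk mapping or a chunk currently being migrated in. If no still-in-use
// snapshot maps the range, it is queued for deletion right away (honoring whenToDelete);
// otherwise the deletion is attached to the newest snapshot still mapping it, so it runs only
// after the queries that could be using those documents finish. Returns a notification that is
// triggered when the deletion completes or is abandoned.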
auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete)
-> CleanupNotification {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
invariant(!_metadata.empty());
auto* const activeMetadata = _metadata.back().get();
auto* const overlapMetadata = _findNewestOverlappingMetadata(lg, range);
if (overlapMetadata == activeMetadata) {
return Status{ErrorCodes::RangeOverlapConflict,
str::stream() << "Requested deletion range overlaps a live shard chunk"};
}
if (rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax())) {
return Status{ErrorCodes::RangeOverlapConflict,
str::stream() << "Requested deletion range overlaps a chunk being"
" migrated in"};
}
if (!overlapMetadata) {
// No running queries can depend on it, so queue it for deletion immediately.
const auto whenStr = (whenToDelete == Date_t{}) ? "immediate"_sd : "deferred"_sd;
log() << "Scheduling " << whenStr << " deletion of " << _nss.ns() << " range "
<< redact(range.toString());
return _pushRangeToClean(lg, range, whenToDelete);
}
log() << "Deletion of " << _nss.ns() << " range " << redact(range.toString())
<< " will be scheduled after all possibly dependent queries finish";
// Put it on the oldest metadata permissible; the current one might live a long time.
auto& orphans = overlapMetadata->orphans;
orphans.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()),
whenToDelete);
return orphans.back().notification;
}
size_t MetadataManager::numberOfRangesToCleanStillInUse() const {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
size_t count = 0;
for (auto& tracker : _metadata) {
count += tracker->orphans.size();
}
return count;
}
size_t MetadataManager::numberOfRangesToClean() const {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
return _rangesToClean.size();
}
auto MetadataManager::trackOrphanedDataCleanup(ChunkRange const& range) const
-> boost::optional<CleanupNotification> {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
auto overlaps = _overlapsInUseCleanups(lg, range);
if (overlaps) {
return overlaps;
}
return _rangesToClean.overlaps(range);
}
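// Returns the most recent metadata tracker whose chunk mapping overlaps the given range and that
// may still be in use: the active (back) entry always qualifies, older snapshots only if their
// usageCounter is non-zero. Returns nullptr if no such tracker exists.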
auto MetadataManager::_findNewestOverlappingMetadata(WithLock, ChunkRange const& range)
-> CollectionMetadataTracker* {
invariant(!_metadata.empty());
auto it = _metadata.rbegin();
if ((*it)->metadata.rangeOverlapsChunk(range)) {
return (*it).get();
}
++it;
for (; it != _metadata.rend(); ++it) {
auto& tracker = *it;
if (tracker->usageCounter && tracker->metadata.rangeOverlapsChunk(range)) {
return tracker.get();
}
}
return nullptr;
}
bool MetadataManager::_overlapsInUseChunk(WithLock lk, ChunkRange const& range) {
auto* cm = _findNewestOverlappingMetadata(lk, range);
return (cm != nullptr);
}
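// Scans every tracker's deferred orphan ranges, newest snapshot first, for one overlapping the
// given range, and returns its notification so the caller can wait on that deletion; returns
// boost::none if nothing overlaps.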
auto MetadataManager::_overlapsInUseCleanups(WithLock, ChunkRange const& range) const
-> boost::optional<CleanupNotification> {
invariant(!_metadata.empty());
for (auto it = _metadata.rbegin(); it != _metadata.rend(); ++it) {
const auto& orphans = (*it)->orphans;
for (auto itOrphans = orphans.rbegin(); itOrphans != orphans.rend(); ++itOrphans) {
const auto& orphan = *itOrphans;
if (orphan.range.overlapWith(range)) {
return orphan.notification;
}
}
}
return boost::none;
}
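// Delegates to the active metadata to find the next range of documents on this shard, at or
// above 'from', that is owned by neither a chunk of this shard nor a range in _receivingChunks.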
boost::optional<ChunkRange> MetadataManager::getNextOrphanRange(BSONObj const& from) const {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
invariant(!_metadata.empty());
return _metadata.back()->metadata.getNextOrphanRange(_receivingChunks, from);
}
} // namespace mongo