/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/catalog/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_shard_collection.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
// How long to wait before starting cleanup of an emigrated chunk range
MONGO_EXPORT_SERVER_PARAMETER(orphanCleanupDelaySecs, int, 900); // 900s = 15m
/**
* Used to perform shard identity initialization once it is certain that the document is committed.
*/
class ShardIdentityLogOpHandler final : public RecoveryUnit::Change {
public:
    ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity)
        : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {}

    // Called only once the storage transaction commits, so the shard identity document is
    // guaranteed durable before the in-memory sharding state is initialized from it. An
    // initialization failure here is fatal (fassert code 40071).
    void commit() override {
        fassertNoTrace(
            40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity));
    }

    // Nothing to undo: no in-memory state is mutated before commit() runs.
    void rollback() override {}

private:
    OperationContext* _opCtx;
    const ShardIdentityType _shardIdentity;
};
/**
* Used to notify the catalog cache loader of a new collection version and invalidate the in-memory
* routing table cache once the oplog updates are committed and become visible.
*/
class CollectionVersionLogOpHandler final : public RecoveryUnit::Change {
public:
    CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss)
        : _opCtx(opCtx), _nss(nss) {}

    // Runs once the oplog updates are committed and visible. The registering code must keep
    // the collection lock held (via a WUOW) until this executes — see the invariant below.
    void commit() override {
        invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

        // Tell the catalog cache loader a newer collection version is available.
        CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss);

        // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X
        // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until
        // SERVER-31595 removes the X lock requirement.
        CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown();
    }

    // Nothing to undo: no state is published before commit() runs.
    void rollback() override {}

private:
    OperationContext* _opCtx;
    const NamespaceString _nss;
};
/**
* Caller must hold the global lock in some mode other than MODE_NONE.
*/
bool isStandaloneOrPrimary(OperationContext* opCtx) {
    // Caller must hold the global lock in some mode other than MODE_NONE (see header comment).
    dassert(opCtx->lockState()->isLocked());
    auto replCoord = repl::ReplicationCoordinator::get(opCtx);
    const bool isReplSet =
        replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
    // Reuse the coordinator fetched above instead of performing a second decoration lookup.
    return !isReplSet || (replCoord->getMemberState() == repl::MemberState::RS_PRIMARY);
}
} // unnamed namespace
// Constructs the per-collection sharding state, creating the MetadataManager that owns the
// collection's filtering metadata and orphan range deletions. Restores the stripped
// make_shared template argument (<MetadataManager>), without which this does not compile.
CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
    : _nss(std::move(nss)),
      _metadataManager(std::make_shared<MetadataManager>(
          sc, _nss, ShardingState::get(sc)->getRangeDeleterTaskExecutor())) {}
CollectionShardingState::~CollectionShardingState() {
    // Any active migration must have been torn down (clearMigrationSourceManager) before the
    // collection's sharding state is destroyed.
    invariant(!_sourceMgr);
}
// Convenience overload: forwards to the string-based accessor, which performs the
// collection-lock check.
CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx,
                                                      const NamespaceString& nss) {
    const auto& nsStr = nss.ns();
    return CollectionShardingState::get(opCtx, nsStr);
}
// Retrieves the sharding state for the given namespace string. The caller must hold at least
// an intent-shared collection lock for the returned pointer to remain valid.
CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx,
                                                      const std::string& ns) {
    // Collection lock must be held to have a reference to the collection's sharding state
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IS));

    auto* const globalShardingState = ShardingState::get(opCtx);
    return globalShardingState->getNS(ns, opCtx);
}
// Returns a scoped reference to the currently active filtering metadata. The shared_ptr to the
// MetadataManager is passed along so the scoped object can keep the manager alive.
ScopedCollectionMetadata CollectionShardingState::getMetadata() {
    return _metadataManager->getActiveMetadata(_metadataManager);
}
void CollectionShardingState::refreshMetadata(OperationContext* opCtx,
std::unique_ptr newMetadata) {
invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
_metadataManager->refreshActiveMetadata(std::move(newMetadata));
}
// Clears the active metadata (marks the collection as not sharded) without a lock check.
// NOTE(review): deliberately bypasses the X-lock requirement of refreshMetadata(); see the
// SERVER-31595 comments at the call sites.
void CollectionShardingState::markNotShardedAtStepdown() {
    _metadataManager->refreshActiveMetadata(nullptr);
}
// Registers an incoming chunk range with the metadata manager and returns a notification that
// is signaled when any overlapping orphaned data has been cleaned up.
auto CollectionShardingState::beginReceive(ChunkRange const& range) -> CleanupNotification {
    return _metadataManager->beginReceive(range);
}
// Abandons a previously begun chunk reception for the given range.
void CollectionShardingState::forgetReceive(const ChunkRange& range) {
    _metadataManager->forgetReceive(range);
}
// Schedules deletion of the given orphaned chunk range. kNow cleans immediately (epoch
// Date_t{}); kDelayed waits 'orphanCleanupDelaySecs' so in-flight queries can drain first.
auto CollectionShardingState::cleanUpRange(ChunkRange const& range, CleanWhen when)
    -> CleanupNotification {
    Date_t whenToClean;
    if (when == kNow) {
        whenToClean = Date_t{};
    } else {
        whenToClean = Date_t::now() + stdx::chrono::seconds{orphanCleanupDelaySecs.load()};
    }
    return _metadataManager->cleanUpRange(range, whenToClean);
}
std::vector CollectionShardingState::overlappingMetadata(
ChunkRange const& range) const {
return _metadataManager->overlappingMetadata(_metadataManager, range);
}
// Returns the active migration source manager, or nullptr when no outgoing migration is in
// progress for this collection.
MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() {
    return _sourceMgr;
}
// Attaches the migration source manager for an outgoing migration. Requires the collection X
// lock, a non-null manager, and that no other migration is already active.
void CollectionShardingState::setMigrationSourceManager(OperationContext* opCtx,
                                                        MigrationSourceManager* sourceMgr) {
    invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));

    // Only one migration may be active for a collection at a time.
    invariant(!_sourceMgr);
    invariant(sourceMgr);

    _sourceMgr = sourceMgr;
}
// Detaches the migration source manager once the outgoing migration finishes. Requires the
// collection X lock and that a migration is actually active.
void CollectionShardingState::clearMigrationSourceManager(OperationContext* opCtx) {
    invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
    invariant(_sourceMgr);

    _sourceMgr = nullptr;
}
// Throws StaleConfigException when the shard version attached to the operation does not match
// this shard's metadata for the collection; otherwise returns normally.
void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx) {
    std::string errmsg;
    ChunkVersion receivedVersion;
    ChunkVersion wantedVersion;

    const bool versionIsOk =
        _checkShardVersionOk(opCtx, &errmsg, &receivedVersion, &wantedVersion);
    if (!versionIsOk) {
        throw StaleConfigException(_nss.ns(),
                                   str::stream() << "shard version not ok: " << errmsg,
                                   receivedVersion,
                                   wantedVersion);
    }
}
// Reports whether this shard considers the collection sharded.
bool CollectionShardingState::collectionIsSharded() {
    auto metadata = getMetadata().getMetadata();

    // If 'metadata' is null, then the shard doesn't know if this collection is sharded or not.
    // In this scenario we will assume this collection is sharded. We will know sharding state
    // definitively once SERVER-24960 has been fixed.
    if (!metadata) {
        return true;
    }

    // Metadata whose collection version is exactly UNSHARDED means definitively not sharded.
    return !metadata->getCollVersion().isStrictlyEqualTo(ChunkVersion::UNSHARDED());
}
// Call with collection unlocked. Note that the CollectionShardingState object involved might not
// exist anymore at the time of the call, or indeed anytime outside the AutoGetCollection block, so
// anything that might alias something in it must be copied first.
//
// Blocks until all orphaned data in 'orphanRange' has been deleted, the collection is dropped
// (epoch change), or a deletion attempt fails. Restores the stripped boost::optional template
// argument (<CleanupNotification>), without which this does not compile.
Status CollectionShardingState::waitForClean(OperationContext* opCtx,
                                             const NamespaceString& nss,
                                             OID const& epoch,
                                             ChunkRange orphanRange) {
    while (true) {
        // Must be declared outside the lock scope below, since we wait on it after releasing
        // the collection lock.
        boost::optional<CleanupNotification> stillScheduled;

        {
            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
            auto css = CollectionShardingState::get(opCtx, nss);

            {
                // First, see if collection was dropped, but do it in a separate scope in order to
                // not hold reference on it, which would make it appear in use
                auto metadata = css->_metadataManager->getActiveMetadata(css->_metadataManager);
                if (!metadata || metadata->getCollVersion().epoch() != epoch) {
                    return {ErrorCodes::StaleShardVersion, "Collection being migrated was dropped"};
                }
            }

            stillScheduled = css->trackOrphanedDataCleanup(orphanRange);
            if (!stillScheduled) {
                log() << "Finished deleting " << nss.ns() << " range "
                      << redact(orphanRange.toString());
                return Status::OK();
            }
        }

        log() << "Waiting for deletion of " << nss.ns() << " range " << orphanRange;

        // Wait outside the collection lock so the range deleter can make progress.
        Status result = stillScheduled->waitStatus(opCtx);
        if (!result.isOK()) {
            return {result.code(),
                    str::stream() << "Failed to delete orphaned " << nss.ns() << " range "
                                  << orphanRange.toString()
                                  << " due to "
                                  << result.reason()};
        }
    }

    MONGO_UNREACHABLE;
}
// Returns a notification for the scheduled cleanup of the given range, or boost::none if no
// cleanup is pending. Restores the stripped trailing-return template argument
// (<CleanupNotification>), without which this does not compile.
auto CollectionShardingState::trackOrphanedDataCleanup(ChunkRange const& range)
    -> boost::optional<CleanupNotification> {
    return _metadataManager->trackOrphanedDataCleanup(range);
}
// Returns the next orphaned range at or after 'from', or boost::none when there are no more.
// Restores the stripped boost::optional template argument (<ChunkRange>), without which this
// does not compile.
boost::optional<ChunkRange> CollectionShardingState::getNextOrphanRange(BSONObj const& from) {
    return _metadataManager->getNextOrphanRange(from);
}
// Op-observer hook invoked for every insert into this collection.
void CollectionShardingState::onInsertOp(OperationContext* opCtx,
                                         const BSONObj& insertedDoc,
                                         const repl::OpTime& opTime) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
        // Inserting the shard identity document into admin.system.version triggers sharding
        // initialization — but only once the write actually commits (ShardIdentityLogOpHandler).
        if (_nss == NamespaceString::kServerConfigurationNamespace) {
            if (auto idElem = insertedDoc["_id"]) {
                if (idElem.str() == ShardIdentityType::IdName) {
                    auto shardIdentityDoc =
                        uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc));
                    uassertStatusOK(shardIdentityDoc.validate());
                    opCtx->recoveryUnit()->registerChange(
                        new ShardIdentityLogOpHandler(opCtx, std::move(shardIdentityDoc)));
                }
            }
        }

        // Track the inserted bytes against the owning chunk (may lead to a split request).
        if (ShardingState::get(opCtx)->enabled()) {
            _incrementChunkOnInsertOrUpdate(opCtx, insertedDoc, insertedDoc.objsize());
        }
    }

    checkShardVersionOrThrow(opCtx);

    // If a migration is active, forward the insert to the chunk cloner so the recipient sees it.
    if (_sourceMgr) {
        _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, opTime);
    }
}
// Op-observer hook invoked for every update against this collection.
void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
                                         const BSONObj& query,
                                         const BSONObj& update,
                                         const BSONObj& updatedDoc,
                                         const repl::OpTime& opTime,
                                         const repl::OpTime& prePostImageOpTime) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
        // Updates to config.cache.collections drive catalog-cache refresh notifications.
        if (_nss == NamespaceString::kShardConfigCollectionsCollectionName) {
            _onConfigCollectionsUpdateOp(opCtx, query, update, updatedDoc);
        }

        // Track the written bytes (size of the update spec) against the owning chunk.
        if (ShardingState::get(opCtx)->enabled()) {
            _incrementChunkOnInsertOrUpdate(opCtx, updatedDoc, update.objsize());
        }
    }

    checkShardVersionOrThrow(opCtx);

    // If a migration is active, forward the update to the chunk cloner so the recipient sees it.
    if (_sourceMgr) {
        _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, opTime, prePostImageOpTime);
    }
}
// Captures, before the delete executes, the document's key and whether it lies in a chunk
// currently being migrated off this shard.
auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState {
    auto documentKey = getMetadata().extractDocumentKey(doc).getOwned();
    const bool isInMigratingChunk =
        _sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc);
    return {std::move(documentKey), isInMigratingChunk};
}
// Op-observer hook invoked for every delete from this collection.
void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
                                         const DeleteState& deleteState,
                                         const repl::OpTime& opTime,
                                         const repl::OpTime& preImageOpTime) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
        // Deletes from config.cache.collections invalidate the cached routing metadata.
        if (_nss == NamespaceString::kShardConfigCollectionsCollectionName) {
            _onConfigDeleteInvalidateCachedMetadataAndNotify(opCtx, deleteState.documentKey);
        }

        if (_nss == NamespaceString::kServerConfigurationNamespace) {
            if (auto idElem = deleteState.documentKey["_id"]) {
                auto idStr = idElem.str();
                if (idStr == ShardIdentityType::IdName) {
                    // Deleting the shard identity document is only legal during rollback; a
                    // user-initiated delete in --shardsvr mode is rejected outright.
                    if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) {
                        uasserted(40070,
                                  "cannot delete shardIdentity document while in --shardsvr mode");
                    } else {
                        warning() << "Shard identity document rolled back. Will shut down after "
                                     "finishing rollback.";
                        ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened();
                    }
                }
            }
        }
    }

    if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
        // Similarly, the config.version document may only disappear during rollback.
        if (_nss == VersionType::ConfigNS) {
            if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) {
                uasserted(40302, "cannot delete config.version document while in --configsvr mode");
            } else {
                // Throw out any cached information related to the cluster ID.
                ShardingCatalogManager::get(opCtx)
                    ->discardCachedConfigDatabaseInitializationState();
                ClusterIdentityLoader::get(opCtx)->discardCachedClusterId();
            }
        }
    }

    checkShardVersionOrThrow(opCtx);

    // Only forward deletes for documents that were in the migrating chunk (captured earlier
    // in makeDeleteState).
    if (_sourceMgr && deleteState.isMigrating) {
        _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, opTime, preImageOpTime);
    }
}
// Op-observer hook invoked when this collection is dropped.
void CollectionShardingState::onDropCollection(OperationContext* opCtx,
                                               const NamespaceString& collectionName) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    if (serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
        _nss == NamespaceString::kServerConfigurationNamespace) {
        // Dropping system collections is not allowed for end users.
        invariant(!opCtx->writesAreReplicated());
        invariant(repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback());

        // Can't confirm whether there was a ShardIdentity document or not yet, so assume there was
        // one and shut down the process to clear the in-memory sharding state.
        warning() << "admin.system.version collection rolled back. Will shut down after "
                     "finishing rollback";
        ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened();
    }

    if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
        // Dropping config.version is only legal during rollback of a config server.
        if (_nss == VersionType::ConfigNS) {
            if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) {
                uasserted(40303, "cannot drop config.version document while in --configsvr mode");
            } else {
                // Throw out any cached information related to the cluster ID.
                ShardingCatalogManager::get(opCtx)
                    ->discardCachedConfigDatabaseInitializationState();
                ClusterIdentityLoader::get(opCtx)->discardCachedClusterId();
            }
        }
    }
}
// Handles updates to config.cache.collections on shard secondaries: schedules catalog-cache
// invalidation for the affected user collection once the update commits.
void CollectionShardingState::_onConfigCollectionsUpdateOp(OperationContext* opCtx,
                                                           const BSONObj& query,
                                                           const BSONObj& update,
                                                           const BSONObj& updatedDoc) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
    invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);

    // Notification of routing table changes are only needed on secondaries.
    if (isStandaloneOrPrimary(opCtx)) {
        return;
    }

    // Extract which user collection was updated.
    std::string updatedCollection;
    fassertStatusOK(
        40477, bsonExtractStringField(query, ShardCollectionType::ns.name(), &updatedCollection));

    // Parse the '$set' update.
    BSONElement setElement;
    Status setStatus = bsonExtractTypedField(update, StringData("$set"), Object, &setElement);
    if (setStatus.isOK()) {
        BSONObj setField = setElement.Obj();
        const NamespaceString updatedNss(updatedCollection);

        // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
        AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX);

        // A refreshed collection version means secondaries can be notified post-commit.
        if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
            opCtx->recoveryUnit()->registerChange(
                new CollectionVersionLogOpHandler(opCtx, updatedNss));
        }

        if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) {
            // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the
            // X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until
            // SERVER-31595 removes the X lock requirement.
            CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown();
        }
    }
}
// Handles deletes from config.cache.collections on shard secondaries: invalidates the cached
// routing metadata for the deleted collection entry once the delete commits.
void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify(
    OperationContext* opCtx, const BSONObj& query) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
    invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);

    // Notification of routing table changes are only needed on secondaries.
    if (isStandaloneOrPrimary(opCtx)) {
        return;
    }

    // Extract which collection entry is being deleted from the _id field.
    std::string deletedCollection;
    fassertStatusOK(
        40479, bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection));
    const NamespaceString deletedNss(deletedCollection);

    // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
    AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX);

    opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss));
}
// Compares the shard version the operation carries (from the OperationShardingState or the
// legacy ShardedConnectionInfo) with this shard's metadata. Returns true if the operation may
// proceed; on false, fills 'errmsg', 'expectedShardVersion' (received) and 'actualShardVersion'
// (wanted) for error reporting.
bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
                                                   std::string* errmsg,
                                                   ChunkVersion* expectedShardVersion,
                                                   ChunkVersion* actualShardVersion) {
    auto* const client = opCtx->getClient();
    auto& oss = OperationShardingState::get(opCtx);

    // If there is a version attached to the OperationContext, use it as the received version.
    // Otherwise, get the received version from the ShardedConnectionInfo.
    if (oss.hasShardVersion()) {
        *expectedShardVersion = oss.getShardVersion(_nss);
    } else {
        ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false);
        if (!info) {
            // There is no shard version information on either 'opCtx' or 'client'. This means that
            // the operation represented by 'opCtx' is unversioned, and the shard version is always
            // OK for unversioned operations.
            return true;
        }

        *expectedShardVersion = info->getVersion(_nss.ns());
    }

    // An operation with read concern 'available' should never have shardVersion set.
    invariant(repl::ReadConcernArgs::get(opCtx).getLevel() !=
              repl::ReadConcernLevel::kAvailableReadConcern);

    // The IGNORED sentinel version bypasses all version checking.
    if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) {
        return true;
    }

    // Set this for error messaging purposes before potentially returning false.
    auto metadata = getMetadata();
    *actualShardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();

    if (_sourceMgr) {
        const bool isReader = !opCtx->lockState()->isWriteLocked();

        auto criticalSectionSignal = _sourceMgr->getMigrationCriticalSectionSignal(isReader);
        if (criticalSectionSignal) {
            *errmsg = str::stream() << "migration commit in progress for " << _nss.ns();

            // Set migration critical section on operation sharding state: operation will wait for
            // the migration to finish before returning failure and retrying.
            oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
            return false;
        }
    }

    if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) {
        return true;
    }

    //
    // Figure out exactly why not compatible, send appropriate error message
    // The versions themselves are returned in the error, so not needed in messages here
    //

    // Check epoch first, to send more meaningful message, since other parameters probably won't
    // match either.
    if (actualShardVersion->epoch() != expectedShardVersion->epoch()) {
        *errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", "
                                << "the collection may have been dropped and recreated";
        return false;
    }

    if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) {
        *errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", "
                                << "the collection may have been dropped";
        return false;
    }

    if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) {
        *errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", "
                                << "but no version set in request";
        return false;
    }

    if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) {
        // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is
        // the target of a migration
        *errmsg = str::stream() << "version mismatch detected for " << _nss.ns();
        return false;
    }

    // Those are all the reasons the versions can mismatch
    MONGO_UNREACHABLE;
}
// Accounts 'dataWritten' bytes against the chunk owning 'document' and returns the chunk's
// accumulated byte count; returns uint64_t(-1) when the collection is unsharded or the
// document carries no shard key. Restores the stripped shared_ptr template argument
// (<ChunkManager>), without which this does not compile.
uint64_t CollectionShardingState::_incrementChunkOnInsertOrUpdate(OperationContext* opCtx,
                                                                  const BSONObj& document,
                                                                  long dataWritten) {
    // Here, get the collection metadata and check if it exists. If it doesn't exist, then the
    // collection is not sharded, and we can simply return -1.
    ScopedCollectionMetadata metadata = getMetadata();
    if (!metadata) {
        return -1;  // NOTE(review): wraps to uint64_t max, per the declared return type
    }

    std::shared_ptr<ChunkManager> cm = metadata->getChunkManager();
    const ShardKeyPattern& shardKeyPattern = cm->getShardKeyPattern();

    // Each inserted/updated document should contain the shard key. The only instance in which a
    // document could not contain a shard key is if the insert/update is performed through mongod
    // explicitly, as opposed to first routed through mongos.
    BSONObj shardKey = shardKeyPattern.extractShardKeyFromDoc(document);
    if (shardKey.woCompare(BSONObj()) == 0) {
        warning() << "inserting document " << document.toString() << " without shard key pattern "
                  << shardKeyPattern << " into a sharded collection";
        return -1;
    }

    // Use the shard key to locate the chunk into which the document was updated, and increment the
    // number of bytes tracked for the chunk. Note that we can assume the simple collation, because
    // shard keys do not support non-simple collations.
    auto chunk = cm->findIntersectingChunkWithSimpleCollation(shardKey);
    chunk->addBytesWritten(dataWritten);

    // If the chunk becomes too large, then we call the ChunkSplitter to schedule a split. Then, we
    // reset the tracking for that chunk to 0.
    if (_shouldSplitChunk(opCtx, shardKeyPattern, *chunk)) {
        // TODO: call ChunkSplitter here
        chunk->clearBytesWritten();
    }

    return chunk->getBytesWritten();
}
// Asks the chunk whether it should split, given the balancer's configured maximum chunk size
// and whether the chunk abuts the global minimum/maximum of the shard key space.
bool CollectionShardingState::_shouldSplitChunk(OperationContext* opCtx,
                                                const ShardKeyPattern& shardKeyPattern,
                                                const Chunk& chunk) {
    const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
    invariant(balancerConfig);

    const KeyPattern keyPattern = shardKeyPattern.getKeyPattern();
    const bool chunkBeginsAtGlobalMin = (keyPattern.globalMin().woCompare(chunk.getMin()) == 0);
    const bool chunkEndsAtGlobalMax = (keyPattern.globalMax().woCompare(chunk.getMax()) == 0);

    return chunk.shouldSplit(
        balancerConfig->getMaxChunkSizeBytes(), chunkBeginsAtGlobalMin, chunkEndsAtGlobalMax);
}
} // namespace mongo