/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
namespace {
/**
* Used to perform shard identity initialization once it is certain that the document is committed.
*/
class ShardIdentityLopOpHandler final : public RecoveryUnit::Change {
public:
ShardIdentityLopOpHandler(OperationContext* txn, ShardIdentityType shardIdentity)
: _txn(txn), _shardIdentity(std::move(shardIdentity)) {}
void commit() override {
fassertNoTrace(
40071,
ShardingState::get(_txn)->initializeFromShardIdentity(_shardIdentity, Date_t::max()));
}
void rollback() override {}
private:
OperationContext* _txn;
const ShardIdentityType _shardIdentity;
};
} // unnamed namespace
using std::string;
/**
 * Creates the sharding state for collection 'nss', taking ownership of 'initialMetadata' and
 * storing it as the collection's current metadata.
 *
 * The smart pointer parameter must spell out its element type: class template arguments cannot
 * be deduced in a function parameter declaration.
 * NOTE(review): element type taken to be CollectionMetadata (from collection_metadata.h) -
 * confirm against the declaration in collection_sharding_state.h.
 */
CollectionShardingState::CollectionShardingState(
    NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata)
    : _nss(std::move(nss)), _metadata(std::move(initialMetadata)) {}
/**
 * The sharding state must not be destroyed while a migration source manager is still registered
 * on it; this is enforced with an invariant.
 */
CollectionShardingState::~CollectionShardingState() {
    invariant(!_sourceMgr);
}
/**
 * Overload taking a NamespaceString: forwards to the string-based lookup using the namespace's
 * full string form.
 */
CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
                                                      const NamespaceString& nss) {
    const std::string& fullNs = nss.ns();
    return CollectionShardingState::get(txn, fullNs);
}
/**
 * Looks up the sharding state for namespace 'ns' through the ShardingState associated with
 * 'txn'.
 */
CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
                                                      const std::string& ns) {
    // A reference to a collection's sharding state may only be obtained while holding (at least)
    // an intent-shared lock on that collection. Checked in debug builds only.
    dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IS));

    return ShardingState::get(txn)->getNS(ns);
}
void CollectionShardingState::setMetadata(std::shared_ptr newMetadata) {
if (newMetadata) {
invariant(!newMetadata->getCollVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
invariant(!newMetadata->getShardVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
}
_metadata = std::move(newMetadata);
}
/**
 * Returns the migration source manager currently registered on this collection, or null when
 * none is registered.
 */
MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() {
    return _sourceMgr;
}
/**
 * Registers 'sourceMgr' as this collection's migration source manager.
 *
 * Preconditions, each enforced with an invariant: the collection is locked in MODE_X,
 * 'sourceMgr' is non-null, and no source manager is registered yet.
 */
void CollectionShardingState::setMigrationSourceManager(OperationContext* txn,
                                                        MigrationSourceManager* sourceMgr) {
    invariant(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
    invariant(sourceMgr);
    invariant(!_sourceMgr);

    _sourceMgr = sourceMgr;
}
/**
 * Unregisters the current migration source manager.
 *
 * Preconditions, each enforced with an invariant: the collection is locked in MODE_X and a
 * source manager is currently registered.
 */
void CollectionShardingState::clearMigrationSourceManager(OperationContext* txn) {
    invariant(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
    invariant(_sourceMgr);

    _sourceMgr = nullptr;
}
/**
 * Verifies the shard version of the operation represented by 'txn' against this collection.
 *
 * Throws SendStaleConfigException, carrying the namespace, a description of the mismatch and
 * both the received and the wanted versions, when the check does not pass.
 */
void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) const {
    std::string reason;
    ChunkVersion received;
    ChunkVersion wanted;

    const bool versionOk = _checkShardVersionOk(txn, &reason, &received, &wanted);
    if (versionOk) {
        return;
    }

    throw SendStaleConfigException(
        _nss.ns(),
        str::stream() << "[" << _nss.ns() << "] shard version not ok: " << reason,
        received,
        wanted);
}
/**
 * Asks the cloner of the registered migration source manager whether 'doc' belongs to the chunk
 * being migrated. Returns false when no migration source manager is registered.
 */
bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn,
                                                         const BSONObj& doc) {
    // Debug-build check that the caller holds at least an intent-exclusive collection lock.
    dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    if (!_sourceMgr) {
        return false;
    }

    return _sourceMgr->getCloner()->isDocumentInMigratingChunk(txn, doc);
}
/**
 * Sharding hook for a document insert into this collection.
 *
 * On a shard server, inserting the shardIdentity document into the config collection namespace
 * registers a recovery unit change which initializes the sharding state once the write commits.
 * A shardIdentity document that fails to parse makes uassertStatusOK throw.
 *
 * Afterwards the operation's shard version is checked (which may throw) and the insert is
 * forwarded to the cloner of the registered migration source manager, if there is one.
 */
void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) {
    dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    const bool isShardServerConfigColl =
        serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
        _nss == NamespaceString::kConfigCollectionNamespace;

    if (isShardServerConfigColl) {
        auto idElem = insertedDoc["_id"];
        if (idElem && idElem.str() == ShardIdentityType::IdName) {
            auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc));
            txn->recoveryUnit()->registerChange(
                new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc)));
        }
    }

    checkShardVersionOrThrow(txn);

    if (!_sourceMgr) {
        return;
    }

    _sourceMgr->getCloner()->onInsertOp(txn, insertedDoc);
}
/**
 * Sharding hook for a document update in this collection: checks the operation's shard version
 * (which may throw) and then forwards the update to the cloner of the registered migration
 * source manager, if there is one.
 */
void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) {
    dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    checkShardVersionOrThrow(txn);

    if (!_sourceMgr) {
        return;
    }

    _sourceMgr->getCloner()->onUpdateOp(txn, updatedDoc);
}
/**
 * Sharding hook for a document delete in this collection.
 *
 * For replicated writes on a shard server, deleting the shardIdentity document from the config
 * collection namespace is rejected with user assertion 40070.
 *
 * Afterwards the operation's shard version is checked (which may throw) and the delete is
 * forwarded to the cloner of the registered migration source manager, if there is one.
 */
void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) {
    dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));

    const bool protectsShardIdentity = txn->writesAreReplicated() &&
        serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
        _nss == NamespaceString::kConfigCollectionNamespace;

    if (protectsShardIdentity) {
        if (auto idElem = deletedDocId["_id"]) {
            uassert(40070,
                    "cannot delete shardIdentity document while in --shardsvr mode",
                    idElem.str() != ShardIdentityType::IdName);
        }
    }

    checkShardVersionOrThrow(txn);

    if (!_sourceMgr) {
        return;
    }

    _sourceMgr->getCloner()->onDeleteOp(txn, deletedDocId);
}
/**
* Decides whether the shard version sent with the operation (or stored on its connection) is
* write-compatible with the version this shard currently holds for the collection.
*
* Returns true when the operation is unversioned, carries the "ignored" version, or is
* write-compatible. Returns false otherwise, with 'errmsg' describing the reason.
*
* Note on the out-parameter names: 'expectedShardVersion' is filled with the version received
* from the client, while 'actualShardVersion' is filled with this shard's own version for the
* collection (UNSHARDED when there is no metadata). Several of the early 'return true' paths
* leave one or both out-parameters untouched.
*
* Side effect: when a migration critical section is active, it is also recorded on the
* operation's OperationShardingState before returning false.
*/
bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn,
string* errmsg,
ChunkVersion* expectedShardVersion,
ChunkVersion* actualShardVersion) const {
Client* client = txn->getClient();
// Operations using the DBDirectClient are unversioned.
if (client->isInDirectClient()) {
return true;
}
if (!repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase(_nss.db())) {
// Right now connections to secondaries aren't versioned at all.
return true;
}
const auto& oss = OperationShardingState::get(txn);
// If there is a version attached to the OperationContext, use it as the received version.
// Otherwise, get the received version from the ShardedConnectionInfo.
if (oss.hasShardVersion()) {
*expectedShardVersion = oss.getShardVersion(_nss);
} else {
ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false);
if (!info) {
// There is no shard version information on either 'txn' or 'client'. This means that
// the operation represented by 'txn' is unversioned, and the shard version is always OK
// for unversioned operations.
return true;
}
*expectedShardVersion = info->getVersion(_nss.ns());
}
// A received version marked as "ignored" bypasses the version check entirely.
if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) {
return true;
}
// Set this for error messaging purposes before potentially returning false.
*actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED());
// An active migration critical section fails the check regardless of the versions.
if (_sourceMgr && _sourceMgr->getMigrationCriticalSection()) {
*errmsg = str::stream() << "migration commit in progress for " << _nss.ns();
// Set migration critical section on operation sharding state: operation will wait for the
// migration to finish before returning failure and retrying.
OperationShardingState::get(txn).setMigrationCriticalSection(
_sourceMgr->getMigrationCriticalSection());
return false;
}
if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) {
return true;
}
//
// Figure out exactly why not compatible, send appropriate error message
// The versions themselves are returned in the error, so not needed in messages here
//
// Check epoch first, to send more meaningful message, since other parameters probably won't
// match either.
if (actualShardVersion->epoch() != expectedShardVersion->epoch()) {
*errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", "
<< "the collection may have been dropped and recreated";
return false;
}
// This shard has no version set for the collection, but the request carries one.
if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) {
*errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", "
<< "the collection may have been dropped";
return false;
}
// This shard has a version set for the collection, but the request carries none.
if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) {
*errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", "
<< "but no version set in request";
return false;
}
if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) {
// Could be > or < - wanted is > if this is the source of a migration, wanted < if this is
// the target of a migration
*errmsg = str::stream() << "version mismatch detected for " << _nss.ns();
return false;
}
// Those are all the reasons the versions can mismatch
MONGO_UNREACHABLE;
}
} // namespace mongo