/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/client/remote_command_targeter_factory_impl.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/metadata_loader.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/grid.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/sock.h"
namespace mongo {
using std::shared_ptr;
using std::string;
using std::vector;
namespace {
// Decoration declared on ServiceContext; ShardingState::get(ServiceContext*) returns the address
// of the object obtained by applying this decoration to a service context.
const auto getShardingState = ServiceContext::declareDecoration();
// Result of chooseNewestVersion(): keep the local metadata (Local), take the remotely loaded
// metadata (Remote), or the comparison was inconclusive (Unknown).
enum class VersionChoice { Local, Remote, Unknown };
/**
 * Decides which collection version wins after a remote metadata load.
 *
 * 'prevLocalVersion' is the local version captured before the load started, 'localVersion' is the
 * local version at the time of the comparison and 'remoteVersion' is the version that was loaded
 * remotely.
 *
 * Returns:
 *  - VersionChoice::Local when the local and remote epochs are equal and either the epoch is
 *    unset (zero) or the remote version is not greater than the local one.
 *  - VersionChoice::Remote when the epochs are equal, set, and the remote version is greater, or
 *    when the remote epoch differs from a local epoch that did not change during the load.
 *  - VersionChoice::Unknown when the local epoch changed during the load and also differs from
 *    the remote epoch, so the caller has to try again.
 */
VersionChoice chooseNewestVersion(ChunkVersion prevLocalVersion,
                                  ChunkVersion localVersion,
                                  ChunkVersion remoteVersion) {
    OID epochBeforeLoad = prevLocalVersion.epoch();
    OID epochNow = localVersion.epoch();
    OID epochLoaded = remoteVersion.epoch();

    if (epochNow == epochLoaded) {
        // Same (zero) epoch as the latest metadata, nothing to do
        if (!epochLoaded.isSet()) {
            return VersionChoice::Local;
        }

        // Same (non-zero) epoch as the latest metadata, so prefer whichever version is newer
        return (localVersion < remoteVersion) ? VersionChoice::Remote : VersionChoice::Local;
    }

    // From here on the local and remote epochs differ. If the local epoch also moved while the
    // load was in flight, everything changed underneath us and the caller must retry.
    if (epochBeforeLoad != epochNow) {
        return VersionChoice::Unknown;
    }

    // A new epoch is being installed and the local epoch did not change during the reload
    dassert(epochBeforeLoad == epochNow && epochNow != epochLoaded);
    return VersionChoice::Remote;
}
} // namespace
/**
 * Reports whether this process is a mongos. This definition always answers 'false'.
 */
bool isMongos() {
    return false;
}
/**
 * Constructs the sharding state with sharding disabled. The ticket holder is sized to 3, which
 * is the maximum number of concurrent config server refresh threads.
 */
ShardingState::ShardingState()
    : _enabled(false),
      // max number of concurrent config server refresh threads
      _configServerTickets(3) {}

ShardingState::~ShardingState() = default;
/**
 * Returns the ShardingState obtained from the 'getShardingState' decoration of 'serviceContext'.
 */
ShardingState* ShardingState::get(ServiceContext* serviceContext) {
    auto& decoratedState = getShardingState(serviceContext);
    return &decoratedState;
}

/**
 * Convenience overload: resolves the service context of 'operationContext' and forwards to the
 * ServiceContext overload.
 */
ShardingState* ShardingState::get(OperationContext* operationContext) {
    auto owningServiceContext = operationContext->getServiceContext();
    return ShardingState::get(owningServiceContext);
}
/**
 * Returns the value of the '_enabled' flag, read while holding '_mutex'. The flag is set to true
 * by initialize() once it completes successfully.
 */
bool ShardingState::enabled() {
    stdx::lock_guard scopedLock(_mutex);
    const bool currentlyEnabled = _enabled;
    return currentlyEnabled;
}
/**
 * Returns the string form of the config server connection string held by the grid's shard
 * registry. Sharding must already be enabled (checked with an invariant). 'txn' is not used.
 */
string ShardingState::getConfigServer(OperationContext* txn) {
    stdx::lock_guard scopedLock(_mutex);
    invariant(_enabled);

    auto registry = grid.shardRegistry();
    return registry->getConfigServerConnectionString().toString();
}
/**
 * Returns a copy of the shard name, read while holding '_mutex'. Sharding must already be
 * enabled (checked with an invariant).
 */
string ShardingState::getShardName() {
    stdx::lock_guard scopedLock(_mutex);
    invariant(_enabled);

    string nameCopy = _shardName;
    return nameCopy;
}
/**
 * Enables sharding using 'server' as the config server connection string.
 *
 * Throws (uassert 18509) when the local host name is empty, and throws when 'server' cannot be
 * parsed or when initializeGlobalShardingState() fails. Calling this again after sharding has
 * been enabled is a no-op.
 */
void ShardingState::initialize(const string& server) {
    uassert(18509,
            "Unable to obtain host name during sharding initialization.",
            !getHostName().empty());

    stdx::lock_guard scopedLock(_mutex);

    if (!_enabled) {
        ShardedConnectionInfo::addHook();

        ConnectionString configServerCS = uassertStatusOK(ConnectionString::parse(server));
        uassertStatusOK(initializeGlobalShardingState(configServerCS));

        // Only flip the flag once every step above has succeeded
        _enabled = true;
    }

    // TODO: Do we need to throw exception if the config servers have changed from what we
    // already have in place? How do we test for that?
}
/**
 * Records 'name' as this host's shard name.
 *
 * The first call stores the name and logs it. Later calls with the same name do nothing; a later
 * call with a different name logs a warning and throws ErrorCodes::AlreadyInitialized.
 */
void ShardingState::setShardName(const string& name) {
    // Resolved before taking the mutex; only used for the log messages below
    const string clientAddr = cc().clientAddress(true);

    stdx::lock_guard scopedLock(_mutex);

    if (_shardName.empty()) {
        // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs
        _shardName = name;
        log() << "remote client " << clientAddr << " initialized this host as shard " << name;
        return;
    }

    if (_shardName == name) {
        // Re-initialization with the name we already have
        return;
    }

    const string message = str::stream()
        << "remote client " << clientAddr << " tried to initialize this host as shard " << name
        << ", but shard name was previously initialized as " << _shardName;
    warning() << message;
    uassertStatusOK({ErrorCodes::AlreadyInitialized, message});
}
/**
 * Removes every entry from the collection metadata map while holding '_mutex'.
 */
void ShardingState::clearCollectionMetadata() {
    stdx::lock_guard scopedLock(_mutex);
    _collMetadata.clear();
}
// TODO we shouldn't need three ways for checking the version. Fix this.
/**
 * Returns true when the collection metadata map has an entry for 'ns'.
 */
bool ShardingState::hasVersion(const string& ns) {
    stdx::lock_guard scopedLock(_mutex);
    return _collMetadata.find(ns) != _collMetadata.end();
}
/**
 * Returns the shard version recorded in the metadata for 'ns', or ChunkVersion(0, 0, OID()) when
 * there is no metadata entry for 'ns'.
 */
ChunkVersion ShardingState::getVersion(const string& ns) {
    stdx::lock_guard scopedLock(_mutex);

    const auto found = _collMetadata.find(ns);
    if (found == _collMetadata.end()) {
        return ChunkVersion(0, 0, OID());
    }

    auto metadata = found->second;
    return metadata->getShardVersion();
}
/**
 * Replaces the metadata for 'ns' with a clone produced by cloneMigrate() for the chunk
 * [min, max) at 'version'.
 *
 * The caller must hold the collection lock for 'ns' in MODE_X and metadata for 'ns' must exist.
 * When the current metadata does not have more than one chunk, the version used is
 * (0, 0, <current collection epoch>) instead of 'version'. Throws (uassert 16855) with the
 * clone's error message when cloning fails.
 */
void ShardingState::donateChunk(OperationContext* txn,
                                const string& ns,
                                const BSONObj& min,
                                const BSONObj& max,
                                ChunkVersion version) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard scopedLock(_mutex);

    const auto entry = _collMetadata.find(ns);
    verify(entry != _collMetadata.end());
    auto currentMetadata = entry->second;

    // empty shards should have version 0
    if (!(currentMetadata->getNumChunks() > 1)) {
        version = ChunkVersion(0, 0, currentMetadata->getCollVersion().epoch());
    }

    ChunkType donatedChunk;
    donatedChunk.setMin(min);
    donatedChunk.setMax(max);

    string errMsg;
    shared_ptr cloned(currentMetadata->cloneMigrate(donatedChunk, version, &errMsg));

    // uassert to match old behavior, TODO: report errors w/o throwing
    uassert(16855, errMsg, cloned.get() != nullptr);

    // TODO: a bit dangerous to have two different zero-version states - no-metadata and
    // no-version
    _collMetadata[ns] = cloned;
}
/**
 * Puts 'prevMetadata' back as the metadata for 'ns'.
 *
 * The caller must hold the collection lock for 'ns' in MODE_X and an entry for 'ns' must already
 * exist in the metadata map.
 */
void ShardingState::undoDonateChunk(OperationContext* txn,
                                    const string& ns,
                                    shared_ptr prevMetadata) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));

    stdx::lock_guard scopedLock(_mutex);
    log() << "ShardingState::undoDonateChunk acquired _mutex";

    auto entry = _collMetadata.find(ns);
    verify(entry != _collMetadata.end());
    entry->second = prevMetadata;
}
/**
 * Replaces the metadata for 'ns' with a clone produced by clonePlusPending() for the chunk
 * [min, max).
 *
 * The caller must hold the collection lock for 'ns' in MODE_X. Returns false and fills '*errMsg'
 * when there is no metadata for 'ns' or when the metadata's collection epoch differs from
 * 'epoch'. Also returns false when clonePlusPending() yields no clone; that call is handed
 * 'errMsg' to describe the failure. Returns true once the clone has been installed.
 */
bool ShardingState::notePending(OperationContext* txn,
                                const string& ns,
                                const BSONObj& min,
                                const BSONObj& max,
                                const OID& epoch,
                                string* errMsg) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard scopedLock(_mutex);

    const auto entry = _collMetadata.find(ns);
    if (entry == _collMetadata.end()) {
        *errMsg = str::stream() << "could not note chunk "
                                << "[" << min << "," << max << ")"
                                << " as pending because the local metadata for " << ns
                                << " has changed";
        return false;
    }

    auto currentMetadata = entry->second;

    // This can currently happen because drops aren't synchronized with in-migrations
    // The idea for checking this here is that in the future we shouldn't have this problem
    if (currentMetadata->getCollVersion().epoch() != epoch) {
        *errMsg = str::stream() << "could not note chunk "
                                << "[" << min << "," << max << ")"
                                << " as pending because the epoch for " << ns
                                << " has changed from " << epoch << " to "
                                << currentMetadata->getCollVersion().epoch();
        return false;
    }

    ChunkType pendingChunk;
    pendingChunk.setMin(min);
    pendingChunk.setMax(max);

    shared_ptr cloned(currentMetadata->clonePlusPending(pendingChunk, errMsg));
    if (!cloned) {
        return false;
    }

    _collMetadata[ns] = cloned;
    return true;
}
/**
 * Replaces the metadata for 'ns' with a clone produced by cloneMinusPending() for the chunk
 * [min, max).
 *
 * The caller must hold the collection lock for 'ns' in MODE_X. Returns false and fills '*errMsg'
 * when there is no metadata for 'ns' or when the metadata's collection epoch differs from
 * 'epoch'. Also returns false when cloneMinusPending() yields no clone; that call is handed
 * 'errMsg' to describe the failure. Returns true once the clone has been installed.
 */
bool ShardingState::forgetPending(OperationContext* txn,
                                  const string& ns,
                                  const BSONObj& min,
                                  const BSONObj& max,
                                  const OID& epoch,
                                  string* errMsg) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard scopedLock(_mutex);

    const auto entry = _collMetadata.find(ns);
    if (entry == _collMetadata.end()) {
        *errMsg = str::stream() << "no need to forget pending chunk "
                                << "[" << min << "," << max << ")"
                                << " because the local metadata for " << ns << " has changed";
        return false;
    }

    auto currentMetadata = entry->second;

    // This can currently happen because drops aren't synchronized with in-migrations
    // The idea for checking this here is that in the future we shouldn't have this problem
    if (currentMetadata->getCollVersion().epoch() != epoch) {
        *errMsg = str::stream() << "no need to forget pending chunk "
                                << "[" << min << "," << max << ")"
                                << " because the epoch for " << ns << " has changed from " << epoch
                                << " to " << currentMetadata->getCollVersion().epoch();
        return false;
    }

    ChunkType pendingChunk;
    pendingChunk.setMin(min);
    pendingChunk.setMax(max);

    shared_ptr cloned(currentMetadata->cloneMinusPending(pendingChunk, errMsg));
    if (!cloned) {
        return false;
    }

    _collMetadata[ns] = cloned;
    return true;
}
/**
 * Replaces the metadata for 'ns' with a clone produced by cloneSplit() for the chunk [min, max),
 * the given 'splitKeys' and 'version'.
 *
 * The caller must hold the collection lock for 'ns' in MODE_X and metadata for 'ns' must exist.
 * Throws (uassert 16857) with the clone's error message when cloning fails.
 */
void ShardingState::splitChunk(OperationContext* txn,
                               const string& ns,
                               const BSONObj& min,
                               const BSONObj& max,
                               const vector& splitKeys,
                               ChunkVersion version) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard scopedLock(_mutex);

    const auto entry = _collMetadata.find(ns);
    verify(entry != _collMetadata.end());

    ChunkType chunkToSplit;
    chunkToSplit.setMin(min);
    chunkToSplit.setMax(max);

    string errMsg;
    shared_ptr cloned(entry->second->cloneSplit(chunkToSplit, splitKeys, version, &errMsg));

    // uassert to match old behavior, TODO: report errors w/o throwing
    uassert(16857, errMsg, cloned.get() != nullptr);

    _collMetadata[ns] = cloned;
}
/**
 * Replaces the metadata for 'ns' with a clone produced by cloneMerge() for the range
 * [minKey, maxKey) at 'mergedVersion'.
 *
 * The caller must hold the collection lock for 'ns' in MODE_X and metadata for 'ns' must exist.
 * Throws (uassert 17004) with the clone's error message when cloning fails.
 */
void ShardingState::mergeChunks(OperationContext* txn,
                                const string& ns,
                                const BSONObj& minKey,
                                const BSONObj& maxKey,
                                ChunkVersion mergedVersion) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard scopedLock(_mutex);

    const auto entry = _collMetadata.find(ns);
    verify(entry != _collMetadata.end());

    string errMsg;
    shared_ptr cloned(entry->second->cloneMerge(minKey, maxKey, mergedVersion, &errMsg));

    // uassert to match old behavior, TODO: report errors w/o throwing
    uassert(17004, errMsg, cloned.get() != nullptr);

    _collMetadata[ns] = cloned;
}
/**
 * Erases the metadata entry for 'ns', logging a warning that this is meant for testing only.
 */
void ShardingState::resetMetadata(const string& ns) {
    stdx::lock_guard scopedLock(_mutex);

    warning() << "resetting metadata for " << ns << ", this should only be used in testing";

    _collMetadata.erase(ns);
}
/**
 * Refreshes the metadata for 'ns' from the config server unless the stored metadata already
 * satisfies 'reqShardVersion'.
 *
 * A ticket from '_configServerTickets' is taken first, so only a small number of threads run
 * this at the same time. '*latestShardVersion' is set to the stored shard version; when that
 * version has the same epoch as 'reqShardVersion' and is >= it, Status::OK() is returned without
 * a remote reload. Otherwise the work is delegated to doRefreshMetadata(), whose status is
 * returned.
 */
Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
                                              const string& ns,
                                              const ChunkVersion& reqShardVersion,
                                              ChunkVersion* latestShardVersion) {
    LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion;

    //
    // Queuing of refresh requests starts here when remote reload is needed. This may take time.
    // TODO: Explicitly expose the queuing discipline.
    //
    _configServerTickets.waitForTicket();
    TicketHolderReleaser ticketReleaser(&_configServerTickets);

    //
    // Fast path - compare the requested version against what is currently stored before going
    // to the config server.
    //
    shared_ptr storedMetadata;
    {
        stdx::lock_guard scopedLock(_mutex);
        auto entry = _collMetadata.find(ns);
        if (entry != _collMetadata.end()) {
            storedMetadata = entry->second;
        }
    }

    ChunkVersion storedShardVersion;
    if (storedMetadata) {
        storedShardVersion = storedMetadata->getShardVersion();
    }
    *latestShardVersion = storedShardVersion;

    const bool upToDate = storedShardVersion >= reqShardVersion &&
        storedShardVersion.epoch() == reqShardVersion.epoch();
    if (upToDate) {
        // Don't need to remotely reload if we're in the same epoch with a >= version
        return Status::OK();
    }

    //
    // Slow path - remotely reload
    //
    // Cases:
    // A) Initial config load and/or secondary take-over.
    // B) Migration TO this shard finished, notified by mongos.
    // C) Dropping a collection, notified (currently) by mongos.
    // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.
    const bool epochsMatch = storedShardVersion.epoch() == reqShardVersion.epoch();
    if (epochsMatch) {
        // Same epoch, but the stored version is not >= the requested one, so reload to catch up
        LOG(1) << "metadata version update requested for " << ns << ", from shard version "
               << storedShardVersion << " to " << reqShardVersion
               << ", need to verify with config server";
    } else {
        // Need to remotely reload if our epochs aren't the same, to verify
        LOG(1) << "metadata change requested for " << ns << ", from shard version "
               << storedShardVersion << " to " << reqShardVersion
               << ", need to verify with config server";
    }

    return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
}
/**
 * Refreshes the metadata for 'ns' through doRefreshMetadata() without a requested shard version:
 * a zero version with an unset OID is passed and 'useRequestedVersion' is false. The newest
 * shard version known afterwards is written to '*latestShardVersion'.
 */
Status ShardingState::refreshMetadataNow(OperationContext* txn,
                                         const string& ns,
                                         ChunkVersion* latestShardVersion) {
    const ChunkVersion noRequestedVersion(0, 0, OID());
    const bool useRequestedVersion = false;

    return doRefreshMetadata(
        txn, ns, noRequestedVersion, useRequestedVersion, latestShardVersion);
}
/**
 * Loads the metadata for 'ns' remotely and installs it locally when it is newer than what is
 * stored.
 *
 * The remote load runs without holding '_mutex'; afterwards the stored metadata is re-read under
 * the collection X lock and '_mutex', and chooseNewestVersion() decides between the version seen
 * before the load, the version seen after it and the remotely loaded one.
 *
 * 'reqShardVersion' is only consulted when 'useRequestedVersion' is true: it is logged, and an
 * epoch that differs from the stored one forces a full reload instead of a diff against the
 * stored metadata.
 *
 * '*latestShardVersion' is set to the stored shard version before the load, then to the stored
 * shard version after the load, and finally to the remote shard version if the remote metadata
 * was chosen.
 *
 * Returns:
 *  - ErrorCodes::NotYetInitialized when sharding is not enabled or no shard name has been set
 *  - the loader's status when the remote load fails for a reason other than NamespaceNotFound
 *  - the status of promotePendingChunks() when it fails
 *  - ErrorCodes::RemoteChangeDetected when the comparison is inconclusive and a retry is needed
 *  - Status::OK() otherwise
 */
Status ShardingState::doRefreshMetadata(OperationContext* txn,
                                        const string& ns,
                                        const ChunkVersion& reqShardVersion,
                                        bool useRequestedVersion,
                                        ChunkVersion* latestShardVersion) {
    // The idea here is that we're going to reload the metadata from the config server, but
    // we need to do so outside any locks. When we get our result back, if the current metadata
    // has changed, we may not be able to install the new metadata.

    //
    // Get the initial metadata
    // No DBLock is needed since the metadata is expected to change during reload.
    //
    shared_ptr beforeMetadata;
    {
        stdx::lock_guard lk(_mutex);

        // We can't reload if sharding is not enabled - i.e. without a config server location
        if (!_enabled) {
            string errMsg = str::stream() << "cannot refresh metadata for " << ns
                                          << " before sharding has been enabled";
            warning() << errMsg;
            return Status(ErrorCodes::NotYetInitialized, errMsg);
        }

        // We also can't reload if a shard name has not yet been set.
        if (_shardName.empty()) {
            string errMsg = str::stream() << "cannot refresh metadata for " << ns
                                          << " before shard name has been set";
            warning() << errMsg;
            return Status(ErrorCodes::NotYetInitialized, errMsg);
        }

        CollectionMetadataMap::iterator it = _collMetadata.find(ns);
        if (it != _collMetadata.end()) {
            beforeMetadata = it->second;
        }
    }

    ChunkVersion beforeShardVersion;
    ChunkVersion beforeCollVersion;
    if (beforeMetadata) {
        beforeShardVersion = beforeMetadata->getShardVersion();
        beforeCollVersion = beforeMetadata->getCollVersion();
    }
    *latestShardVersion = beforeShardVersion;

    //
    // Determine whether we need to diff or fully reload
    //
    bool fullReload = false;
    if (!beforeMetadata) {
        // We don't have any metadata to reload from
        fullReload = true;
    } else if (useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch()) {
        // It's not useful to use the metadata as a base because we think the epoch will differ
        fullReload = true;
    }

    //
    // Load the metadata from the remote server, start construction
    //
    LOG(0) << "remotely refreshing metadata for " << ns
           << (useRequestedVersion
                   ? string(" with requested shard version ") + reqShardVersion.toString()
                   : "")
           << (fullReload ? ", current shard version is " : " based on current shard version ")
           << beforeShardVersion << ", current metadata version is " << beforeCollVersion;

    // NOTE: the function-scope 'errMsg' that used to be declared here was never read; every
    // message below is built in a local declared at its point of use.
    MetadataLoader mdLoader;
    shared_ptr remoteMetadata(std::make_shared());

    Timer refreshTimer;
    long long refreshMillis;
    {
        auto catalogManagerGuard = grid.catalogManager(txn);
        Status status = mdLoader.makeCollectionMetadata(catalogManagerGuard.get(),
                                                        ns,
                                                        getShardName(),
                                                        fullReload ? nullptr : beforeMetadata.get(),
                                                        remoteMetadata.get());
        refreshMillis = refreshTimer.millis();

        if (status.code() == ErrorCodes::NamespaceNotFound) {
            // No remote metadata for this namespace; continue with an empty pointer
            remoteMetadata.reset();
        } else if (!status.isOK()) {
            warning() << "could not remotely refresh metadata for " << ns
                      << causedBy(status.reason());
            return status;
        }
    }

    ChunkVersion remoteShardVersion;
    ChunkVersion remoteCollVersion;
    if (remoteMetadata) {
        remoteShardVersion = remoteMetadata->getShardVersion();
        remoteCollVersion = remoteMetadata->getCollVersion();
    }

    //
    // Get ready to install loaded metadata if needed
    //
    shared_ptr afterMetadata;
    ChunkVersion afterShardVersion;
    ChunkVersion afterCollVersion;
    VersionChoice choice;

    // If we choose to install the new metadata, this describes the kind of install
    enum InstallType {
        InstallType_New,
        InstallType_Update,
        InstallType_Replace,
        InstallType_Drop,
        InstallType_None
    } installType = InstallType_None;  // compiler complains otherwise

    {
        // Exclusive collection lock needed since we're now potentially changing the metadata,
        // and don't want reads/writes to be ongoing.
        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
        Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);

        //
        // Get the metadata now that the load has completed
        //
        stdx::lock_guard lk(_mutex);

        // Don't install anything if sharding is no longer enabled
        if (!_enabled) {
            string errMsg = str::stream() << "could not refresh metadata for " << ns
                                          << ", sharding is no longer enabled";
            warning() << errMsg;
            return Status(ErrorCodes::NotYetInitialized, errMsg);
        }

        CollectionMetadataMap::iterator it = _collMetadata.find(ns);
        if (it != _collMetadata.end())
            afterMetadata = it->second;

        if (afterMetadata) {
            afterShardVersion = afterMetadata->getShardVersion();
            afterCollVersion = afterMetadata->getCollVersion();
        }
        *latestShardVersion = afterShardVersion;

        //
        // Resolve newer pending chunks with the remote metadata, finish construction
        //
        Status status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadata.get());
        if (!status.isOK()) {
            warning() << "remote metadata for " << ns
                      << " is inconsistent with current pending chunks"
                      << causedBy(status.reason());
            return status;
        }

        //
        // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest
        // Zero-epochs (sentinel value for "dropped" collections), are tested by
        // !epoch.isSet().
        //
        choice = chooseNewestVersion(beforeCollVersion, afterCollVersion, remoteCollVersion);

        if (choice == VersionChoice::Remote) {
            dassert(!remoteCollVersion.epoch().isSet() || remoteShardVersion >= beforeShardVersion);

            if (!afterCollVersion.epoch().isSet()) {
                // First metadata load
                installType = InstallType_New;
                dassert(it == _collMetadata.end());
                _collMetadata.insert(make_pair(ns, remoteMetadata));
            } else if (remoteCollVersion.epoch().isSet() &&
                       remoteCollVersion.epoch() == afterCollVersion.epoch()) {
                // Update to existing metadata
                installType = InstallType_Update;

                // Invariant: If CollMetadata was not found, version should have been 0.
                dassert(it != _collMetadata.end());
                it->second = remoteMetadata;
            } else if (remoteCollVersion.epoch().isSet()) {
                // New epoch detected, replacing metadata
                installType = InstallType_Replace;

                // Invariant: If CollMetadata was not found, version should have been 0.
                dassert(it != _collMetadata.end());
                it->second = remoteMetadata;
            } else {
                dassert(!remoteCollVersion.epoch().isSet());

                // Drop detected
                installType = InstallType_Drop;
                _collMetadata.erase(it);
            }

            *latestShardVersion = remoteShardVersion;
        }
    }
    // End _mutex
    // End DBWrite

    //
    // Do messaging based on what happened above
    //
    string localShardVersionMsg = beforeShardVersion.epoch() == afterShardVersion.epoch()
        ? afterShardVersion.toString()
        : beforeShardVersion.toString() + " / " + afterShardVersion.toString();

    if (choice == VersionChoice::Unknown) {
        string errMsg = str::stream()
            << "need to retry loading metadata for " << ns
            << ", collection may have been dropped or recreated during load"
            << " (loaded shard version : " << remoteShardVersion.toString()
            << ", stored shard versions : " << localShardVersionMsg << ", took " << refreshMillis
            << "ms)";
        warning() << errMsg;
        return Status(ErrorCodes::RemoteChangeDetected, errMsg);
    }

    if (choice == VersionChoice::Local) {
        LOG(0) << "metadata of collection " << ns
               << " already up to date (shard version : " << afterShardVersion.toString()
               << ", took " << refreshMillis << "ms)";
        return Status::OK();
    }

    dassert(choice == VersionChoice::Remote);

    switch (installType) {
        case InstallType_New:
            LOG(0) << "collection " << ns << " was previously unsharded"
                   << ", new metadata loaded with shard version " << remoteShardVersion;
            break;
        case InstallType_Update:
            LOG(0) << "updating metadata for " << ns << " from shard version "
                   << localShardVersionMsg << " to shard version " << remoteShardVersion;
            break;
        case InstallType_Replace:
            LOG(0) << "replacing metadata for " << ns << " at shard version "
                   << localShardVersionMsg << " with a new epoch (shard version "
                   << remoteShardVersion << ")";
            break;
        case InstallType_Drop:
            LOG(0) << "dropping metadata for " << ns << " at shard version " << localShardVersionMsg
                   << ", took " << refreshMillis << "ms";
            break;
        default:
            // InstallType_None cannot be reached once the remote metadata was chosen
            verify(false);
            break;
    }

    if (installType != InstallType_Drop) {
        LOG(0) << "collection version was loaded at version " << remoteCollVersion << ", took "
               << refreshMillis << "ms";
    }

    return Status::OK();
}
/**
 * Appends a description of the sharding state to 'builder'.
 *
 * Always writes the boolean "enabled". When sharding is enabled it also writes "configServer",
 * "shardName" and a "versions" sub-object with one timestamp entry per collection in the
 * metadata map, built from that collection's shard version. 'txn' is not used.
 */
void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) {
    stdx::lock_guard scopedLock(_mutex);

    builder.appendBool("enabled", _enabled);
    if (!_enabled) {
        return;
    }

    builder.append("configServer",
                   grid.shardRegistry()->getConfigServerConnectionString().toString());
    builder.append("shardName", _shardName);

    BSONObjBuilder versionB(builder.subobjStart("versions"));
    for (const auto& nsAndMetadata : _collMetadata) {
        const auto& metadata = nsAndMetadata.second;
        versionB.appendTimestamp(nsAndMetadata.first, metadata->getShardVersion().toLong());
    }
    versionB.done();
}
/**
 * Returns true only when sharding is enabled and ShardedConnectionInfo::get(client, false)
 * yields a non-null result. 'ns' is not consulted.
 *
 * NOTE(review): '_enabled' is read here without taking '_mutex', unlike the other accessors in
 * this file - confirm that this is intentional.
 */
bool ShardingState::needCollectionMetadata(Client* client, const string& ns) const {
    return _enabled && ShardedConnectionInfo::get(client, false);
}
/**
 * Returns the metadata stored for 'ns', or an empty pointer when there is no entry for 'ns'.
 */
shared_ptr ShardingState::getCollectionMetadata(const string& ns) {
    stdx::lock_guard scopedLock(_mutex);

    const auto found = _collMetadata.find(ns);
    if (found != _collMetadata.end()) {
        return found->second;
    }

    return nullptr;
}
} // namespace mongo