Diffstat (limited to 'src/mongo/s/d_state.cpp')
-rw-r--r-- | src/mongo/s/d_state.cpp | 2145
1 file changed, 1049 insertions, 1096 deletions
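The change below appears to be a mechanical clang-format pass: the same logic re-indented from the old four-space, namespace-indented style to the new flush-left style, with no behavioral change visible in the hunks. The most intricate logic being re-flowed is ShardingState::refreshMetadataIfNeeded, whose fast path skips a remote reload of collection metadata whenever the cached shard version is in the same epoch as the requested version and at least as new. What follows is a minimal standalone sketch of just that decision rule; this ChunkVersion is a simplified, hypothetical stand-in for MongoDB's real class (which uses an OID for the epoch), and needsRemoteReload is an illustrative name, not a function in the tree.

#include <cstdint>
#include <iostream>

// Simplified stand-in for mongo::ChunkVersion: a (major, minor) pair plus an
// epoch identifying the collection incarnation (the real class uses an OID).
struct ChunkVersion {
    uint32_t majorVer;
    uint32_t minorVer;
    uint64_t epoch;

    bool operator>=(const ChunkVersion& rhs) const {
        return majorVer != rhs.majorVer ? majorVer > rhs.majorVer : minorVer >= rhs.minorVer;
    }
};

// Mirrors the fast path in ShardingState::refreshMetadataIfNeeded below: the
// remote reload is skipped when the cached (stored) version is in the same
// epoch as the request and at a greater-or-equal version.
bool needsRemoteReload(const ChunkVersion& stored, const ChunkVersion& requested) {
    if (stored >= requested && stored.epoch == requested.epoch) {
        return false;  // cached metadata already satisfies the request
    }
    return true;  // newer version requested, or epoch changed (e.g. drop/recreate)
}

int main() {
    const ChunkVersion stored{2, 0, 42};
    std::cout << needsRemoteReload(stored, {1, 5, 42}) << '\n';  // 0: fast path
    std::cout << needsRemoteReload(stored, {2, 1, 42}) << '\n';  // 1: newer version requested
    std::cout << needsRemoteReload(stored, {2, 0, 99}) << '\n';  // 1: epoch changed
}

The epoch check is what makes the comparison meaningful: shard versions restart when a collection is dropped and recreated, so a version from a different epoch cannot be ordered against the cached one, which is why the slow path in doRefreshMetadata always goes back to the config server when the epochs differ.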
diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index e7341e65093..3c7739bc804 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -72,1390 +72,1343 @@ namespace mongo { - using boost::optional; - using std::endl; - using std::string; - using std::stringstream; - using std::vector; +using boost::optional; +using std::endl; +using std::string; +using std::stringstream; +using std::vector; + +namespace { +const auto clientSCI = Client::declareDecoration<optional<ShardedConnectionInfo>>(); +} // namespace + +bool isMongos() { + return false; +} - namespace { - const auto clientSCI = Client::declareDecoration<optional<ShardedConnectionInfo>>(); - } // namespace - bool isMongos() { - return false; - } +// -----ShardingState START ---- +ShardingState::ShardingState() + : _enabled(false), + _configServerTickets(3 /* max number of concurrent config server refresh threads */) {} - // -----ShardingState START ---- +bool ShardingState::enabled() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _enabled; +} - ShardingState::ShardingState() - : _enabled(false), - _configServerTickets( 3 /* max number of concurrent config server refresh threads */ ) { - } +string ShardingState::getConfigServer() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_enabled); - bool ShardingState::enabled() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _enabled; - } + return grid.catalogManager()->connectionString().toString(); +} - string ShardingState::getConfigServer() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_enabled); +void ShardingState::initialize(const string& server) { + uassert(18509, + "Unable to obtain host name during sharding initialization.", + !getHostName().empty()); - return grid.catalogManager()->connectionString().toString(); - } + shardingState._initialize(server); +} - void ShardingState::initialize(const string& server) { - uassert(18509, - "Unable to obtain host name during sharding initialization.", - !getHostName().empty()); +// TODO: Consolidate and eliminate these various ways of setting / validating shard names +bool ShardingState::setShardName(const string& name) { + return setShardNameAndHost(name, ""); +} - shardingState._initialize(server); - } +std::string ShardingState::getShardName() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _shardName; +} - // TODO: Consolidate and eliminate these various ways of setting / validating shard names - bool ShardingState::setShardName( const string& name ) { - return setShardNameAndHost( name, "" ); - } +bool ShardingState::setShardNameAndHost(const string& name, const string& host) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_shardName.size() == 0) { + // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs + _shardName = name; - std::string ShardingState::getShardName() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _shardName; - } + string clientAddr = cc().clientAddress(true); - bool ShardingState::setShardNameAndHost( const string& name, const string& host ) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if ( _shardName.size() == 0 ) { - // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs - _shardName = name; + log() << "remote client " << clientAddr << " initialized this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name; - string clientAddr = cc().clientAddress(true); + return true; + } - log() << "remote client " << clientAddr << " initialized this host " - << ( host.empty() ? 
string( "" ) : string( "(" ) + host + ") " ) - << "as shard " << name; + if (_shardName == name) + return true; - return true; - } + string clientAddr = cc().clientAddress(true); - if ( _shardName == name ) - return true; + warning() << "remote client " << clientAddr << " tried to initialize this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name + << ", but shard name was previously initialized as " << _shardName; - string clientAddr = cc().clientAddress(true); + return false; +} - warning() << "remote client " << clientAddr << " tried to initialize this host " - << ( host.empty() ? string( "" ) : string( "(" ) + host + ") " ) - << "as shard " << name - << ", but shard name was previously initialized as " << _shardName; +void ShardingState::gotShardName(const string& name) { + gotShardNameAndHost(name, ""); +} - return false; - } +void ShardingState::gotShardNameAndHost(const string& name, const string& host) { + if (setShardNameAndHost(name, host)) + return; - void ShardingState::gotShardName( const string& name ) { - gotShardNameAndHost( name, "" ); - } + string clientAddr = cc().clientAddress(true); + stringstream ss; - void ShardingState::gotShardNameAndHost( const string& name, const string& host ) { - if ( setShardNameAndHost( name, host ) ) - return; + // Same error as above, to match for reporting + ss << "remote client " << clientAddr << " tried to initialize this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name + << ", but shard name was previously initialized as " << _shardName; - string clientAddr = cc().clientAddress(true); - stringstream ss; + msgasserted(13298, ss.str()); +} - // Same error as above, to match for reporting - ss << "remote client " << clientAddr << " tried to initialize this host " - << ( host.empty() ? string( "" ) : string( "(" ) + host + ") " ) - << "as shard " << name - << ", but shard name was previously initialized as " << _shardName; +void ShardingState::clearCollectionMetadata() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _collMetadata.clear(); +} - msgasserted( 13298 , ss.str() ); - } +// TODO we shouldn't need three ways for checking the version. Fix this. +bool ShardingState::hasVersion(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - void ShardingState::clearCollectionMetadata() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _collMetadata.clear(); - } + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + return it != _collMetadata.end(); +} - // TODO we shouldn't need three ways for checking the version. Fix this. 
- bool ShardingState::hasVersion( const string& ns ) { - stdx::lock_guard<stdx::mutex> lk(_mutex); +bool ShardingState::hasVersion(const string& ns, ChunkVersion& version) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - return it != _collMetadata.end(); - } + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) + return false; - bool ShardingState::hasVersion( const string& ns , ChunkVersion& version ) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionMetadataPtr p = it->second; + version = p->getShardVersion(); + return true; +} - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - if ( it == _collMetadata.end() ) - return false; +ChunkVersion ShardingState::getVersion(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) { CollectionMetadataPtr p = it->second; - version = p->getShardVersion(); - return true; + return p->getShardVersion(); + } else { + return ChunkVersion(0, 0, OID()); } +} - ChunkVersion ShardingState::getVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - if ( it != _collMetadata.end() ) { - CollectionMetadataPtr p = it->second; - return p->getShardVersion(); - } - else { - return ChunkVersion( 0, 0, OID() ); - } - } +void ShardingState::donateChunk(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + ChunkVersion version) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); + CollectionMetadataPtr p = it->second; + + // empty shards should have version 0 + version = (p->getNumChunks() > 1) ? version : ChunkVersion(0, 0, p->getCollVersion().epoch()); + + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); + string errMsg; + + CollectionMetadataPtr cloned(p->cloneMigrate(chunk, version, &errMsg)); + // uassert to match old behavior, TODO: report errors w/o throwing + uassert(16855, errMsg, NULL != cloned.get()); + + // TODO: a bit dangerous to have two different zero-version states - no-metadata and + // no-version + _collMetadata[ns] = cloned; +} - void ShardingState::donateChunk(OperationContext* txn, +void ShardingState::undoDonateChunk(OperationContext* txn, const string& ns, - const BSONObj& min, - const BSONObj& max, - ChunkVersion version) { + CollectionMetadataPtr prevMetadata) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - verify( it != _collMetadata.end() ) ; - CollectionMetadataPtr p = it->second; + log() << "ShardingState::undoDonateChunk acquired _mutex" << endl; - // empty shards should have version 0 - version = - ( p->getNumChunks() > 1 ) ? 
- version : ChunkVersion( 0, 0, p->getCollVersion().epoch() ); - - ChunkType chunk; - chunk.setMin( min ); - chunk.setMax( max ); - string errMsg; + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); + it->second = prevMetadata; +} - CollectionMetadataPtr cloned( p->cloneMigrate( chunk, version, &errMsg ) ); - // uassert to match old behavior, TODO: report errors w/o throwing - uassert( 16855, errMsg, NULL != cloned.get() ); +bool ShardingState::notePending(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + const OID& epoch, + string* errMsg) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) { + *errMsg = str::stream() << "could not note chunk " + << "[" << min << "," << max << ")" + << " as pending because the local metadata for " << ns + << " has changed"; - // TODO: a bit dangerous to have two different zero-version states - no-metadata and - // no-version - _collMetadata[ns] = cloned; + return false; } - void ShardingState::undoDonateChunk(OperationContext* txn, - const string& ns, - CollectionMetadataPtr prevMetadata) { - - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk( _mutex ); + CollectionMetadataPtr metadata = it->second; - log() << "ShardingState::undoDonateChunk acquired _mutex" << endl; + // This can currently happen because drops aren't synchronized with in-migrations + // The idea for checking this here is that in the future we shouldn't have this problem + if (metadata->getCollVersion().epoch() != epoch) { + *errMsg = str::stream() << "could not note chunk " + << "[" << min << "," << max << ")" + << " as pending because the epoch for " << ns + << " has changed from " << epoch << " to " + << metadata->getCollVersion().epoch(); - CollectionMetadataMap::iterator it = _collMetadata.find( ns ); - verify( it != _collMetadata.end() ); - it->second = prevMetadata; + return false; } - bool ShardingState::notePending(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - const OID& epoch, - string* errMsg ) { + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk( _mutex ); + CollectionMetadataPtr cloned(metadata->clonePlusPending(chunk, errMsg)); + if (!cloned) + return false; - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - if ( it == _collMetadata.end() ) { + _collMetadata[ns] = cloned; + return true; +} - *errMsg = str::stream() << "could not note chunk " << "[" << min << "," << max << ")" - << " as pending because the local metadata for " << ns - << " has changed"; +bool ShardingState::forgetPending(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + const OID& epoch, + string* errMsg) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) { + *errMsg = str::stream() << "no need to forget pending chunk " + << "[" << min << "," << max << ")" + << " because the local metadata for " << ns << " has changed"; - return false; - } + return false; + } - CollectionMetadataPtr metadata = it->second; + 
CollectionMetadataPtr metadata = it->second; - // This can currently happen because drops aren't synchronized with in-migrations - // The idea for checking this here is that in the future we shouldn't have this problem - if ( metadata->getCollVersion().epoch() != epoch ) { + // This can currently happen because drops aren't synchronized with in-migrations + // The idea for checking this here is that in the future we shouldn't have this problem + if (metadata->getCollVersion().epoch() != epoch) { + *errMsg = str::stream() << "no need to forget pending chunk " + << "[" << min << "," << max << ")" + << " because the epoch for " << ns << " has changed from " << epoch + << " to " << metadata->getCollVersion().epoch(); - *errMsg = str::stream() << "could not note chunk " << "[" << min << "," << max << ")" - << " as pending because the epoch for " << ns - << " has changed from " - << epoch << " to " << metadata->getCollVersion().epoch(); + return false; + } - return false; - } + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); - ChunkType chunk; - chunk.setMin( min ); - chunk.setMax( max ); + CollectionMetadataPtr cloned(metadata->cloneMinusPending(chunk, errMsg)); + if (!cloned) + return false; - CollectionMetadataPtr cloned( metadata->clonePlusPending( chunk, errMsg ) ); - if ( !cloned ) return false; + _collMetadata[ns] = cloned; + return true; +} - _collMetadata[ns] = cloned; - return true; - } +void ShardingState::splitChunk(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + const vector<BSONObj>& splitKeys, + ChunkVersion version) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); + + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); + string errMsg; + + CollectionMetadataPtr cloned(it->second->cloneSplit(chunk, splitKeys, version, &errMsg)); + // uassert to match old behavior, TODO: report errors w/o throwing + uassert(16857, errMsg, NULL != cloned.get()); + + _collMetadata[ns] = cloned; +} - bool ShardingState::forgetPending(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - const OID& epoch, - string* errMsg ) { +void ShardingState::mergeChunks(OperationContext* txn, + const string& ns, + const BSONObj& minKey, + const BSONObj& maxKey, + ChunkVersion mergedVersion) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk( _mutex ); + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - if ( it == _collMetadata.end() ) { + string errMsg; - *errMsg = str::stream() << "no need to forget pending chunk " - << "[" << min << "," << max << ")" - << " because the local metadata for " << ns << " has changed"; + CollectionMetadataPtr cloned(it->second->cloneMerge(minKey, maxKey, mergedVersion, &errMsg)); + // uassert to match old behavior, TODO: report errors w/o throwing + uassert(17004, errMsg, NULL != cloned.get()); - return false; - } + _collMetadata[ns] = cloned; +} - CollectionMetadataPtr metadata = it->second; +void ShardingState::resetMetadata(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - // This can currently happen because 
drops aren't synchronized with in-migrations - // The idea for checking this here is that in the future we shouldn't have this problem - if ( metadata->getCollVersion().epoch() != epoch ) { + warning() << "resetting metadata for " << ns << ", this should only be used in testing" << endl; - *errMsg = str::stream() << "no need to forget pending chunk " - << "[" << min << "," << max << ")" - << " because the epoch for " << ns << " has changed from " - << epoch << " to " << metadata->getCollVersion().epoch(); + _collMetadata.erase(ns); +} - return false; - } +Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn, + const string& ns, + const ChunkVersion& reqShardVersion, + ChunkVersion* latestShardVersion) { + // The _configServerTickets serializes this process such that only a small number of threads + // can try to refresh at the same time. - ChunkType chunk; - chunk.setMin( min ); - chunk.setMax( max ); + LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion + << endl; - CollectionMetadataPtr cloned( metadata->cloneMinusPending( chunk, errMsg ) ); - if ( !cloned ) return false; + // + // Queuing of refresh requests starts here when remote reload is needed. This may take time. + // TODO: Explicitly expose the queuing discipline. + // - _collMetadata[ns] = cloned; - return true; - } + _configServerTickets.waitForTicket(); + TicketHolderReleaser needTicketFrom(&_configServerTickets); - void ShardingState::splitChunk(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - const vector<BSONObj>& splitKeys, - ChunkVersion version ) { + // + // Fast path - check if the requested version is at a higher version than the current + // metadata version or a different epoch before verifying against config server. + // - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk( _mutex ); + CollectionMetadataPtr storedMetadata; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) + storedMetadata = it->second; + } + ChunkVersion storedShardVersion; + if (storedMetadata) + storedShardVersion = storedMetadata->getShardVersion(); + *latestShardVersion = storedShardVersion; + + if (storedShardVersion >= reqShardVersion && + storedShardVersion.epoch() == reqShardVersion.epoch()) { + // Don't need to remotely reload if we're in the same epoch with a >= version + return Status::OK(); + } + + // + // Slow path - remotely reload + // + // Cases: + // A) Initial config load and/or secondary take-over. + // B) Migration TO this shard finished, notified by mongos. + // C) Dropping a collection, notified (currently) by mongos. + // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure. 
+ + if (storedShardVersion.epoch() != reqShardVersion.epoch()) { + // Need to remotely reload if our epochs aren't the same, to verify + LOG(1) << "metadata change requested for " << ns << ", from shard version " + << storedShardVersion << " to " << reqShardVersion + << ", need to verify with config server" << endl; + } else { + // Need to remotely reload since our epochs aren't the same but our version is greater + LOG(1) << "metadata version update requested for " << ns << ", from shard version " + << storedShardVersion << " to " << reqShardVersion + << ", need to verify with config server" << endl; + } - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - verify( it != _collMetadata.end() ) ; + return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion); +} - ChunkType chunk; - chunk.setMin( min ); - chunk.setMax( max ); - string errMsg; +Status ShardingState::refreshMetadataNow(OperationContext* txn, + const string& ns, + ChunkVersion* latestShardVersion) { + return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion); +} - CollectionMetadataPtr cloned( it->second->cloneSplit( chunk, splitKeys, version, &errMsg ) ); - // uassert to match old behavior, TODO: report errors w/o throwing - uassert( 16857, errMsg, NULL != cloned.get() ); +void ShardingState::_initialize(const string& server) { + // Ensure only one caller at a time initializes + stdx::lock_guard<stdx::mutex> lk(_mutex); - _collMetadata[ns] = cloned; + if (_enabled) { + // TODO: Do we need to throw exception if the config servers have changed from what we + // already have in place? How do we test for that? + return; } - void ShardingState::mergeChunks(OperationContext* txn, - const string& ns, - const BSONObj& minKey, - const BSONObj& maxKey, - ChunkVersion mergedVersion ) { + ShardedConnectionInfo::addHook(); - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk( _mutex ); + std::string errmsg; + ConnectionString configServerCS = ConnectionString::parse(server, errmsg); + uassert(28633, + str::stream() << "Invalid config server connection string: " << errmsg, + configServerCS.isValid()); - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - verify( it != _collMetadata.end() ); + auto catalogManager = stdx::make_unique<CatalogManagerLegacy>(); + uassertStatusOK(catalogManager->init(configServerCS)); - string errMsg; + auto shardRegistry = + stdx::make_unique<ShardRegistry>(stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), + stdx::make_unique<RemoteCommandRunnerImpl>(0), + std::unique_ptr<executor::TaskExecutor>{nullptr}, + catalogManager.get()); - CollectionMetadataPtr cloned( it->second->cloneMerge( minKey, - maxKey, - mergedVersion, - &errMsg ) ); - // uassert to match old behavior, TODO: report errors w/o throwing - uassert( 17004, errMsg, NULL != cloned.get() ); + grid.init(std::move(catalogManager), std::move(shardRegistry)); - _collMetadata[ns] = cloned; - } + _enabled = true; +} - void ShardingState::resetMetadata( const string& ns ) { - stdx::lock_guard<stdx::mutex> lk( _mutex ); +Status ShardingState::doRefreshMetadata(OperationContext* txn, + const string& ns, + const ChunkVersion& reqShardVersion, + bool useRequestedVersion, + ChunkVersion* latestShardVersion) { + // The idea here is that we're going to reload the metadata from the config server, but + // we need to do so outside any locks. 
When we get our result back, if the current metadata + // has changed, we may not be able to install the new metadata. - warning() << "resetting metadata for " << ns << ", this should only be used in testing" - << endl; + // + // Get the initial metadata + // No DBLock is needed since the metadata is expected to change during reload. + // - _collMetadata.erase( ns ); - } + CollectionMetadataPtr beforeMetadata; - Status ShardingState::refreshMetadataIfNeeded( OperationContext* txn, - const string& ns, - const ChunkVersion& reqShardVersion, - ChunkVersion* latestShardVersion ) { - // The _configServerTickets serializes this process such that only a small number of threads - // can try to refresh at the same time. - - LOG( 2 ) << "metadata refresh requested for " << ns << " at shard version " - << reqShardVersion << endl; - - // - // Queuing of refresh requests starts here when remote reload is needed. This may take time. - // TODO: Explicitly expose the queuing discipline. - // - - _configServerTickets.waitForTicket(); - TicketHolderReleaser needTicketFrom( &_configServerTickets ); + stdx::lock_guard<stdx::mutex> lk(_mutex); - // - // Fast path - check if the requested version is at a higher version than the current - // metadata version or a different epoch before verifying against config server. - // + // We can't reload if sharding is not enabled - i.e. without a config server location + if (!_enabled) { + string errMsg = str::stream() << "cannot refresh metadata for " << ns + << " before sharding has been enabled"; - CollectionMetadataPtr storedMetadata; - { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - CollectionMetadataMap::iterator it = _collMetadata.find( ns ); - if ( it != _collMetadata.end() ) storedMetadata = it->second; + warning() << errMsg; + return Status(ErrorCodes::NotYetInitialized, errMsg); } - ChunkVersion storedShardVersion; - if ( storedMetadata ) storedShardVersion = storedMetadata->getShardVersion(); - *latestShardVersion = storedShardVersion; - if ( storedShardVersion >= reqShardVersion && - storedShardVersion.epoch() == reqShardVersion.epoch() ) { + // We also can't reload if a shard name has not yet been set. + if (_shardName.empty()) { + string errMsg = str::stream() << "cannot refresh metadata for " << ns + << " before shard name has been set"; - // Don't need to remotely reload if we're in the same epoch with a >= version - return Status::OK(); + warning() << errMsg; + return Status(ErrorCodes::NotYetInitialized, errMsg); } - // - // Slow path - remotely reload - // - // Cases: - // A) Initial config load and/or secondary take-over. - // B) Migration TO this shard finished, notified by mongos. - // C) Dropping a collection, notified (currently) by mongos. - // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure. 
- - if ( storedShardVersion.epoch() != reqShardVersion.epoch() ) { - // Need to remotely reload if our epochs aren't the same, to verify - LOG( 1 ) << "metadata change requested for " << ns << ", from shard version " - << storedShardVersion << " to " << reqShardVersion - << ", need to verify with config server" << endl; - } - else { - // Need to remotely reload since our epochs aren't the same but our version is greater - LOG( 1 ) << "metadata version update requested for " << ns - << ", from shard version " << storedShardVersion << " to " << reqShardVersion - << ", need to verify with config server" << endl; + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) { + beforeMetadata = it->second; } - - return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion); } - Status ShardingState::refreshMetadataNow(OperationContext* txn, - const string& ns, - ChunkVersion* latestShardVersion) { - return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion); + ChunkVersion beforeShardVersion; + ChunkVersion beforeCollVersion; + if (beforeMetadata) { + beforeShardVersion = beforeMetadata->getShardVersion(); + beforeCollVersion = beforeMetadata->getCollVersion(); } - void ShardingState::_initialize(const string& server) { - // Ensure only one caller at a time initializes - stdx::lock_guard<stdx::mutex> lk(_mutex); + *latestShardVersion = beforeShardVersion; - if (_enabled) { - // TODO: Do we need to throw exception if the config servers have changed from what we - // already have in place? How do we test for that? - return; - } + // + // Determine whether we need to diff or fully reload + // - ShardedConnectionInfo::addHook(); + bool fullReload = false; + if (!beforeMetadata) { + // We don't have any metadata to reload from + fullReload = true; + } else if (useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch()) { + // It's not useful to use the metadata as a base because we think the epoch will differ + fullReload = true; + } + + // + // Load the metadata from the remote server, start construction + // + + LOG(0) << "remotely refreshing metadata for " << ns + << (useRequestedVersion + ? string(" with requested shard version ") + reqShardVersion.toString() + : "") + << (fullReload ? ", current shard version is " : " based on current shard version ") + << beforeShardVersion << ", current metadata version is " << beforeCollVersion << endl; + + string errMsg; + + MetadataLoader mdLoader; + CollectionMetadata* remoteMetadataRaw = new CollectionMetadata(); + CollectionMetadataPtr remoteMetadata(remoteMetadataRaw); + + Timer refreshTimer; + Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(), + ns, + getShardName(), + fullReload ? 
NULL : beforeMetadata.get(), + remoteMetadataRaw); + long long refreshMillis = refreshTimer.millis(); + + if (status.code() == ErrorCodes::NamespaceNotFound) { + remoteMetadata.reset(); + remoteMetadataRaw = NULL; + } else if (!status.isOK()) { + warning() << "could not remotely refresh metadata for " << ns << causedBy(status.reason()) + << endl; - std::string errmsg; - ConnectionString configServerCS = ConnectionString::parse(server, errmsg); - uassert(28633, - str::stream() << "Invalid config server connection string: " << errmsg, - configServerCS.isValid()); + return status; + } - auto catalogManager = stdx::make_unique<CatalogManagerLegacy>(); - uassertStatusOK(catalogManager->init(configServerCS)); + ChunkVersion remoteShardVersion; + ChunkVersion remoteCollVersion; + if (remoteMetadata) { + remoteShardVersion = remoteMetadata->getShardVersion(); + remoteCollVersion = remoteMetadata->getCollVersion(); + } - auto shardRegistry = stdx::make_unique<ShardRegistry>( - stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), - stdx::make_unique<RemoteCommandRunnerImpl>(0), - std::unique_ptr<executor::TaskExecutor>{nullptr}, - catalogManager.get()); + // + // Get ready to install loaded metadata if needed + // - grid.init(std::move(catalogManager), std::move(shardRegistry)); + CollectionMetadataPtr afterMetadata; + ChunkVersion afterShardVersion; + ChunkVersion afterCollVersion; + ChunkVersion::VersionChoice choice; - _enabled = true; - } + // If we choose to install the new metadata, this describes the kind of install + enum InstallType { + InstallType_New, + InstallType_Update, + InstallType_Replace, + InstallType_Drop, + InstallType_None + } installType = InstallType_None; // compiler complains otherwise - Status ShardingState::doRefreshMetadata( OperationContext* txn, - const string& ns, - const ChunkVersion& reqShardVersion, - bool useRequestedVersion, - ChunkVersion* latestShardVersion ) { - // The idea here is that we're going to reload the metadata from the config server, but - // we need to do so outside any locks. When we get our result back, if the current metadata - // has changed, we may not be able to install the new metadata. + // Exclusive collection lock needed since we're now potentially changing the metadata, + // and don't want reads/writes to be ongoing. + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); // - // Get the initial metadata - // No DBLock is needed since the metadata is expected to change during reload. + // Get the metadata now that the load has completed // - CollectionMetadataPtr beforeMetadata; - - { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - - // We can't reload if sharding is not enabled - i.e. without a config server location - if (!_enabled) { - string errMsg = str::stream() << "cannot refresh metadata for " << ns - << " before sharding has been enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - // We also can't reload if a shard name has not yet been set. 
- if (_shardName.empty()) { - - string errMsg = str::stream() << "cannot refresh metadata for " << ns - << " before shard name has been set"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } + stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionMetadataMap::iterator it = _collMetadata.find(ns); - if (it != _collMetadata.end()) { - beforeMetadata = it->second; - } - } + // Don't reload if our config server has changed or sharding is no longer enabled + if (!_enabled) { + string errMsg = str::stream() << "could not refresh metadata for " << ns + << ", sharding is no longer enabled"; - ChunkVersion beforeShardVersion; - ChunkVersion beforeCollVersion; - if ( beforeMetadata ) { - beforeShardVersion = beforeMetadata->getShardVersion(); - beforeCollVersion = beforeMetadata->getCollVersion(); + warning() << errMsg; + return Status(ErrorCodes::NotYetInitialized, errMsg); } - *latestShardVersion = beforeShardVersion; - - // - // Determine whether we need to diff or fully reload - // + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) + afterMetadata = it->second; - bool fullReload = false; - if ( !beforeMetadata ) { - // We don't have any metadata to reload from - fullReload = true; - } - else if ( useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch() ) { - // It's not useful to use the metadata as a base because we think the epoch will differ - fullReload = true; + if (afterMetadata) { + afterShardVersion = afterMetadata->getShardVersion(); + afterCollVersion = afterMetadata->getCollVersion(); } + *latestShardVersion = afterShardVersion; + // - // Load the metadata from the remote server, start construction + // Resolve newer pending chunks with the remote metadata, finish construction // - LOG( 0 ) << "remotely refreshing metadata for " << ns - << ( useRequestedVersion ? - string( " with requested shard version " ) + reqShardVersion.toString() : "" ) - << ( fullReload ? - ", current shard version is " : " based on current shard version " ) - << beforeShardVersion - << ", current metadata version is " << beforeCollVersion << endl; - - string errMsg; - - MetadataLoader mdLoader; - CollectionMetadata* remoteMetadataRaw = new CollectionMetadata(); - CollectionMetadataPtr remoteMetadata( remoteMetadataRaw ); - - Timer refreshTimer; - Status status = - mdLoader.makeCollectionMetadata(grid.catalogManager(), - ns, - getShardName(), - fullReload ? NULL : beforeMetadata.get(), - remoteMetadataRaw); - long long refreshMillis = refreshTimer.millis(); - - if ( status.code() == ErrorCodes::NamespaceNotFound ) { - remoteMetadata.reset(); - remoteMetadataRaw = NULL; - } - else if ( !status.isOK() ) { + status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadataRaw); - warning() << "could not remotely refresh metadata for " << ns - << causedBy( status.reason() ) << endl; + if (!status.isOK()) { + warning() << "remote metadata for " << ns + << " is inconsistent with current pending chunks" << causedBy(status.reason()) + << endl; return status; } - ChunkVersion remoteShardVersion; - ChunkVersion remoteCollVersion; - if ( remoteMetadata ) { - remoteShardVersion = remoteMetadata->getShardVersion(); - remoteCollVersion = remoteMetadata->getCollVersion(); - } - // - // Get ready to install loaded metadata if needed + // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest + // Zero-epochs (sentinel value for "dropped" collections), are tested by + // !epoch.isSet(). 
// - CollectionMetadataPtr afterMetadata; - ChunkVersion afterShardVersion; - ChunkVersion afterCollVersion; - ChunkVersion::VersionChoice choice; - - // If we choose to install the new metadata, this describes the kind of install - enum InstallType { - InstallType_New, InstallType_Update, InstallType_Replace, InstallType_Drop, - InstallType_None - } installType = InstallType_None; // compiler complains otherwise - - { - // Exclusive collection lock needed since we're now potentially changing the metadata, - // and don't want reads/writes to be ongoing. - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - - // - // Get the metadata now that the load has completed - // - - stdx::lock_guard<stdx::mutex> lk( _mutex ); - - // Don't reload if our config server has changed or sharding is no longer enabled - if (!_enabled) { - string errMsg = str::stream() << "could not refresh metadata for " << ns - << ", sharding is no longer enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); + choice = ChunkVersion::chooseNewestVersion( + beforeCollVersion, afterCollVersion, remoteCollVersion); + + if (choice == ChunkVersion::VersionChoice_Remote) { + dassert(!remoteCollVersion.epoch().isSet() || remoteShardVersion >= beforeShardVersion); + + if (!afterCollVersion.epoch().isSet()) { + // First metadata load + installType = InstallType_New; + dassert(it == _collMetadata.end()); + _collMetadata.insert(make_pair(ns, remoteMetadata)); + } else if (remoteCollVersion.epoch().isSet() && + remoteCollVersion.epoch() == afterCollVersion.epoch()) { + // Update to existing metadata + installType = InstallType_Update; + + // Invariant: If CollMetadata was not found, version should be have been 0. + dassert(it != _collMetadata.end()); + it->second = remoteMetadata; + } else if (remoteCollVersion.epoch().isSet()) { + // New epoch detected, replacing metadata + installType = InstallType_Replace; + + // Invariant: If CollMetadata was not found, version should be have been 0. + dassert(it != _collMetadata.end()); + it->second = remoteMetadata; + } else { + dassert(!remoteCollVersion.epoch().isSet()); + + // Drop detected + installType = InstallType_Drop; + _collMetadata.erase(it); } - CollectionMetadataMap::iterator it = _collMetadata.find( ns ); - if ( it != _collMetadata.end() ) afterMetadata = it->second; + *latestShardVersion = remoteShardVersion; + } + } + // End _mutex + // End DBWrite + + // + // Do messaging based on what happened above + // + string localShardVersionMsg = beforeShardVersion.epoch() == afterShardVersion.epoch() + ? 
afterShardVersion.toString() + : beforeShardVersion.toString() + " / " + afterShardVersion.toString(); + + if (choice == ChunkVersion::VersionChoice_Unknown) { + string errMsg = str::stream() + << "need to retry loading metadata for " << ns + << ", collection may have been dropped or recreated during load" + << " (loaded shard version : " << remoteShardVersion.toString() + << ", stored shard versions : " << localShardVersionMsg << ", took " << refreshMillis + << "ms)"; + + warning() << errMsg; + return Status(ErrorCodes::RemoteChangeDetected, errMsg); + } - if ( afterMetadata ) { - afterShardVersion = afterMetadata->getShardVersion(); - afterCollVersion = afterMetadata->getCollVersion(); - } + if (choice == ChunkVersion::VersionChoice_Local) { + LOG(0) << "metadata of collection " << ns + << " already up to date (shard version : " << afterShardVersion.toString() + << ", took " << refreshMillis << "ms)" << endl; + return Status::OK(); + } - *latestShardVersion = afterShardVersion; + dassert(choice == ChunkVersion::VersionChoice_Remote); - // - // Resolve newer pending chunks with the remote metadata, finish construction - // + switch (installType) { + case InstallType_New: + LOG(0) << "collection " << ns << " was previously unsharded" + << ", new metadata loaded with shard version " << remoteShardVersion << endl; + break; + case InstallType_Update: + LOG(0) << "updating metadata for " << ns << " from shard version " + << localShardVersionMsg << " to shard version " << remoteShardVersion << endl; + break; + case InstallType_Replace: + LOG(0) << "replacing metadata for " << ns << " at shard version " + << localShardVersionMsg << " with a new epoch (shard version " + << remoteShardVersion << ")" << endl; + break; + case InstallType_Drop: + LOG(0) << "dropping metadata for " << ns << " at shard version " << localShardVersionMsg + << ", took " << refreshMillis << "ms" << endl; + break; + default: + verify(false); + break; + } - status = mdLoader.promotePendingChunks( afterMetadata.get(), remoteMetadataRaw ); + if (installType != InstallType_Drop) { + LOG(0) << "collection version was loaded at version " << remoteCollVersion << ", took " + << refreshMillis << "ms" << endl; + } - if ( !status.isOK() ) { + return Status::OK(); +} - warning() << "remote metadata for " << ns - << " is inconsistent with current pending chunks" - << causedBy( status.reason() ) << endl; +void ShardingState::appendInfo(BSONObjBuilder& builder) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - return status; - } + builder.appendBool("enabled", _enabled); + if (!_enabled) { + return; + } - // - // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest - // Zero-epochs (sentinel value for "dropped" collections), are tested by - // !epoch.isSet(). 
- // + builder.append("configServer", grid.catalogManager()->connectionString().toString()); + builder.append("shardName", _shardName); - choice = ChunkVersion::chooseNewestVersion( beforeCollVersion, - afterCollVersion, - remoteCollVersion ); + BSONObjBuilder versionB(builder.subobjStart("versions")); + for (CollectionMetadataMap::const_iterator it = _collMetadata.begin(); + it != _collMetadata.end(); + ++it) { + CollectionMetadataPtr metadata = it->second; + versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + } - if ( choice == ChunkVersion::VersionChoice_Remote ) { - dassert(!remoteCollVersion.epoch().isSet() || - remoteShardVersion >= beforeShardVersion); + versionB.done(); +} - if ( !afterCollVersion.epoch().isSet() ) { +bool ShardingState::needCollectionMetadata(Client* client, const string& ns) const { + if (!_enabled) + return false; - // First metadata load - installType = InstallType_New; - dassert( it == _collMetadata.end() ); - _collMetadata.insert( make_pair( ns, remoteMetadata ) ); - } - else if ( remoteCollVersion.epoch().isSet() && - remoteCollVersion.epoch() == afterCollVersion.epoch() ) { + if (!ShardedConnectionInfo::get(client, false)) + return false; - // Update to existing metadata - installType = InstallType_Update; + return true; +} - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert( it != _collMetadata.end() ); - it->second = remoteMetadata; - } - else if ( remoteCollVersion.epoch().isSet() ) { +CollectionMetadataPtr ShardingState::getCollectionMetadata(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - // New epoch detected, replacing metadata - installType = InstallType_Replace; + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) { + return CollectionMetadataPtr(); + } else { + return it->second; + } +} - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert( it != _collMetadata.end() ); - it->second = remoteMetadata; - } - else { - dassert( !remoteCollVersion.epoch().isSet() ); +ShardingState shardingState; - // Drop detected - installType = InstallType_Drop; - _collMetadata.erase( it ); - } +// -----ShardingState END ---- - *latestShardVersion = remoteShardVersion; - } - } - // End _mutex - // End DBWrite +// -----ShardedConnectionInfo START ---- - // - // Do messaging based on what happened above - // - string localShardVersionMsg = - beforeShardVersion.epoch() == afterShardVersion.epoch() ? - afterShardVersion.toString() : - beforeShardVersion.toString() + " / " + afterShardVersion.toString(); +ShardedConnectionInfo::ShardedConnectionInfo() { + _forceVersionOk = false; +} - if ( choice == ChunkVersion::VersionChoice_Unknown ) { +ShardedConnectionInfo* ShardedConnectionInfo::get(Client* client, bool create) { + auto& current = clientSCI(client); - string errMsg = str::stream() - << "need to retry loading metadata for " << ns - << ", collection may have been dropped or recreated during load" - << " (loaded shard version : " << remoteShardVersion.toString() - << ", stored shard versions : " << localShardVersionMsg - << ", took " << refreshMillis << "ms)"; + if (!current && create) { + LOG(1) << "entering shard mode for connection" << endl; + current = boost::in_place(); + } - warning() << errMsg; - return Status( ErrorCodes::RemoteChangeDetected, errMsg ); - } + return current ? 
&current.value() : nullptr;
+}
 
-        if ( choice == ChunkVersion::VersionChoice_Local ) {
+void ShardedConnectionInfo::reset(Client* client) {
+    clientSCI(client) = boost::none;
+}
 
-            LOG( 0 ) << "metadata of collection " << ns << " already up to date (shard version : "
-                     << afterShardVersion.toString() << ", took " << refreshMillis << "ms)"
-                     << endl;
-            return Status::OK();
-        }
+const ChunkVersion ShardedConnectionInfo::getVersion(const string& ns) const {
+    NSVersionMap::const_iterator it = _versions.find(ns);
+    if (it != _versions.end()) {
+        return it->second;
+    } else {
+        return ChunkVersion(0, 0, OID());
+    }
+}
 
-        dassert( choice == ChunkVersion::VersionChoice_Remote );
+void ShardedConnectionInfo::setVersion(const string& ns, const ChunkVersion& version) {
+    _versions[ns] = version;
+}
 
-        switch( installType ) {
-        case InstallType_New:
-            LOG( 0 ) << "collection " << ns << " was previously unsharded"
-                     << ", new metadata loaded with shard version " << remoteShardVersion
-                     << endl;
-            break;
-        case InstallType_Update:
-            LOG( 0 ) << "updating metadata for " << ns << " from shard version "
-                     << localShardVersionMsg << " to shard version " << remoteShardVersion
-                     << endl;
-            break;
-        case InstallType_Replace:
-            LOG( 0 ) << "replacing metadata for " << ns << " at shard version "
-                     << localShardVersionMsg << " with a new epoch (shard version "
-                     << remoteShardVersion << ")" << endl;
-            break;
-        case InstallType_Drop:
-            LOG( 0 ) << "dropping metadata for " << ns << " at shard version "
-                     << localShardVersionMsg << ", took " << refreshMillis << "ms" << endl;
-            break;
-        default:
-            verify( false );
-            break;
-        }
+void ShardedConnectionInfo::addHook() {
+    static stdx::mutex lock;
+    static bool done = false;
 
-        if ( installType != InstallType_Drop ) {
-            LOG( 0 ) << "collection version was loaded at version " << remoteCollVersion
-                     << ", took " << refreshMillis << "ms" << endl;
-        }
+    stdx::lock_guard<stdx::mutex> lk(lock);
+    if (!done) {
+        log() << "first cluster operation detected, adding sharding hook to enable versioning "
+                 "and authentication to remote servers";
 
-        return Status::OK();
+        globalConnPool.addHook(new ShardingConnectionHook(false));
+        shardConnectionPool.addHook(new ShardingConnectionHook(true));
+        done = true;
     }
+}
 
-    void ShardingState::appendInfo(BSONObjBuilder& builder) {
-        stdx::lock_guard<stdx::mutex> lk(_mutex);
+class MongodShardCommand : public Command {
+public:
+    MongodShardCommand(const char* n) : Command(n) {}
+    virtual bool slaveOk() const {
+        return false;
+    }
+    virtual bool adminOnly() const {
+        return true;
+    }
+};
 
-        builder.appendBool("enabled", _enabled);
-        if (!_enabled) {
-            return;
-        }
+bool haveLocalShardingInfo(Client* client, const string& ns) {
+    if (!shardingState.enabled())
+        return false;
 
-        builder.append("configServer", grid.catalogManager()->connectionString().toString());
-        builder.append("shardName", _shardName);
+    if (!shardingState.hasVersion(ns))
+        return false;
 
-        BSONObjBuilder versionB(builder.subobjStart("versions"));
-        for (CollectionMetadataMap::const_iterator it = _collMetadata.begin();
-             it != _collMetadata.end();
-             ++it) {
+    return ShardedConnectionInfo::get(client, false) != NULL;
+}
 
-            CollectionMetadataPtr metadata = it->second;
-            versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
-        }
+class UnsetShardingCommand : public MongodShardCommand {
+public:
+    UnsetShardingCommand() : MongodShardCommand("unsetSharding") {}
 
-        versionB.done();
-    }
 
-    bool ShardingState::needCollectionMetadata( Client* client, const string& ns ) const {
-        if ( 
! _enabled ) - return false; + virtual void help(stringstream& help) const { + help << "internal"; + } - if ( ! ShardedConnectionInfo::get( client, false ) ) - return false; + virtual bool isWriteCommandForConfigServer() const { + return false; + } + virtual bool slaveOk() const { return true; } - CollectionMetadataPtr ShardingState::getCollectionMetadata( const string& ns ) { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - - CollectionMetadataMap::const_iterator it = _collMetadata.find( ns ); - if ( it == _collMetadata.end() ) { - return CollectionMetadataPtr(); - } - else { - return it->second; - } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - ShardingState shardingState; - - // -----ShardingState END ---- - - // -----ShardedConnectionInfo START ---- - - ShardedConnectionInfo::ShardedConnectionInfo() { - _forceVersionOk = false; + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + ShardedConnectionInfo::reset(txn->getClient()); + return true; } - ShardedConnectionInfo* ShardedConnectionInfo::get( Client* client, bool create ) { - auto& current = clientSCI(client); +} unsetShardingCommand; - if (!current && create) { - LOG(1) << "entering shard mode for connection" << endl; - current = boost::in_place(); - } +class SetShardVersion : public MongodShardCommand { +public: + SetShardVersion() : MongodShardCommand("setShardVersion") {} - return current ? ¤t.value() : nullptr; + virtual void help(stringstream& help) const { + help << "internal"; } - void ShardedConnectionInfo::reset(Client* client) { - clientSCI(client) = boost::none; + virtual bool slaveOk() const { + return true; } - - const ChunkVersion ShardedConnectionInfo::getVersion( const string& ns ) const { - NSVersionMap::const_iterator it = _versions.find( ns ); - if ( it != _versions.end() ) { - return it->second; - } - else { - return ChunkVersion( 0, 0, OID() ); - } + virtual bool isWriteCommandForConfigServer() const { + return false; } - void ShardedConnectionInfo::setVersion( const string& ns , const ChunkVersion& version ) { - _versions[ns] = version; + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - void ShardedConnectionInfo::addHook() { - static stdx::mutex lock; - static bool done = false; + bool checkConfigOrInit(OperationContext* txn, + const string& configdb, + bool authoritative, + string& errmsg, + BSONObjBuilder& result, + bool locked = false) const { + if (configdb.size() == 0) { + errmsg = "no configdb"; + return false; + } - stdx::lock_guard<stdx::mutex> lk(lock); - if (!done) { - log() << "first cluster operation detected, adding sharding hook to enable versioning " - "and authentication to remote servers"; + if (shardingState.enabled()) { + if (configdb == shardingState.getConfigServer()) + return true; - globalConnPool.addHook(new ShardingConnectionHook(false)); - shardConnectionPool.addHook(new ShardingConnectionHook(true)); - done = true; - } - } + result.append("configdb", + BSON("stored" << shardingState.getConfigServer() << "given" << configdb)); - class MongodShardCommand : public Command { - public: - 
MongodShardCommand( const char * n ) : Command( n ) { - } - virtual bool slaveOk() const { + errmsg = str::stream() << "mongos specified a different config database string : " + << "stored : " << shardingState.getConfigServer() + << " vs given : " << configdb; return false; } - virtual bool adminOnly() const { - return true; - } - }; - - bool haveLocalShardingInfo( Client* client, const string& ns ) { - if ( ! shardingState.enabled() ) + if (!authoritative) { + result.appendBool("need_authoritative", true); + errmsg = "first setShardVersion"; return false; + } - if ( ! shardingState.hasVersion( ns ) ) - return false; + if (locked) { + ShardingState::initialize(configdb); + return true; + } - return ShardedConnectionInfo::get(client, false) != NULL; + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + return checkConfigOrInit(txn, configdb, authoritative, errmsg, result, true); } - class UnsetShardingCommand : public MongodShardCommand { - public: - UnsetShardingCommand() : MongodShardCommand("unsetSharding") {} - - virtual void help( stringstream& help ) const { - help << "internal"; + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + // Compatibility error for < v3.0 mongoses still active in the cluster + // TODO: Remove post-3.0 + if (!cmdObj["serverID"].eoo()) { + // This mongos is too old to talk to us + string errMsg = stream() << "v3.0 mongod is incompatible with v2.6 mongos, " + << "a v2.6 mongos may be running in the v3.0 cluster at " + << txn->getClient()->clientAddress(false); + error() << errMsg; + return appendCommandStatus(result, Status(ErrorCodes::ProtocolError, errMsg)); } - virtual bool isWriteCommandForConfigServer() const { return false; } + // Steps + // 1. check basic config + // 2. extract params from command + // 3. fast check + // 4. slow check (LOCKS) - virtual bool slaveOk() const { return true; } + // step 1 - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + Client* client = txn->getClient(); + LastError::get(client).disable(); + ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, true); + + bool authoritative = cmdObj.getBoolField("authoritative"); + + // check config server is ok or enable sharding + if (!checkConfigOrInit( + txn, cmdObj["configdb"].valuestrsafe(), authoritative, errmsg, result)) { + return false; } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - ShardedConnectionInfo::reset(txn->getClient()); - return true; + // check shard name is correct + if (cmdObj["shard"].type() == String) { + // The shard host is also sent when using setShardVersion, report this host if there + // is an error. 
+ shardingState.gotShardNameAndHost(cmdObj["shard"].String(), cmdObj["shardHost"].str()); } - } unsetShardingCommand; + // Handle initial shard connection + if (cmdObj["version"].eoo() && cmdObj["init"].trueValue()) { + result.append("initialized", true); - class SetShardVersion : public MongodShardCommand { - public: - SetShardVersion() : MongodShardCommand("setShardVersion") {} + // Send back wire version to let mongos know what protocol we can speak + result.append("minWireVersion", minWireVersion); + result.append("maxWireVersion", maxWireVersion); - virtual void help( stringstream& help ) const { - help << "internal"; + return true; } - virtual bool slaveOk() const { return true; } - virtual bool isWriteCommandForConfigServer() const { return false; } - - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + string ns = cmdObj["setShardVersion"].valuestrsafe(); + if (ns.size() == 0) { + errmsg = "need to specify namespace"; + return false; } - bool checkConfigOrInit(OperationContext* txn, - const string& configdb, - bool authoritative, - string& errmsg, - BSONObjBuilder& result, - bool locked = false ) const { - if ( configdb.size() == 0 ) { - errmsg = "no configdb"; - return false; - } - - if ( shardingState.enabled() ) { - if ( configdb == shardingState.getConfigServer() ) - return true; - - result.append( "configdb" , BSON( "stored" << shardingState.getConfigServer() << - "given" << configdb ) ); - - errmsg = str::stream() << "mongos specified a different config database string : " - << "stored : " << shardingState.getConfigServer() - << " vs given : " << configdb; - return false; - } - - if ( ! authoritative ) { - result.appendBool( "need_authoritative" , true ); - errmsg = "first setShardVersion"; - return false; - } - - if ( locked ) { - ShardingState::initialize(configdb); - return true; - } - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); - return checkConfigOrInit(txn, configdb, authoritative, errmsg, result, true); + // we can run on a slave up to here + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( + nsToDatabase(ns))) { + result.append("errmsg", "not master"); + result.append("note", "from post init in setShardVersion"); + return false; } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - - // Compatibility error for < v3.0 mongoses still active in the cluster - // TODO: Remove post-3.0 - if (!cmdObj["serverID"].eoo()) { - - // This mongos is too old to talk to us - string errMsg = stream() << "v3.0 mongod is incompatible with v2.6 mongos, " - << "a v2.6 mongos may be running in the v3.0 cluster at " - << txn->getClient()->clientAddress(false); - error() << errMsg; - return appendCommandStatus(result, Status(ErrorCodes::ProtocolError, errMsg)); - } + // step 2 + if (!ChunkVersion::canParseBSON(cmdObj, "version")) { + errmsg = "need to specify version"; + return false; + } - // Steps - // 1. check basic config - // 2. extract params from command - // 3. fast check - // 4. 
slow check (LOCKS) - - // step 1 - - Client* client = txn->getClient(); - LastError::get(client).disable(); - ShardedConnectionInfo* info = ShardedConnectionInfo::get( client, true ); - - bool authoritative = cmdObj.getBoolField( "authoritative" ); - - // check config server is ok or enable sharding - if (!checkConfigOrInit( - txn, cmdObj["configdb"].valuestrsafe(), authoritative, errmsg, result)) { - return false; - } + const ChunkVersion version = ChunkVersion::fromBSON(cmdObj, "version"); - // check shard name is correct - if ( cmdObj["shard"].type() == String ) { - // The shard host is also sent when using setShardVersion, report this host if there - // is an error. - shardingState.gotShardNameAndHost( cmdObj["shard"].String(), - cmdObj["shardHost"].str() ); - } - - // Handle initial shard connection - if( cmdObj["version"].eoo() && cmdObj["init"].trueValue() ){ + // step 3 - result.append( "initialized", true ); + const ChunkVersion oldVersion = info->getVersion(ns); + const ChunkVersion globalVersion = shardingState.getVersion(ns); - // Send back wire version to let mongos know what protocol we can speak - result.append( "minWireVersion", minWireVersion ); - result.append( "maxWireVersion", maxWireVersion ); + oldVersion.addToBSON(result, "oldVersion"); - return true; + if (version.isWriteCompatibleWith(globalVersion)) { + // mongos and mongod agree! + if (!oldVersion.isWriteCompatibleWith(version)) { + if (oldVersion < globalVersion && oldVersion.hasEqualEpoch(globalVersion)) { + info->setVersion(ns, version); + } else if (authoritative) { + // this means there was a drop and our version is reset + info->setVersion(ns, version); + } else { + result.append("ns", ns); + result.appendBool("need_authoritative", true); + errmsg = "verifying drop on '" + ns + "'"; + return false; + } } - string ns = cmdObj["setShardVersion"].valuestrsafe(); - if ( ns.size() == 0 ) { - errmsg = "need to specify namespace"; - return false; - } + return true; + } + // step 4 + // Cases below all either return OR fall-through to remote metadata reload. + const bool isDropRequested = !version.isSet() && globalVersion.isSet(); - // we can run on a slave up to here - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - nsToDatabase(ns))) { - result.append( "errmsg" , "not master" ); - result.append( "note" , "from post init in setShardVersion" ); + if (isDropRequested) { + if (!authoritative) { + result.appendBool("need_authoritative", true); + result.append("ns", ns); + globalVersion.addToBSON(result, "globalVersion"); + errmsg = "dropping needs to be authoritative"; return false; } - // step 2 - if( ! ChunkVersion::canParseBSON( cmdObj, "version" ) ){ - errmsg = "need to specify version"; + // Fall through to metadata reload below + } else { + // Not Dropping + + // TODO: Refactor all of this + if (version < oldVersion && version.hasEqualEpoch(oldVersion)) { + errmsg = str::stream() << "this connection already had a newer version " + << "of collection '" << ns << "'"; + result.append("ns", ns); + version.addToBSON(result, "newVersion"); + globalVersion.addToBSON(result, "globalVersion"); return false; } - const ChunkVersion version = ChunkVersion::fromBSON( cmdObj, "version" ); - - // step 3 - - const ChunkVersion oldVersion = info->getVersion(ns); - const ChunkVersion globalVersion = shardingState.getVersion(ns); - - oldVersion.addToBSON( result, "oldVersion" ); - - if ( version.isWriteCompatibleWith( globalVersion )) { - // mongos and mongod agree! 
- if ( !oldVersion.isWriteCompatibleWith( version )) { - if ( oldVersion < globalVersion && - oldVersion.hasEqualEpoch( globalVersion )) { - info->setVersion( ns, version ); - } - else if ( authoritative ) { - // this means there was a drop and our version is reset - info->setVersion( ns, version ); - } - else { - result.append( "ns", ns ); - result.appendBool( "need_authoritative", true ); - errmsg = "verifying drop on '" + ns + "'"; - return false; - } - } - - return true; - } - - // step 4 - // Cases below all either return OR fall-through to remote metadata reload. - const bool isDropRequested = !version.isSet() && globalVersion.isSet(); - - if (isDropRequested) { - if ( ! authoritative ) { - result.appendBool( "need_authoritative" , true ); - result.append( "ns" , ns ); - globalVersion.addToBSON( result, "globalVersion" ); - errmsg = "dropping needs to be authoritative"; - return false; + // TODO: Refactor all of this + if (version < globalVersion && version.hasEqualEpoch(globalVersion)) { + while (shardingState.inCriticalMigrateSection()) { + log() << "waiting till out of critical section" << endl; + shardingState.waitTillNotInCriticalSection(10); } - - // Fall through to metadata reload below + errmsg = str::stream() << "shard global version for collection is higher " + << "than trying to set to '" << ns << "'"; + result.append("ns", ns); + version.addToBSON(result, "version"); + globalVersion.addToBSON(result, "globalVersion"); + result.appendBool("reloadConfig", true); + return false; } - else { - // Not Dropping - - // TODO: Refactor all of this - if ( version < oldVersion && version.hasEqualEpoch( oldVersion ) ) { - errmsg = str::stream() << "this connection already had a newer version " - << "of collection '" << ns << "'"; - result.append( "ns" , ns ); - version.addToBSON( result, "newVersion" ); - globalVersion.addToBSON( result, "globalVersion" ); - return false; - } - - // TODO: Refactor all of this - if ( version < globalVersion && version.hasEqualEpoch( globalVersion ) ) { - while ( shardingState.inCriticalMigrateSection() ) { - log() << "waiting till out of critical section" << endl; - shardingState.waitTillNotInCriticalSection( 10 ); - } - errmsg = str::stream() << "shard global version for collection is higher " - << "than trying to set to '" << ns << "'"; - result.append( "ns" , ns ); - version.addToBSON( result, "version" ); - globalVersion.addToBSON( result, "globalVersion" ); - result.appendBool( "reloadConfig" , true ); - return false; - } - if ( ! globalVersion.isSet() && ! authoritative ) { - // Needed b/c when the last chunk is moved off a shard, - // the version gets reset to zero, which should require a reload. - while ( shardingState.inCriticalMigrateSection() ) { - log() << "waiting till out of critical section" << endl; - shardingState.waitTillNotInCriticalSection( 10 ); - } - - // need authoritative for first look - result.append( "ns" , ns ); - result.appendBool( "need_authoritative" , true ); - errmsg = "first time for collection '" + ns + "'"; - return false; + if (!globalVersion.isSet() && !authoritative) { + // Needed b/c when the last chunk is moved off a shard, + // the version gets reset to zero, which should require a reload. 
+ while (shardingState.inCriticalMigrateSection()) { + log() << "waiting till out of critical section" << endl; + shardingState.waitTillNotInCriticalSection(10); } - // Fall through to metadata reload below + // need authoritative for first look + result.append("ns", ns); + result.appendBool("need_authoritative", true); + errmsg = "first time for collection '" + ns + "'"; + return false; } - ChunkVersion currVersion; - Status status = shardingState.refreshMetadataIfNeeded(txn, ns, version, &currVersion); + // Fall through to metadata reload below + } - if (!status.isOK()) { + ChunkVersion currVersion; + Status status = shardingState.refreshMetadataIfNeeded(txn, ns, version, &currVersion); - // The reload itself was interrupted or confused here + if (!status.isOK()) { + // The reload itself was interrupted or confused here - errmsg = str::stream() << "could not refresh metadata for " << ns - << " with requested shard version " << version.toString() - << ", stored shard version is " << currVersion.toString() - << causedBy( status.reason() ); + errmsg = str::stream() << "could not refresh metadata for " << ns + << " with requested shard version " << version.toString() + << ", stored shard version is " << currVersion.toString() + << causedBy(status.reason()); - warning() << errmsg << endl; + warning() << errmsg << endl; - result.append( "ns" , ns ); - version.addToBSON( result, "version" ); - currVersion.addToBSON( result, "globalVersion" ); - result.appendBool( "reloadConfig", true ); + result.append("ns", ns); + version.addToBSON(result, "version"); + currVersion.addToBSON(result, "globalVersion"); + result.appendBool("reloadConfig", true); - return false; + return false; + } else if (!version.isWriteCompatibleWith(currVersion)) { + // We reloaded a version that doesn't match the version mongos was trying to + // set. + + errmsg = str::stream() << "requested shard version differs from" + << " config shard version for " << ns + << ", requested version is " << version.toString() + << " but found version " << currVersion.toString(); + + OCCASIONALLY warning() << errmsg << endl; + + // WARNING: the exact fields below are important for compatibility with mongos + // version reload. + + result.append("ns", ns); + currVersion.addToBSON(result, "globalVersion"); + + // If this was a reset of a collection or the last chunk moved out, inform mongos to + // do a full reload. + if (currVersion.epoch() != version.epoch() || !currVersion.isSet()) { + result.appendBool("reloadConfig", true); + // Zero-version also needed to trigger full mongos reload, sadly + // TODO: Make this saner, and less impactful (full reload on last chunk is bad) + ChunkVersion(0, 0, OID()).addToBSON(result, "version"); + // For debugging + version.addToBSON(result, "origVersion"); + } else { + version.addToBSON(result, "version"); } - else if ( !version.isWriteCompatibleWith( currVersion ) ) { - - // We reloaded a version that doesn't match the version mongos was trying to - // set. - errmsg = str::stream() << "requested shard version differs from" - << " config shard version for " << ns - << ", requested version is " << version.toString() - << " but found version " << currVersion.toString(); + return false; + } - OCCASIONALLY warning() << errmsg << endl; + info->setVersion(ns, version); + return true; + } - // WARNING: the exact fields below are important for compatibility with mongos - // version reload. 
+} setShardVersionCmd; - result.append( "ns" , ns ); - currVersion.addToBSON( result, "globalVersion" ); +class GetShardVersion : public MongodShardCommand { +public: + GetShardVersion() : MongodShardCommand("getShardVersion") {} - // If this was a reset of a collection or the last chunk moved out, inform mongos to - // do a full reload. - if (currVersion.epoch() != version.epoch() || !currVersion.isSet() ) { - result.appendBool( "reloadConfig", true ); - // Zero-version also needed to trigger full mongos reload, sadly - // TODO: Make this saner, and less impactful (full reload on last chunk is bad) - ChunkVersion( 0, 0, OID() ).addToBSON( result, "version" ); - // For debugging - version.addToBSON( result, "origVersion" ); - } - else { - version.addToBSON( result, "version" ); - } + virtual void help(stringstream& help) const { + help << " example: { getShardVersion : 'alleyinsider.foo' } "; + } - return false; - } + virtual bool isWriteCommandForConfigServer() const { + return false; + } - info->setVersion( ns , version ); - return true; + virtual Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), + ActionType::getShardVersion)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); } + return Status::OK(); + } + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + return parseNsFullyQualified(dbname, cmdObj); + } - } setShardVersionCmd; - - class GetShardVersion : public MongodShardCommand { - public: - GetShardVersion() : MongodShardCommand("getShardVersion") {} + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + const string ns = cmdObj["getShardVersion"].valuestrsafe(); + if (ns.size() == 0) { + errmsg = "need to specify full namespace"; + return false; + } - virtual void help( stringstream& help ) const { - help << " example: { getShardVersion : 'alleyinsider.foo' } "; + if (shardingState.enabled()) { + result.append("configServer", shardingState.getConfigServer()); + } else { + result.append("configServer", ""); } - virtual bool isWriteCommandForConfigServer() const { return false; } + result.appendTimestamp("global", shardingState.getVersion(ns).toLong()); - virtual Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) { - if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( - ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), - ActionType::getShardVersion)) { - return Status(ErrorCodes::Unauthorized, "Unauthorized"); - } - return Status::OK(); - } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { - return parseNsFullyQualified(dbname, cmdObj); + ShardedConnectionInfo* const info = ShardedConnectionInfo::get(txn->getClient(), false); + result.appendBool("inShardedMode", info != NULL); + if (info) { + result.appendTimestamp("mine", info->getVersion(ns).toLong()); + } else { + result.appendTimestamp("mine", 0); } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - const string ns = cmdObj["getShardVersion"].valuestrsafe(); - if (ns.size() == 0) { - errmsg = "need to specify full namespace"; - return false; - } - - if (shardingState.enabled()) { - 
result.append("configServer", shardingState.getConfigServer()); - } - else { - result.append("configServer", ""); + if (cmdObj["fullMetadata"].trueValue()) { + CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(ns); + if (metadata) { + result.append("metadata", metadata->toBSON()); + } else { + result.append("metadata", BSONObj()); } + } - result.appendTimestamp("global", shardingState.getVersion(ns).toLong()); + return true; + } - ShardedConnectionInfo* const info = ShardedConnectionInfo::get(txn->getClient(), false); - result.appendBool("inShardedMode", info != NULL); - if (info) { - result.appendTimestamp("mine", info->getVersion(ns).toLong()); - } - else { - result.appendTimestamp("mine", 0); - } +} getShardVersion; - if (cmdObj["fullMetadata"].trueValue()) { - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(ns); - if (metadata) { - result.append("metadata", metadata->toBSON()); - } - else { - result.append("metadata", BSONObj()); - } - } +class ShardingStateCmd : public MongodShardCommand { +public: + ShardingStateCmd() : MongodShardCommand("shardingState") {} - return true; - } + virtual bool isWriteCommandForConfigServer() const { + return true; + } - } getShardVersion; + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::shardingState); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } - class ShardingStateCmd : public MongodShardCommand { - public: - ShardingStateCmd() : MongodShardCommand( "shardingState" ) {} + bool run(OperationContext* txn, + const string& dbname, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X); + OldClientContext ctx(txn, dbname); + + shardingState.appendInfo(result); + return true; + } - virtual bool isWriteCommandForConfigServer() const { return true; } +} shardingStateCmd; - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::shardingState); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } +/** + * @ return true if not in sharded mode + or if version for this client is ok + */ +static bool shardVersionOk(Client* client, + const string& ns, + string& errmsg, + ChunkVersion& received, + ChunkVersion& wanted) { + if (!shardingState.enabled()) + return true; - bool run(OperationContext* txn, - const string& dbname, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X); - OldClientContext ctx(txn, dbname); - - shardingState.appendInfo( result ); - return true; - } + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsToDatabase(ns))) { + // right now connections to secondaries aren't versioned at all + return true; + } - } shardingStateCmd; + ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); - /** - * @ return true if not in sharded mode - or if version for this client is ok - */ - static bool shardVersionOk(Client* client, - const string& ns, - string& errmsg, - ChunkVersion& received, - ChunkVersion& wanted) { + if (!info) { + // this means the client has nothing sharded + // so this allows direct connections to do whatever they 
want + // which i think is the correct behavior + return true; + } - if ( ! shardingState.enabled() ) - return true; + if (info->inForceVersionOkMode()) { + return true; + } - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - nsToDatabase(ns))) { - // right now connections to secondaries aren't versioned at all - return true; - } + // TODO : all collections at some point, be sharded or not, will have a version + // (and a CollectionMetadata) + received = info->getVersion(ns); - ShardedConnectionInfo* info = ShardedConnectionInfo::get( client, false ); + if (ChunkVersion::isIgnoredVersion(received)) { + return true; + } - if ( ! info ) { - // this means the client has nothing sharded - // so this allows direct connections to do whatever they want - // which i think is the correct behavior - return true; - } + wanted = shardingState.getVersion(ns); - if ( info->inForceVersionOkMode() ) { - return true; - } + if (received.isWriteCompatibleWith(wanted)) + return true; - // TODO : all collections at some point, be sharded or not, will have a version - // (and a CollectionMetadata) - received = info->getVersion( ns ); + // + // Figure out exactly why not compatible, send appropriate error message + // The versions themselves are returned in the error, so not needed in messages here + // - if (ChunkVersion::isIgnoredVersion(received)) { - return true; - } + // Check epoch first, to send more meaningful message, since other parameters probably + // won't match either + if (!wanted.hasEqualEpoch(received)) { + errmsg = str::stream() << "version epoch mismatch detected for " << ns << ", " + << "the collection may have been dropped and recreated"; + return false; + } - wanted = shardingState.getVersion( ns ); + if (!wanted.isSet() && received.isSet()) { + errmsg = str::stream() << "this shard no longer contains chunks for " << ns << ", " + << "the collection may have been dropped"; + return false; + } - if( received.isWriteCompatibleWith( wanted ) ) return true; + if (wanted.isSet() && !received.isSet()) { + errmsg = str::stream() << "this shard contains versioned chunks for " << ns << ", " + << "but no version set in request"; + return false; + } + if (wanted.majorVersion() != received.majorVersion()) { // - // Figure out exactly why not compatible, send appropriate error message - // The versions themselves are returned in the error, so not needed in messages here + // Could be > or < - wanted is > if this is the source of a migration, + // wanted < if this is the target of a migration // - // Check epoch first, to send more meaningful message, since other parameters probably - // won't match either - if( ! wanted.hasEqualEpoch( received ) ){ - errmsg = str::stream() << "version epoch mismatch detected for " << ns << ", " - << "the collection may have been dropped and recreated"; - return false; - } - - if( ! wanted.isSet() && received.isSet() ){ - errmsg = str::stream() << "this shard no longer contains chunks for " << ns << ", " - << "the collection may have been dropped"; - return false; - } - - if( wanted.isSet() && ! 
received.isSet() ){ - errmsg = str::stream() << "this shard contains versioned chunks for " << ns << ", " - << "but no version set in request"; - return false; - } - - if( wanted.majorVersion() != received.majorVersion() ){ - - // - // Could be > or < - wanted is > if this is the source of a migration, - // wanted < if this is the target of a migration - // - - errmsg = str::stream() << "version mismatch detected for " << ns << ", " - << "stored major version " << wanted.majorVersion() - << " does not match received " << received.majorVersion(); - return false; - } - - // Those are all the reasons the versions can mismatch - verify( false ); - + errmsg = str::stream() << "version mismatch detected for " << ns << ", " + << "stored major version " << wanted.majorVersion() + << " does not match received " << received.majorVersion(); return false; - } - void ensureShardVersionOKOrThrow(Client* client, const std::string& ns) { - string errmsg; - ChunkVersion received; - ChunkVersion wanted; - if (!shardVersionOk(client, ns, errmsg, received, wanted)) { - StringBuilder sb; - sb << "[" << ns << "] shard version not ok: " << errmsg; - throw SendStaleConfigException(ns, sb.str(), received, wanted); - } - } + // Those are all the reasons the versions can mismatch + verify(false); - void usingAShardConnection( const string& addr ) { - } + return false; +} - void saveGLEStats(const BSONObj& result, StringData hostString) { - // Declared in cluster_last_error_info.h. - // - // This can be called in mongod, which is unfortunate. To fix this, - // we can redesign how connection pooling works on mongod for sharded operations. +void ensureShardVersionOKOrThrow(Client* client, const std::string& ns) { + string errmsg; + ChunkVersion received; + ChunkVersion wanted; + if (!shardVersionOk(client, ns, errmsg, received, wanted)) { + StringBuilder sb; + sb << "[" << ns << "] shard version not ok: " << errmsg; + throw SendStaleConfigException(ns, sb.str(), received, wanted); } +} + +void usingAShardConnection(const string& addr) {} +void saveGLEStats(const BSONObj& result, StringData hostString) { + // Declared in cluster_last_error_info.h. + // + // This can be called in mongod, which is unfortunate. To fix this, + // we can redesign how connection pooling works on mongod for sharded operations. +} } |
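
A note on the control flow in the patch above: checkConfigOrInit() first runs without any lock; only when first-time initialization is needed does it take Lock::GlobalWrite and call itself once more with locked = true, so every check is repeated while the exclusive lock is held. Below is a minimal standalone sketch of that double-checked shape, assuming a plain std::mutex in place of the global lock and two globals in place of ShardingState; all names in it are hypothetical stand-ins, not code from this file.

#include <iostream>
#include <mutex>
#include <string>

std::mutex globalLock;         // stands in for Lock::GlobalWrite
bool shardingEnabled = false;  // stands in for shardingState.enabled(), which
                               // guards this flag with its own internal mutex
std::string storedConfig;      // the config server string, once initialized

// Same shape as checkConfigOrInit: a fast path without the big lock, then a
// single recursive call with locked = true so the enabled/match checks are
// re-run while the exclusive lock is held (double-checked initialization).
bool checkConfigOrInit(const std::string& configdb, bool locked = false) {
    if (configdb.empty())
        return false;  // "no configdb"

    if (shardingEnabled)
        return storedConfig == configdb;  // already initialized: strings must match

    if (locked) {
        // The exclusive lock is held here, so initializing is safe.
        storedConfig = configdb;
        shardingEnabled = true;
        return true;
    }

    std::lock_guard<std::mutex> lk(globalLock);
    return checkConfigOrInit(configdb, /*locked=*/true);
}

int main() {
    std::cout << checkConfigOrInit("cfg1,cfg2,cfg3") << std::endl;  // 1: first caller initializes
    std::cout << checkConfigOrInit("cfg1,cfg2,cfg3") << std::endl;  // 1: matches the stored string
    std::cout << checkConfigOrInit("otherConfig") << std::endl;     // 0: mismatch is rejected
    return 0;
}

The locked flag keeps the common case (sharding already enabled) off the global write lock while still serializing first-time initialization, at the cost of re-running the cheap checks once under the lock.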
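
The version checks in setShardVersion and in shardVersionOk reduce to the same ChunkVersion comparisons: equal epoch plus equal major version means the versions are write compatible, and the error ladder tests epoch mismatch first, then the set/unset asymmetries, so a major-version mismatch is the only case left at the end (which is why the code can verify(false) after it). The sketch below models that ladder with a reduced MiniChunkVersion; the type and the classify() helper are hypothetical simplifications, since the real ChunkVersion also carries a minor version and an OID epoch.

#include <iostream>
#include <string>

struct MiniChunkVersion {
    unsigned majorVersion;  // 0 means "no version set"
    std::string epoch;      // stands in for the OID epoch

    bool isSet() const { return majorVersion != 0; }
    bool hasEqualEpoch(const MiniChunkVersion& other) const { return epoch == other.epoch; }
    // Write compatible = same epoch and same major version; minor (chunk-level)
    // differences do not invalidate writes.
    bool isWriteCompatibleWith(const MiniChunkVersion& other) const {
        return hasEqualEpoch(other) && majorVersion == other.majorVersion;
    }
};

// Mirrors the diagnostic order in shardVersionOk: epoch first (the most
// meaningful message, since the other fields rarely match when it differs),
// then the dropped-collection and missing-version cases, and finally the only
// remaining possibility, a major-version mismatch.
std::string classify(const MiniChunkVersion& received, const MiniChunkVersion& wanted) {
    if (received.isWriteCompatibleWith(wanted))
        return "ok";
    if (!wanted.hasEqualEpoch(received))
        return "epoch mismatch: collection may have been dropped and recreated";
    if (!wanted.isSet() && received.isSet())
        return "shard no longer contains chunks: collection may have been dropped";
    if (wanted.isSet() && !received.isSet())
        return "shard has versioned chunks but the request carries no version";
    return "major version mismatch: e.g. a migration in progress";
}

int main() {
    MiniChunkVersion received = {2, "e1"};
    MiniChunkVersion wanted = {3, "e1"};
    std::cout << classify(received, wanted) << std::endl;   // major version mismatch
    MiniChunkVersion recreated = {1, "e2"};
    std::cout << classify(received, recreated) << std::endl;  // epoch mismatch
    return 0;
}

On the major-version case, the source comment notes that wanted can sit on either side of received, above it on a migration donor and below it on a migration recipient; either way shardVersionOk reports the mismatch, and ensureShardVersionOKOrThrow turns the failed check and its errmsg into a SendStaleConfigException carrying both the received and wanted versions back to mongos.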