Diffstat (limited to 'src/mongo/s/d_state.cpp')
-rw-r--r--  src/mongo/s/d_state.cpp  2145
1 file changed, 1049 insertions, 1096 deletions
diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp
index e7341e65093..3c7739bc804 100644
--- a/src/mongo/s/d_state.cpp
+++ b/src/mongo/s/d_state.cpp
@@ -72,1390 +72,1343 @@
namespace mongo {
- using boost::optional;
- using std::endl;
- using std::string;
- using std::stringstream;
- using std::vector;
+using boost::optional;
+using std::endl;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace {
+const auto clientSCI = Client::declareDecoration<optional<ShardedConnectionInfo>>();
+} // namespace
+
+bool isMongos() {
+ return false;
+}
- namespace {
- const auto clientSCI = Client::declareDecoration<optional<ShardedConnectionInfo>>();
- } // namespace
- bool isMongos() {
- return false;
- }
+// -----ShardingState START ----
+ShardingState::ShardingState()
+ : _enabled(false),
+ _configServerTickets(3 /* max number of concurrent config server refresh threads */) {}
- // -----ShardingState START ----
+bool ShardingState::enabled() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _enabled;
+}
- ShardingState::ShardingState()
- : _enabled(false),
- _configServerTickets( 3 /* max number of concurrent config server refresh threads */ ) {
- }
+string ShardingState::getConfigServer() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_enabled);
- bool ShardingState::enabled() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _enabled;
- }
+ return grid.catalogManager()->connectionString().toString();
+}
- string ShardingState::getConfigServer() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_enabled);
+void ShardingState::initialize(const string& server) {
+ uassert(18509,
+ "Unable to obtain host name during sharding initialization.",
+ !getHostName().empty());
- return grid.catalogManager()->connectionString().toString();
- }
+ shardingState._initialize(server);
+}
- void ShardingState::initialize(const string& server) {
- uassert(18509,
- "Unable to obtain host name during sharding initialization.",
- !getHostName().empty());
+// TODO: Consolidate and eliminate these various ways of setting / validating shard names
+bool ShardingState::setShardName(const string& name) {
+ return setShardNameAndHost(name, "");
+}
- shardingState._initialize(server);
- }
+std::string ShardingState::getShardName() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _shardName;
+}
- // TODO: Consolidate and eliminate these various ways of setting / validating shard names
- bool ShardingState::setShardName( const string& name ) {
- return setShardNameAndHost( name, "" );
- }
+bool ShardingState::setShardNameAndHost(const string& name, const string& host) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_shardName.size() == 0) {
+ // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs
+ _shardName = name;
- std::string ShardingState::getShardName() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _shardName;
- }
+ string clientAddr = cc().clientAddress(true);
- bool ShardingState::setShardNameAndHost( const string& name, const string& host ) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if ( _shardName.size() == 0 ) {
- // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs
- _shardName = name;
+ log() << "remote client " << clientAddr << " initialized this host "
+ << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name;
- string clientAddr = cc().clientAddress(true);
+ return true;
+ }
- log() << "remote client " << clientAddr << " initialized this host "
- << ( host.empty() ? string( "" ) : string( "(" ) + host + ") " )
- << "as shard " << name;
+ if (_shardName == name)
+ return true;
- return true;
- }
+ string clientAddr = cc().clientAddress(true);
- if ( _shardName == name )
- return true;
+ warning() << "remote client " << clientAddr << " tried to initialize this host "
+ << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name
+ << ", but shard name was previously initialized as " << _shardName;
- string clientAddr = cc().clientAddress(true);
+ return false;
+}
- warning() << "remote client " << clientAddr << " tried to initialize this host "
- << ( host.empty() ? string( "" ) : string( "(" ) + host + ") " )
- << "as shard " << name
- << ", but shard name was previously initialized as " << _shardName;
+void ShardingState::gotShardName(const string& name) {
+ gotShardNameAndHost(name, "");
+}
- return false;
- }
+void ShardingState::gotShardNameAndHost(const string& name, const string& host) {
+ if (setShardNameAndHost(name, host))
+ return;
- void ShardingState::gotShardName( const string& name ) {
- gotShardNameAndHost( name, "" );
- }
+ string clientAddr = cc().clientAddress(true);
+ stringstream ss;
- void ShardingState::gotShardNameAndHost( const string& name, const string& host ) {
- if ( setShardNameAndHost( name, host ) )
- return;
+ // Same error as above, to match for reporting
+ ss << "remote client " << clientAddr << " tried to initialize this host "
+ << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name
+ << ", but shard name was previously initialized as " << _shardName;
- string clientAddr = cc().clientAddress(true);
- stringstream ss;
+ msgasserted(13298, ss.str());
+}
- // Same error as above, to match for reporting
- ss << "remote client " << clientAddr << " tried to initialize this host "
- << ( host.empty() ? string( "" ) : string( "(" ) + host + ") " )
- << "as shard " << name
- << ", but shard name was previously initialized as " << _shardName;
+void ShardingState::clearCollectionMetadata() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _collMetadata.clear();
+}
- msgasserted( 13298 , ss.str() );
- }
+// TODO we shouldn't need three ways for checking the version. Fix this.
+bool ShardingState::hasVersion(const string& ns) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- void ShardingState::clearCollectionMetadata() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _collMetadata.clear();
- }
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ return it != _collMetadata.end();
+}
- // TODO we shouldn't need three ways for checking the version. Fix this.
- bool ShardingState::hasVersion( const string& ns ) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+bool ShardingState::hasVersion(const string& ns, ChunkVersion& version) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
- return it != _collMetadata.end();
- }
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ if (it == _collMetadata.end())
+ return false;
- bool ShardingState::hasVersion( const string& ns , ChunkVersion& version ) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ CollectionMetadataPtr p = it->second;
+ version = p->getShardVersion();
+ return true;
+}
- CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
- if ( it == _collMetadata.end() )
- return false;
+ChunkVersion ShardingState::getVersion(const string& ns) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ if (it != _collMetadata.end()) {
CollectionMetadataPtr p = it->second;
- version = p->getShardVersion();
- return true;
+ return p->getShardVersion();
+ } else {
+ return ChunkVersion(0, 0, OID());
}
+}
- ChunkVersion ShardingState::getVersion(const string& ns) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- if ( it != _collMetadata.end() ) {
- CollectionMetadataPtr p = it->second;
- return p->getShardVersion();
- }
- else {
- return ChunkVersion( 0, 0, OID() );
- }
- }
+void ShardingState::donateChunk(OperationContext* txn,
+ const string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ ChunkVersion version) {
+ invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ verify(it != _collMetadata.end());
+ CollectionMetadataPtr p = it->second;
+
+ // empty shards should have version 0
+ version = (p->getNumChunks() > 1) ? version : ChunkVersion(0, 0, p->getCollVersion().epoch());
+
+ ChunkType chunk;
+ chunk.setMin(min);
+ chunk.setMax(max);
+ string errMsg;
+
+ CollectionMetadataPtr cloned(p->cloneMigrate(chunk, version, &errMsg));
+ // uassert to match old behavior, TODO: report errors w/o throwing
+ uassert(16855, errMsg, NULL != cloned.get());
+
+ // TODO: a bit dangerous to have two different zero-version states - no-metadata and
+ // no-version
+ _collMetadata[ns] = cloned;
+}
- void ShardingState::donateChunk(OperationContext* txn,
+void ShardingState::undoDonateChunk(OperationContext* txn,
const string& ns,
- const BSONObj& min,
- const BSONObj& max,
- ChunkVersion version) {
+ CollectionMetadataPtr prevMetadata) {
+ invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
- stdx::lock_guard<stdx::mutex> lk( _mutex );
-
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- verify( it != _collMetadata.end() ) ;
- CollectionMetadataPtr p = it->second;
+ log() << "ShardingState::undoDonateChunk acquired _mutex" << endl;
- // empty shards should have version 0
- version =
- ( p->getNumChunks() > 1 ) ?
- version : ChunkVersion( 0, 0, p->getCollVersion().epoch() );
-
- ChunkType chunk;
- chunk.setMin( min );
- chunk.setMax( max );
- string errMsg;
+ CollectionMetadataMap::iterator it = _collMetadata.find(ns);
+ verify(it != _collMetadata.end());
+ it->second = prevMetadata;
+}
- CollectionMetadataPtr cloned( p->cloneMigrate( chunk, version, &errMsg ) );
- // uassert to match old behavior, TODO: report errors w/o throwing
- uassert( 16855, errMsg, NULL != cloned.get() );
+bool ShardingState::notePending(OperationContext* txn,
+ const string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ const OID& epoch,
+ string* errMsg) {
+ invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ if (it == _collMetadata.end()) {
+ *errMsg = str::stream() << "could not note chunk "
+ << "[" << min << "," << max << ")"
+ << " as pending because the local metadata for " << ns
+ << " has changed";
- // TODO: a bit dangerous to have two different zero-version states - no-metadata and
- // no-version
- _collMetadata[ns] = cloned;
+ return false;
}
- void ShardingState::undoDonateChunk(OperationContext* txn,
- const string& ns,
- CollectionMetadataPtr prevMetadata) {
-
- invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+ CollectionMetadataPtr metadata = it->second;
- log() << "ShardingState::undoDonateChunk acquired _mutex" << endl;
+ // This can currently happen because drops aren't synchronized with in-migrations
+ // The idea for checking this here is that in the future we shouldn't have this problem
+ if (metadata->getCollVersion().epoch() != epoch) {
+ *errMsg = str::stream() << "could not note chunk "
+ << "[" << min << "," << max << ")"
+ << " as pending because the epoch for " << ns
+ << " has changed from " << epoch << " to "
+ << metadata->getCollVersion().epoch();
- CollectionMetadataMap::iterator it = _collMetadata.find( ns );
- verify( it != _collMetadata.end() );
- it->second = prevMetadata;
+ return false;
}
- bool ShardingState::notePending(OperationContext* txn,
- const string& ns,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch,
- string* errMsg ) {
+ ChunkType chunk;
+ chunk.setMin(min);
+ chunk.setMax(max);
- invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+ CollectionMetadataPtr cloned(metadata->clonePlusPending(chunk, errMsg));
+ if (!cloned)
+ return false;
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- if ( it == _collMetadata.end() ) {
+ _collMetadata[ns] = cloned;
+ return true;
+}
- *errMsg = str::stream() << "could not note chunk " << "[" << min << "," << max << ")"
- << " as pending because the local metadata for " << ns
- << " has changed";
+bool ShardingState::forgetPending(OperationContext* txn,
+ const string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ const OID& epoch,
+ string* errMsg) {
+ invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ if (it == _collMetadata.end()) {
+ *errMsg = str::stream() << "no need to forget pending chunk "
+ << "[" << min << "," << max << ")"
+ << " because the local metadata for " << ns << " has changed";
- return false;
- }
+ return false;
+ }
- CollectionMetadataPtr metadata = it->second;
+ CollectionMetadataPtr metadata = it->second;
- // This can currently happen because drops aren't synchronized with in-migrations
- // The idea for checking this here is that in the future we shouldn't have this problem
- if ( metadata->getCollVersion().epoch() != epoch ) {
+ // This can currently happen because drops aren't synchronized with in-migrations
+ // The idea for checking this here is that in the future we shouldn't have this problem
+ if (metadata->getCollVersion().epoch() != epoch) {
+ *errMsg = str::stream() << "no need to forget pending chunk "
+ << "[" << min << "," << max << ")"
+ << " because the epoch for " << ns << " has changed from " << epoch
+ << " to " << metadata->getCollVersion().epoch();
- *errMsg = str::stream() << "could not note chunk " << "[" << min << "," << max << ")"
- << " as pending because the epoch for " << ns
- << " has changed from "
- << epoch << " to " << metadata->getCollVersion().epoch();
+ return false;
+ }
- return false;
- }
+ ChunkType chunk;
+ chunk.setMin(min);
+ chunk.setMax(max);
- ChunkType chunk;
- chunk.setMin( min );
- chunk.setMax( max );
+ CollectionMetadataPtr cloned(metadata->cloneMinusPending(chunk, errMsg));
+ if (!cloned)
+ return false;
- CollectionMetadataPtr cloned( metadata->clonePlusPending( chunk, errMsg ) );
- if ( !cloned ) return false;
+ _collMetadata[ns] = cloned;
+ return true;
+}
- _collMetadata[ns] = cloned;
- return true;
- }
+void ShardingState::splitChunk(OperationContext* txn,
+ const string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ const vector<BSONObj>& splitKeys,
+ ChunkVersion version) {
+ invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ verify(it != _collMetadata.end());
+
+ ChunkType chunk;
+ chunk.setMin(min);
+ chunk.setMax(max);
+ string errMsg;
+
+ CollectionMetadataPtr cloned(it->second->cloneSplit(chunk, splitKeys, version, &errMsg));
+ // uassert to match old behavior, TODO: report errors w/o throwing
+ uassert(16857, errMsg, NULL != cloned.get());
+
+ _collMetadata[ns] = cloned;
+}
- bool ShardingState::forgetPending(OperationContext* txn,
- const string& ns,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch,
- string* errMsg ) {
+void ShardingState::mergeChunks(OperationContext* txn,
+ const string& ns,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ ChunkVersion mergedVersion) {
+ invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ verify(it != _collMetadata.end());
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- if ( it == _collMetadata.end() ) {
+ string errMsg;
- *errMsg = str::stream() << "no need to forget pending chunk "
- << "[" << min << "," << max << ")"
- << " because the local metadata for " << ns << " has changed";
+ CollectionMetadataPtr cloned(it->second->cloneMerge(minKey, maxKey, mergedVersion, &errMsg));
+ // uassert to match old behavior, TODO: report errors w/o throwing
+ uassert(17004, errMsg, NULL != cloned.get());
- return false;
- }
+ _collMetadata[ns] = cloned;
+}
- CollectionMetadataPtr metadata = it->second;
+void ShardingState::resetMetadata(const string& ns) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- // This can currently happen because drops aren't synchronized with in-migrations
- // The idea for checking this here is that in the future we shouldn't have this problem
- if ( metadata->getCollVersion().epoch() != epoch ) {
+ warning() << "resetting metadata for " << ns << ", this should only be used in testing" << endl;
- *errMsg = str::stream() << "no need to forget pending chunk "
- << "[" << min << "," << max << ")"
- << " because the epoch for " << ns << " has changed from "
- << epoch << " to " << metadata->getCollVersion().epoch();
+ _collMetadata.erase(ns);
+}
- return false;
- }
+Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
+ const string& ns,
+ const ChunkVersion& reqShardVersion,
+ ChunkVersion* latestShardVersion) {
+ // The _configServerTickets serializes this process such that only a small number of threads
+ // can try to refresh at the same time.
- ChunkType chunk;
- chunk.setMin( min );
- chunk.setMax( max );
+ LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion
+ << endl;
- CollectionMetadataPtr cloned( metadata->cloneMinusPending( chunk, errMsg ) );
- if ( !cloned ) return false;
+ //
+ // Queuing of refresh requests starts here when remote reload is needed. This may take time.
+ // TODO: Explicitly expose the queuing discipline.
+ //
- _collMetadata[ns] = cloned;
- return true;
- }
+ _configServerTickets.waitForTicket();
+ TicketHolderReleaser needTicketFrom(&_configServerTickets);
- void ShardingState::splitChunk(OperationContext* txn,
- const string& ns,
- const BSONObj& min,
- const BSONObj& max,
- const vector<BSONObj>& splitKeys,
- ChunkVersion version ) {
+ //
+ // Fast path - check if the requested version is at a higher version than the current
+ // metadata version or a different epoch before verifying against config server.
+ //
- invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+ CollectionMetadataPtr storedMetadata;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ CollectionMetadataMap::iterator it = _collMetadata.find(ns);
+ if (it != _collMetadata.end())
+ storedMetadata = it->second;
+ }
+ ChunkVersion storedShardVersion;
+ if (storedMetadata)
+ storedShardVersion = storedMetadata->getShardVersion();
+ *latestShardVersion = storedShardVersion;
+
+ if (storedShardVersion >= reqShardVersion &&
+ storedShardVersion.epoch() == reqShardVersion.epoch()) {
+ // Don't need to remotely reload if we're in the same epoch with a >= version
+ return Status::OK();
+ }
+
+ //
+ // Slow path - remotely reload
+ //
+ // Cases:
+ // A) Initial config load and/or secondary take-over.
+ // B) Migration TO this shard finished, notified by mongos.
+ // C) Dropping a collection, notified (currently) by mongos.
+ // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.
+
+ if (storedShardVersion.epoch() != reqShardVersion.epoch()) {
+ // Need to remotely reload if our epochs aren't the same, to verify
+ LOG(1) << "metadata change requested for " << ns << ", from shard version "
+ << storedShardVersion << " to " << reqShardVersion
+ << ", need to verify with config server" << endl;
+ } else {
+ // Need to remotely reload since our epochs aren't the same but our version is greater
+ LOG(1) << "metadata version update requested for " << ns << ", from shard version "
+ << storedShardVersion << " to " << reqShardVersion
+ << ", need to verify with config server" << endl;
+ }
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- verify( it != _collMetadata.end() ) ;
+ return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
+}
- ChunkType chunk;
- chunk.setMin( min );
- chunk.setMax( max );
- string errMsg;
+Status ShardingState::refreshMetadataNow(OperationContext* txn,
+ const string& ns,
+ ChunkVersion* latestShardVersion) {
+ return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion);
+}
- CollectionMetadataPtr cloned( it->second->cloneSplit( chunk, splitKeys, version, &errMsg ) );
- // uassert to match old behavior, TODO: report errors w/o throwing
- uassert( 16857, errMsg, NULL != cloned.get() );
+void ShardingState::_initialize(const string& server) {
+ // Ensure only one caller at a time initializes
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- _collMetadata[ns] = cloned;
+ if (_enabled) {
+ // TODO: Do we need to throw exception if the config servers have changed from what we
+ // already have in place? How do we test for that?
+ return;
}
- void ShardingState::mergeChunks(OperationContext* txn,
- const string& ns,
- const BSONObj& minKey,
- const BSONObj& maxKey,
- ChunkVersion mergedVersion ) {
+ ShardedConnectionInfo::addHook();
- invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+ std::string errmsg;
+ ConnectionString configServerCS = ConnectionString::parse(server, errmsg);
+ uassert(28633,
+ str::stream() << "Invalid config server connection string: " << errmsg,
+ configServerCS.isValid());
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- verify( it != _collMetadata.end() );
+ auto catalogManager = stdx::make_unique<CatalogManagerLegacy>();
+ uassertStatusOK(catalogManager->init(configServerCS));
- string errMsg;
+ auto shardRegistry =
+ stdx::make_unique<ShardRegistry>(stdx::make_unique<RemoteCommandTargeterFactoryImpl>(),
+ stdx::make_unique<RemoteCommandRunnerImpl>(0),
+ std::unique_ptr<executor::TaskExecutor>{nullptr},
+ catalogManager.get());
- CollectionMetadataPtr cloned( it->second->cloneMerge( minKey,
- maxKey,
- mergedVersion,
- &errMsg ) );
- // uassert to match old behavior, TODO: report errors w/o throwing
- uassert( 17004, errMsg, NULL != cloned.get() );
+ grid.init(std::move(catalogManager), std::move(shardRegistry));
- _collMetadata[ns] = cloned;
- }
+ _enabled = true;
+}
- void ShardingState::resetMetadata( const string& ns ) {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+Status ShardingState::doRefreshMetadata(OperationContext* txn,
+ const string& ns,
+ const ChunkVersion& reqShardVersion,
+ bool useRequestedVersion,
+ ChunkVersion* latestShardVersion) {
+ // The idea here is that we're going to reload the metadata from the config server, but
+ // we need to do so outside any locks. When we get our result back, if the current metadata
+ // has changed, we may not be able to install the new metadata.
- warning() << "resetting metadata for " << ns << ", this should only be used in testing"
- << endl;
+ //
+ // Get the initial metadata
+ // No DBLock is needed since the metadata is expected to change during reload.
+ //
- _collMetadata.erase( ns );
- }
+ CollectionMetadataPtr beforeMetadata;
- Status ShardingState::refreshMetadataIfNeeded( OperationContext* txn,
- const string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion )
{
- // The _configServerTickets serializes this process such that only a small number of threads
- // can try to refresh at the same time.
-
- LOG( 2 ) << "metadata refresh requested for " << ns << " at shard version "
- << reqShardVersion << endl;
-
- //
- // Queuing of refresh requests starts here when remote reload is needed. This may take time.
- // TODO: Explicitly expose the queuing discipline.
- //
-
- _configServerTickets.waitForTicket();
- TicketHolderReleaser needTicketFrom( &_configServerTickets );
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- //
- // Fast path - check if the requested version is at a higher version than the current
- // metadata version or a different epoch before verifying against config server.
- //
+ // We can't reload if sharding is not enabled - i.e. without a config server location
+ if (!_enabled) {
+ string errMsg = str::stream() << "cannot refresh metadata for " << ns
+ << " before sharding has been enabled";
- CollectionMetadataPtr storedMetadata;
- {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- CollectionMetadataMap::iterator it = _collMetadata.find( ns );
- if ( it != _collMetadata.end() ) storedMetadata = it->second;
+ warning() << errMsg;
+ return Status(ErrorCodes::NotYetInitialized, errMsg);
}
- ChunkVersion storedShardVersion;
- if ( storedMetadata ) storedShardVersion = storedMetadata->getShardVersion();
- *latestShardVersion = storedShardVersion;
- if ( storedShardVersion >= reqShardVersion &&
- storedShardVersion.epoch() == reqShardVersion.epoch() ) {
+ // We also can't reload if a shard name has not yet been set.
+ if (_shardName.empty()) {
+ string errMsg = str::stream() << "cannot refresh metadata for " << ns
+ << " before shard name has been set";
- // Don't need to remotely reload if we're in the same epoch with a >= version
- return Status::OK();
+ warning() << errMsg;
+ return Status(ErrorCodes::NotYetInitialized, errMsg);
}
- //
- // Slow path - remotely reload
- //
- // Cases:
- // A) Initial config load and/or secondary take-over.
- // B) Migration TO this shard finished, notified by mongos.
- // C) Dropping a collection, notified (currently) by mongos.
- // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.
-
- if ( storedShardVersion.epoch() != reqShardVersion.epoch() ) {
- // Need to remotely reload if our epochs aren't the same, to verify
- LOG( 1 ) << "metadata change requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
- << ", need to verify with config server" << endl;
- }
- else {
- // Need to remotely reload since our epochs aren't the same but our version is greater
- LOG( 1 ) << "metadata version update requested for " << ns
- << ", from shard version " << storedShardVersion << " to " << reqShardVersion
- << ", need to verify with config server" << endl;
+ CollectionMetadataMap::iterator it = _collMetadata.find(ns);
+ if (it != _collMetadata.end()) {
+ beforeMetadata = it->second;
}
-
- return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
}
- Status ShardingState::refreshMetadataNow(OperationContext* txn,
- const string& ns,
- ChunkVersion* latestShardVersion) {
- return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion);
+ ChunkVersion beforeShardVersion;
+ ChunkVersion beforeCollVersion;
+ if (beforeMetadata) {
+ beforeShardVersion = beforeMetadata->getShardVersion();
+ beforeCollVersion = beforeMetadata->getCollVersion();
}
- void ShardingState::_initialize(const string& server) {
- // Ensure only one caller at a time initializes
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ *latestShardVersion = beforeShardVersion;
- if (_enabled) {
- // TODO: Do we need to throw exception if the config servers have changed from what we
- // already have in place? How do we test for that?
- return;
- }
+ //
+ // Determine whether we need to diff or fully reload
+ //
- ShardedConnectionInfo::addHook();
+ bool fullReload = false;
+ if (!beforeMetadata) {
+ // We don't have any metadata to reload from
+ fullReload = true;
+ } else if (useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch()) {
+ // It's not useful to use the metadata as a base because we think the epoch will differ
+ fullReload = true;
+ }
+
+ //
+ // Load the metadata from the remote server, start construction
+ //
+
+ LOG(0) << "remotely refreshing metadata for " << ns
+ << (useRequestedVersion
+ ? string(" with requested shard version ") + reqShardVersion.toString()
+ : "")
+ << (fullReload ? ", current shard version is " : " based on current shard version ")
+ << beforeShardVersion << ", current metadata version is " << beforeCollVersion << endl;
+
+ string errMsg;
+
+ MetadataLoader mdLoader;
+ CollectionMetadata* remoteMetadataRaw = new CollectionMetadata();
+ CollectionMetadataPtr remoteMetadata(remoteMetadataRaw);
+
+ Timer refreshTimer;
+ Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(),
+ ns,
+ getShardName(),
+ fullReload ? NULL : beforeMetadata.get(),
+ remoteMetadataRaw);
+ long long refreshMillis = refreshTimer.millis();
+
+ if (status.code() == ErrorCodes::NamespaceNotFound) {
+ remoteMetadata.reset();
+ remoteMetadataRaw = NULL;
+ } else if (!status.isOK()) {
+ warning() << "could not remotely refresh metadata for " << ns << causedBy(status.reason())
+ << endl;
- std::string errmsg;
- ConnectionString configServerCS = ConnectionString::parse(server, errmsg);
- uassert(28633,
- str::stream() << "Invalid config server connection string: " << errmsg,
- configServerCS.isValid());
+ return status;
+ }
- auto catalogManager = stdx::make_unique<CatalogManagerLegacy>();
- uassertStatusOK(catalogManager->init(configServerCS));
+ ChunkVersion remoteShardVersion;
+ ChunkVersion remoteCollVersion;
+ if (remoteMetadata) {
+ remoteShardVersion = remoteMetadata->getShardVersion();
+ remoteCollVersion = remoteMetadata->getCollVersion();
+ }
- auto shardRegistry = stdx::make_unique<ShardRegistry>(
- stdx::make_unique<RemoteCommandTargeterFactoryImpl>(),
- stdx::make_unique<RemoteCommandRunnerImpl>(0),
- std::unique_ptr<executor::TaskExecutor>{nullptr},
- catalogManager.get());
+ //
+ // Get ready to install loaded metadata if needed
+ //
- grid.init(std::move(catalogManager), std::move(shardRegistry));
+ CollectionMetadataPtr afterMetadata;
+ ChunkVersion afterShardVersion;
+ ChunkVersion afterCollVersion;
+ ChunkVersion::VersionChoice choice;
- _enabled = true;
- }
+ // If we choose to install the new metadata, this describes the kind of install
+ enum InstallType {
+ InstallType_New,
+ InstallType_Update,
+ InstallType_Replace,
+ InstallType_Drop,
+ InstallType_None
+ } installType = InstallType_None; // compiler complains otherwise
- Status ShardingState::doRefreshMetadata( OperationContext* txn,
- const string& ns,
- const ChunkVersion& reqShardVersion,
- bool useRequestedVersion,
- ChunkVersion* latestShardVersion )
{
- // The idea here is that we're going to reload the metadata from the config server, but
- // we need to do so outside any locks. When we get our result back, if the current metadata
- // has changed, we may not be able to install the new metadata.
+ // Exclusive collection lock needed since we're now potentially changing the metadata,
+ // and don't want reads/writes to be ongoing.
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
//
- // Get the initial metadata
- // No DBLock is needed since the metadata is expected to change during reload.
+ // Get the metadata now that the load has completed
//
- CollectionMetadataPtr beforeMetadata;
-
- {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
-
- // We can't reload if sharding is not enabled - i.e. without a config server location
- if (!_enabled) {
- string errMsg = str::stream() << "cannot refresh metadata for " << ns
- << " before sharding has been enabled";
-
- warning() << errMsg;
- return Status(ErrorCodes::NotYetInitialized, errMsg);
- }
-
- // We also can't reload if a shard name has not yet been set.
- if (_shardName.empty()) {
-
- string errMsg = str::stream() << "cannot refresh metadata for " << ns
- << " before shard name has been set";
-
- warning() << errMsg;
- return Status(ErrorCodes::NotYetInitialized, errMsg);
- }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- CollectionMetadataMap::iterator it = _collMetadata.find(ns);
- if (it != _collMetadata.end()) {
- beforeMetadata = it->second;
- }
- }
+ // Don't reload if our config server has changed or sharding is no longer enabled
+ if (!_enabled) {
+ string errMsg = str::stream() << "could not refresh metadata for " << ns
+ << ", sharding is no longer enabled";
- ChunkVersion beforeShardVersion;
- ChunkVersion beforeCollVersion;
- if ( beforeMetadata ) {
- beforeShardVersion = beforeMetadata->getShardVersion();
- beforeCollVersion = beforeMetadata->getCollVersion();
+ warning() << errMsg;
+ return Status(ErrorCodes::NotYetInitialized, errMsg);
}
- *latestShardVersion = beforeShardVersion;
-
- //
- // Determine whether we need to diff or fully reload
- //
+ CollectionMetadataMap::iterator it = _collMetadata.find(ns);
+ if (it != _collMetadata.end())
+ afterMetadata = it->second;
- bool fullReload = false;
- if ( !beforeMetadata ) {
- // We don't have any metadata to reload from
- fullReload = true;
- }
- else if ( useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch() ) {
- // It's not useful to use the metadata as a base because we think the epoch will differ
- fullReload = true;
+ if (afterMetadata) {
+ afterShardVersion = afterMetadata->getShardVersion();
+ afterCollVersion = afterMetadata->getCollVersion();
}
+ *latestShardVersion = afterShardVersion;
+
//
- // Load the metadata from the remote server, start construction
+ // Resolve newer pending chunks with the remote metadata, finish construction
//
- LOG( 0 ) << "remotely refreshing metadata for " << ns
- << ( useRequestedVersion ?
- string( " with requested shard version " ) + reqShardVersion.toString() : "" )
- << ( fullReload ?
- ", current shard version is " : " based on current shard version " )
- << beforeShardVersion
- << ", current metadata version is " << beforeCollVersion << endl;
-
- string errMsg;
-
- MetadataLoader mdLoader;
- CollectionMetadata* remoteMetadataRaw = new CollectionMetadata();
- CollectionMetadataPtr remoteMetadata( remoteMetadataRaw );
-
- Timer refreshTimer;
- Status status =
- mdLoader.makeCollectionMetadata(grid.catalogManager(),
- ns,
- getShardName(),
- fullReload ? NULL : beforeMetadata.get(),
- remoteMetadataRaw);
- long long refreshMillis = refreshTimer.millis();
-
- if ( status.code() == ErrorCodes::NamespaceNotFound ) {
- remoteMetadata.reset();
- remoteMetadataRaw = NULL;
- }
- else if ( !status.isOK() ) {
+ status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadataRaw);
- warning() << "could not remotely refresh metadata for " << ns
- << causedBy( status.reason() ) << endl;
+ if (!status.isOK()) {
+ warning() << "remote metadata for " << ns
+ << " is inconsistent with current pending chunks" << causedBy(status.reason())
+ << endl;
return status;
}
- ChunkVersion remoteShardVersion;
- ChunkVersion remoteCollVersion;
- if ( remoteMetadata ) {
- remoteShardVersion = remoteMetadata->getShardVersion();
- remoteCollVersion = remoteMetadata->getCollVersion();
- }
-
//
- // Get ready to install loaded metadata if needed
+ // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest
+ // Zero-epochs (sentinel value for "dropped" collections), are tested by
+ // !epoch.isSet().
//
- CollectionMetadataPtr afterMetadata;
- ChunkVersion afterShardVersion;
- ChunkVersion afterCollVersion;
- ChunkVersion::VersionChoice choice;
-
- // If we choose to install the new metadata, this describes the kind of install
- enum InstallType {
- InstallType_New, InstallType_Update, InstallType_Replace, InstallType_Drop,
- InstallType_None
- } installType = InstallType_None; // compiler complains otherwise
-
- {
- // Exclusive collection lock needed since we're now potentially changing the metadata,
- // and don't want reads/writes to be ongoing.
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
-
- //
- // Get the metadata now that the load has completed
- //
-
- stdx::lock_guard<stdx::mutex> lk( _mutex );
-
- // Don't reload if our config server has changed or sharding is no longer enabled
- if (!_enabled) {
- string errMsg = str::stream() << "could not refresh metadata for " << ns
- << ", sharding is no longer enabled";
-
- warning() << errMsg;
- return Status(ErrorCodes::NotYetInitialized, errMsg);
+ choice = ChunkVersion::chooseNewestVersion(
+ beforeCollVersion, afterCollVersion, remoteCollVersion);
+
+ if (choice == ChunkVersion::VersionChoice_Remote) {
+ dassert(!remoteCollVersion.epoch().isSet() || remoteShardVersion >= beforeShardVersion);
+
+ if (!afterCollVersion.epoch().isSet()) {
+ // First metadata load
+ installType = InstallType_New;
+ dassert(it == _collMetadata.end());
+ _collMetadata.insert(make_pair(ns, remoteMetadata));
+ } else if (remoteCollVersion.epoch().isSet() &&
+ remoteCollVersion.epoch() == afterCollVersion.epoch()) {
+ // Update to existing metadata
+ installType = InstallType_Update;
+
+ // Invariant: If CollMetadata was not found, version should be have been 0.
+ dassert(it != _collMetadata.end());
+ it->second = remoteMetadata;
+ } else if (remoteCollVersion.epoch().isSet()) {
+ // New epoch detected, replacing metadata
+ installType = InstallType_Replace;
+
+ // Invariant: If CollMetadata was not found, version should be have been 0.
+ dassert(it != _collMetadata.end());
+ it->second = remoteMetadata;
+ } else {
+ dassert(!remoteCollVersion.epoch().isSet());
+
+ // Drop detected
+ installType = InstallType_Drop;
+ _collMetadata.erase(it);
}
- CollectionMetadataMap::iterator it = _collMetadata.find( ns );
- if ( it != _collMetadata.end() ) afterMetadata = it->second;
+ *latestShardVersion = remoteShardVersion;
+ }
+ }
+ // End _mutex
+ // End DBWrite
+
+ //
+ // Do messaging based on what happened above
+ //
+ string localShardVersionMsg = beforeShardVersion.epoch() == afterShardVersion.epoch()
+ ? afterShardVersion.toString()
+ : beforeShardVersion.toString() + " / " + afterShardVersion.toString();
+
+ if (choice == ChunkVersion::VersionChoice_Unknown) {
+ string errMsg = str::stream()
+ << "need to retry loading metadata for " << ns
+ << ", collection may have been dropped or recreated during load"
+ << " (loaded shard version : " << remoteShardVersion.toString()
+ << ", stored shard versions : " << localShardVersionMsg << ", took " << refreshMillis
+ << "ms)";
+
+ warning() << errMsg;
+ return Status(ErrorCodes::RemoteChangeDetected, errMsg);
+ }
- if ( afterMetadata ) {
- afterShardVersion = afterMetadata->getShardVersion();
- afterCollVersion = afterMetadata->getCollVersion();
- }
+ if (choice == ChunkVersion::VersionChoice_Local) {
+ LOG(0) << "metadata of collection " << ns
+ << " already up to date (shard version : " << afterShardVersion.toString()
+ << ", took " << refreshMillis << "ms)" << endl;
+ return Status::OK();
+ }
- *latestShardVersion = afterShardVersion;
+ dassert(choice == ChunkVersion::VersionChoice_Remote);
- //
- // Resolve newer pending chunks with the remote metadata, finish construction
- //
+ switch (installType) {
+ case InstallType_New:
+ LOG(0) << "collection " << ns << " was previously unsharded"
+ << ", new metadata loaded with shard version " << remoteShardVersion << endl;
+ break;
+ case InstallType_Update:
+ LOG(0) << "updating metadata for " << ns << " from shard version "
+ << localShardVersionMsg << " to shard version " << remoteShardVersion << endl;
+ break;
+ case InstallType_Replace:
+ LOG(0) << "replacing metadata for " << ns << " at shard version "
+ << localShardVersionMsg << " with a new epoch (shard version "
+ << remoteShardVersion << ")" << endl;
+ break;
+ case InstallType_Drop:
+ LOG(0) << "dropping metadata for " << ns << " at shard version " << localShardVersionMsg
+ << ", took " << refreshMillis << "ms" << endl;
+ break;
+ default:
+ verify(false);
+ break;
+ }
- status = mdLoader.promotePendingChunks( afterMetadata.get(), remoteMetadataRaw );
+ if (installType != InstallType_Drop) {
+ LOG(0) << "collection version was loaded at version " << remoteCollVersion << ", took "
+ << refreshMillis << "ms" << endl;
+ }
- if ( !status.isOK() ) {
+ return Status::OK();
+}
- warning() << "remote metadata for " << ns
- << " is inconsistent with current pending chunks"
- << causedBy( status.reason() ) << endl;
+void ShardingState::appendInfo(BSONObjBuilder& builder) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- return status;
- }
+ builder.appendBool("enabled", _enabled);
+ if (!_enabled) {
+ return;
+ }
- //
- // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest
- // Zero-epochs (sentinel value for "dropped" collections), are tested by
- // !epoch.isSet().
- //
+ builder.append("configServer", grid.catalogManager()->connectionString().toString());
+ builder.append("shardName", _shardName);
- choice = ChunkVersion::chooseNewestVersion( beforeCollVersion,
- afterCollVersion,
- remoteCollVersion );
+ BSONObjBuilder versionB(builder.subobjStart("versions"));
+ for (CollectionMetadataMap::const_iterator it = _collMetadata.begin();
+ it != _collMetadata.end();
+ ++it) {
+ CollectionMetadataPtr metadata = it->second;
+ versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
+ }
- if ( choice == ChunkVersion::VersionChoice_Remote ) {
- dassert(!remoteCollVersion.epoch().isSet() ||
- remoteShardVersion >= beforeShardVersion);
+ versionB.done();
+}
- if ( !afterCollVersion.epoch().isSet() ) {
+bool ShardingState::needCollectionMetadata(Client* client, const string& ns) const {
+ if (!_enabled)
+ return false;
- // First metadata load
- installType = InstallType_New;
- dassert( it == _collMetadata.end() );
- _collMetadata.insert( make_pair( ns, remoteMetadata ) );
- }
- else if ( remoteCollVersion.epoch().isSet() &&
- remoteCollVersion.epoch() == afterCollVersion.epoch() ) {
+ if (!ShardedConnectionInfo::get(client, false))
+ return false;
- // Update to existing metadata
- installType = InstallType_Update;
+ return true;
+}
- // Invariant: If CollMetadata was not found, version should be have been 0.
- dassert( it != _collMetadata.end() );
- it->second = remoteMetadata;
- }
- else if ( remoteCollVersion.epoch().isSet() ) {
+CollectionMetadataPtr ShardingState::getCollectionMetadata(const string& ns) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- // New epoch detected, replacing metadata
- installType = InstallType_Replace;
+ CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
+ if (it == _collMetadata.end()) {
+ return CollectionMetadataPtr();
+ } else {
+ return it->second;
+ }
+}
- // Invariant: If CollMetadata was not found, version should be have been 0.
- dassert( it != _collMetadata.end() );
- it->second = remoteMetadata;
- }
- else {
- dassert( !remoteCollVersion.epoch().isSet() );
+ShardingState shardingState;
- // Drop detected
- installType = InstallType_Drop;
- _collMetadata.erase( it );
- }
+// -----ShardingState END ----
- *latestShardVersion = remoteShardVersion;
- }
- }
- // End _mutex
- // End DBWrite
+// -----ShardedConnectionInfo START ----
- //
- // Do messaging based on what happened above
- //
- string localShardVersionMsg =
- beforeShardVersion.epoch() == afterShardVersion.epoch() ?
- afterShardVersion.toString() :
- beforeShardVersion.toString() + " / " + afterShardVersion.toString();
+ShardedConnectionInfo::ShardedConnectionInfo() {
+ _forceVersionOk = false;
+}
- if ( choice == ChunkVersion::VersionChoice_Unknown ) {
+ShardedConnectionInfo* ShardedConnectionInfo::get(Client* client, bool create) {
+ auto& current = clientSCI(client);
- string errMsg = str::stream()
- << "need to retry loading metadata for " << ns
- << ", collection may have been dropped or recreated during load"
- << " (loaded shard version : " << remoteShardVersion.toString()
- << ", stored shard versions : " << localShardVersionMsg
- << ", took " << refreshMillis << "ms)";
+ if (!current && create) {
+ LOG(1) << "entering shard mode for connection" << endl;
+ current = boost::in_place();
+ }
- warning() << errMsg;
- return Status( ErrorCodes::RemoteChangeDetected, errMsg );
- }
+ return current ? &current.value() : nullptr;
+}
- if ( choice == ChunkVersion::VersionChoice_Local ) {
+void ShardedConnectionInfo::reset(Client* client) {
+ clientSCI(client) = boost::none;
+}
- LOG( 0 ) << "metadata of collection " << ns << " already up to date (shard version : "
- << afterShardVersion.toString() << ", took " << refreshMillis << "ms)"
- << endl;
- return Status::OK();
- }
+const ChunkVersion ShardedConnectionInfo::getVersion(const string& ns) const {
+ NSVersionMap::const_iterator it = _versions.find(ns);
+ if (it != _versions.end()) {
+ return it->second;
+ } else {
+ return ChunkVersion(0, 0, OID());
+ }
+}
- dassert( choice == ChunkVersion::VersionChoice_Remote );
+void ShardedConnectionInfo::setVersion(const string& ns, const ChunkVersion& version) {
+ _versions[ns] = version;
+}
- switch( installType ) {
- case InstallType_New:
- LOG( 0 ) << "collection " << ns << " was previously unsharded"
- << ", new metadata loaded with shard version " << remoteShardVersion
- << endl;
- break;
- case InstallType_Update:
- LOG( 0 ) << "updating metadata for " << ns << " from shard version "
- << localShardVersionMsg << " to shard version " << remoteShardVersion
- << endl;
- break;
- case InstallType_Replace:
- LOG( 0 ) << "replacing metadata for " << ns << " at shard version "
- << localShardVersionMsg << " with a new epoch (shard version "
- << remoteShardVersion << ")" << endl;
- break;
- case InstallType_Drop:
- LOG( 0 ) << "dropping metadata for " << ns << " at shard version "
- << localShardVersionMsg << ", took " << refreshMillis << "ms" << endl;
- break;
- default:
- verify( false );
- break;
- }
+void ShardedConnectionInfo::addHook() {
+ static stdx::mutex lock;
+ static bool done = false;
- if ( installType != InstallType_Drop ) {
- LOG( 0 ) << "collection version was loaded at version " << remoteCollVersion
- << ", took " << refreshMillis << "ms" << endl;
- }
+ stdx::lock_guard<stdx::mutex> lk(lock);
+ if (!done) {
+ log() << "first cluster operation detected, adding sharding hook to enable versioning "
+ "and authentication to remote servers";
- return Status::OK();
+ globalConnPool.addHook(new ShardingConnectionHook(false));
+ shardConnectionPool.addHook(new ShardingConnectionHook(true));
+ done = true;
}
+}
- void ShardingState::appendInfo(BSONObjBuilder& builder) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+class MongodShardCommand : public Command {
+public:
+ MongodShardCommand(const char* n) : Command(n) {}
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool adminOnly() const {
+ return true;
+ }
+};
- builder.appendBool("enabled", _enabled);
- if (!_enabled) {
- return;
- }
- builder.append("configServer", grid.catalogManager()->connectionString().toString());
- builder.append("shardName", _shardName);
+bool haveLocalShardingInfo(Client* client, const string& ns) {
+ if (!shardingState.enabled())
+ return false;
- BSONObjBuilder versionB(builder.subobjStart("versions"));
- for (CollectionMetadataMap::const_iterator it = _collMetadata.begin();
- it != _collMetadata.end();
- ++it) {
+ if (!shardingState.hasVersion(ns))
+ return false;
- CollectionMetadataPtr metadata = it->second;
- versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
- }
+ return ShardedConnectionInfo::get(client, false) != NULL;
+}
- versionB.done();
- }
+class UnsetShardingCommand : public MongodShardCommand {
+public:
+ UnsetShardingCommand() : MongodShardCommand("unsetSharding") {}
- bool ShardingState::needCollectionMetadata( Client* client, const string& ns ) const {
- if ( ! _enabled )
- return false;
+ virtual void help(stringstream& help) const {
+ help << "internal";
+ }
- if ( ! ShardedConnectionInfo::get( client, false ) )
- return false;
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual bool slaveOk() const {
return true;
}
- CollectionMetadataPtr ShardingState::getCollectionMetadata( const string& ns ) {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
-
- CollectionMetadataMap::const_iterator it = _collMetadata.find( ns );
- if ( it == _collMetadata.end() ) {
- return CollectionMetadataPtr();
- }
- else {
- return it->second;
- }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- ShardingState shardingState;
-
- // -----ShardingState END ----
-
- // -----ShardedConnectionInfo START ----
-
- ShardedConnectionInfo::ShardedConnectionInfo() {
- _forceVersionOk = false;
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ ShardedConnectionInfo::reset(txn->getClient());
+ return true;
}
- ShardedConnectionInfo* ShardedConnectionInfo::get( Client* client, bool create ) {
- auto& current = clientSCI(client);
+} unsetShardingCommand;
- if (!current && create) {
- LOG(1) << "entering shard mode for connection" << endl;
- current = boost::in_place();
- }
+class SetShardVersion : public MongodShardCommand {
+public:
+ SetShardVersion() : MongodShardCommand("setShardVersion") {}
- return current ? &current.value() : nullptr;
+ virtual void help(stringstream& help) const {
+ help << "internal";
}
- void ShardedConnectionInfo::reset(Client* client) {
- clientSCI(client) = boost::none;
+ virtual bool slaveOk() const {
+ return true;
}
-
- const ChunkVersion ShardedConnectionInfo::getVersion( const string& ns ) const {
- NSVersionMap::const_iterator it = _versions.find( ns );
- if ( it != _versions.end() ) {
- return it->second;
- }
- else {
- return ChunkVersion( 0, 0, OID() );
- }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
}
- void ShardedConnectionInfo::setVersion( const string& ns , const ChunkVersion& version ) {
- _versions[ns] = version;
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- void ShardedConnectionInfo::addHook() {
- static stdx::mutex lock;
- static bool done = false;
+ bool checkConfigOrInit(OperationContext* txn,
+ const string& configdb,
+ bool authoritative,
+ string& errmsg,
+ BSONObjBuilder& result,
+ bool locked = false) const {
+ if (configdb.size() == 0) {
+ errmsg = "no configdb";
+ return false;
+ }
- stdx::lock_guard<stdx::mutex> lk(lock);
- if (!done) {
- log() << "first cluster operation detected, adding sharding hook to enable versioning "
- "and authentication to remote servers";
+ if (shardingState.enabled()) {
+ if (configdb == shardingState.getConfigServer())
+ return true;
- globalConnPool.addHook(new ShardingConnectionHook(false));
- shardConnectionPool.addHook(new ShardingConnectionHook(true));
- done = true;
- }
- }
+ result.append("configdb",
+ BSON("stored" << shardingState.getConfigServer() << "given" << configdb));
- class MongodShardCommand : public Command {
- public:
- MongodShardCommand( const char * n ) : Command( n ) {
- }
- virtual bool slaveOk() const {
+ errmsg = str::stream() << "mongos specified a different config database string : "
+ << "stored : " << shardingState.getConfigServer()
+ << " vs given : " << configdb;
return false;
}
- virtual bool adminOnly() const {
- return true;
- }
- };
-
- bool haveLocalShardingInfo( Client* client, const string& ns ) {
- if ( ! shardingState.enabled() )
+ if (!authoritative) {
+ result.appendBool("need_authoritative", true);
+ errmsg = "first setShardVersion";
return false;
+ }
- if ( ! shardingState.hasVersion( ns ) )
- return false;
+ if (locked) {
+ ShardingState::initialize(configdb);
+ return true;
+ }
- return ShardedConnectionInfo::get(client, false) != NULL;
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ return checkConfigOrInit(txn, configdb, authoritative, errmsg, result, true);
}
- class UnsetShardingCommand : public MongodShardCommand {
- public:
- UnsetShardingCommand() : MongodShardCommand("unsetSharding") {}
-
- virtual void help( stringstream& help ) const {
- help << "internal";
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ // Compatibility error for < v3.0 mongoses still active in the cluster
+ // TODO: Remove post-3.0
+ if (!cmdObj["serverID"].eoo()) {
+ // This mongos is too old to talk to us
+ string errMsg = stream() << "v3.0 mongod is incompatible with v2.6 mongos, "
+ << "a v2.6 mongos may be running in the v3.0 cluster at "
+ << txn->getClient()->clientAddress(false);
+ error() << errMsg;
+ return appendCommandStatus(result, Status(ErrorCodes::ProtocolError, errMsg));
}
- virtual bool isWriteCommandForConfigServer() const { return false; }
+ // Steps
+ // 1. check basic config
+ // 2. extract params from command
+ // 3. fast check
+ // 4. slow check (LOCKS)
- virtual bool slaveOk() const { return true; }
+ // step 1
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ Client* client = txn->getClient();
+ LastError::get(client).disable();
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, true);
+
+ bool authoritative = cmdObj.getBoolField("authoritative");
+
+ // check config server is ok or enable sharding
+ if (!checkConfigOrInit(
+ txn, cmdObj["configdb"].valuestrsafe(), authoritative, errmsg, result)) {
+ return false;
}
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- ShardedConnectionInfo::reset(txn->getClient());
- return true;
+ // check shard name is correct
+ if (cmdObj["shard"].type() == String) {
+ // The shard host is also sent when using setShardVersion, report this host if there
+ // is an error.
+ shardingState.gotShardNameAndHost(cmdObj["shard"].String(), cmdObj["shardHost"].str());
}
- } unsetShardingCommand;
+ // Handle initial shard connection
+ if (cmdObj["version"].eoo() && cmdObj["init"].trueValue()) {
+ result.append("initialized", true);
- class SetShardVersion : public MongodShardCommand {
- public:
- SetShardVersion() : MongodShardCommand("setShardVersion") {}
+ // Send back wire version to let mongos know what protocol we can speak
+ result.append("minWireVersion", minWireVersion);
+ result.append("maxWireVersion", maxWireVersion);
- virtual void help( stringstream& help ) const {
- help << "internal";
+ return true;
}
- virtual bool slaveOk() const { return true; }
- virtual bool isWriteCommandForConfigServer() const { return false; }
-
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ string ns = cmdObj["setShardVersion"].valuestrsafe();
+ if (ns.size() == 0) {
+ errmsg = "need to specify namespace";
+ return false;
}
- bool checkConfigOrInit(OperationContext* txn,
- const string& configdb,
- bool authoritative,
- string& errmsg,
- BSONObjBuilder& result,
- bool locked = false ) const {
- if ( configdb.size() == 0 ) {
- errmsg = "no configdb";
- return false;
- }
-
- if ( shardingState.enabled() ) {
- if ( configdb == shardingState.getConfigServer() )
- return true;
-
- result.append( "configdb" , BSON( "stored" << shardingState.getConfigServer() <<
- "given" << configdb ) );
-
- errmsg = str::stream() << "mongos specified a different config database string : "
- << "stored : " << shardingState.getConfigServer()
- << " vs given : " << configdb;
- return false;
- }
-
- if ( ! authoritative ) {
- result.appendBool( "need_authoritative" , true );
- errmsg = "first setShardVersion";
- return false;
- }
-
- if ( locked ) {
- ShardingState::initialize(configdb);
- return true;
- }
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
- return checkConfigOrInit(txn, configdb, authoritative, errmsg, result, true);
+ // we can run on a slave up to here
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
+ nsToDatabase(ns))) {
+ result.append("errmsg", "not master");
+ result.append("note", "from post init in setShardVersion");
+ return false;
}
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
-
- // Compatibility error for < v3.0 mongoses still active in the cluster
- // TODO: Remove post-3.0
- if (!cmdObj["serverID"].eoo()) {
-
- // This mongos is too old to talk to us
- string errMsg = stream() << "v3.0 mongod is incompatible with v2.6 mongos, "
- << "a v2.6 mongos may be running in the v3.0 cluster at "
- << txn->getClient()->clientAddress(false);
- error() << errMsg;
- return appendCommandStatus(result, Status(ErrorCodes::ProtocolError, errMsg));
- }
+ // step 2
+ if (!ChunkVersion::canParseBSON(cmdObj, "version")) {
+ errmsg = "need to specify version";
+ return false;
+ }
- // Steps
- // 1. check basic config
- // 2. extract params from command
- // 3. fast check
- // 4. slow check (LOCKS)
-
- // step 1
-
- Client* client = txn->getClient();
- LastError::get(client).disable();
- ShardedConnectionInfo* info = ShardedConnectionInfo::get( client, true );
-
- bool authoritative = cmdObj.getBoolField( "authoritative" );
-
- // check config server is ok or enable sharding
- if (!checkConfigOrInit(
- txn, cmdObj["configdb"].valuestrsafe(), authoritative, errmsg, result)) {
- return false;
- }
+ const ChunkVersion version = ChunkVersion::fromBSON(cmdObj, "version");
- // check shard name is correct
- if ( cmdObj["shard"].type() == String ) {
- // The shard host is also sent when using setShardVersion, report this host if there
- // is an error.
- shardingState.gotShardNameAndHost( cmdObj["shard"].String(),
- cmdObj["shardHost"].str() );
- }
-
- // Handle initial shard connection
- if( cmdObj["version"].eoo() && cmdObj["init"].trueValue() ){
+ // step 3
- result.append( "initialized", true );
+ const ChunkVersion oldVersion = info->getVersion(ns);
+ const ChunkVersion globalVersion = shardingState.getVersion(ns);
- // Send back wire version to let mongos know what protocol we can speak
- result.append( "minWireVersion", minWireVersion );
- result.append( "maxWireVersion", maxWireVersion );
+ oldVersion.addToBSON(result, "oldVersion");
- return true;
+ if (version.isWriteCompatibleWith(globalVersion)) {
+ // mongos and mongod agree!
+ if (!oldVersion.isWriteCompatibleWith(version)) {
+ if (oldVersion < globalVersion && oldVersion.hasEqualEpoch(globalVersion)) {
+ info->setVersion(ns, version);
+ } else if (authoritative) {
+ // this means there was a drop and our version is reset
+ info->setVersion(ns, version);
+ } else {
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "verifying drop on '" + ns + "'";
+ return false;
+ }
}
- string ns = cmdObj["setShardVersion"].valuestrsafe();
- if ( ns.size() == 0 ) {
- errmsg = "need to specify namespace";
- return false;
- }
+ return true;
+ }
+ // step 4
+ // Cases below all either return OR fall-through to remote metadata reload.
+ const bool isDropRequested = !version.isSet() && globalVersion.isSet();
- // we can run on a slave up to here
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsToDatabase(ns))) {
- result.append( "errmsg" , "not master" );
- result.append( "note" , "from post init in setShardVersion" );
+ if (isDropRequested) {
+ if (!authoritative) {
+ result.appendBool("need_authoritative", true);
+ result.append("ns", ns);
+ globalVersion.addToBSON(result, "globalVersion");
+ errmsg = "dropping needs to be authoritative";
return false;
}
- // step 2
- if( ! ChunkVersion::canParseBSON( cmdObj, "version" ) ){
- errmsg = "need to specify version";
+ // Fall through to metadata reload below
+ } else {
+ // Not Dropping
+
+ // TODO: Refactor all of this
+ if (version < oldVersion && version.hasEqualEpoch(oldVersion)) {
+ errmsg = str::stream() << "this connection already had a newer version "
+ << "of collection '" << ns << "'";
+ result.append("ns", ns);
+ version.addToBSON(result, "newVersion");
+ globalVersion.addToBSON(result, "globalVersion");
return false;
}
- const ChunkVersion version = ChunkVersion::fromBSON( cmdObj, "version" );
-
- // step 3
-
- const ChunkVersion oldVersion = info->getVersion(ns);
- const ChunkVersion globalVersion = shardingState.getVersion(ns);
-
- oldVersion.addToBSON( result, "oldVersion" );
-
- if ( version.isWriteCompatibleWith( globalVersion )) {
- // mongos and mongod agree!
- if ( !oldVersion.isWriteCompatibleWith( version )) {
- if ( oldVersion < globalVersion &&
- oldVersion.hasEqualEpoch( globalVersion )) {
- info->setVersion( ns, version );
- }
- else if ( authoritative ) {
- // this means there was a drop and our version is reset
- info->setVersion( ns, version );
- }
- else {
- result.append( "ns", ns );
- result.appendBool( "need_authoritative", true );
- errmsg = "verifying drop on '" + ns + "'";
- return false;
- }
- }
-
- return true;
- }
-
- // step 4
- // Cases below all either return OR fall-through to remote metadata reload.
- const bool isDropRequested = !version.isSet() && globalVersion.isSet();
-
- if (isDropRequested) {
- if ( ! authoritative ) {
- result.appendBool( "need_authoritative" , true );
- result.append( "ns" , ns );
- globalVersion.addToBSON( result, "globalVersion" );
- errmsg = "dropping needs to be authoritative";
- return false;
+ // TODO: Refactor all of this
+ if (version < globalVersion && version.hasEqualEpoch(globalVersion)) {
+ while (shardingState.inCriticalMigrateSection()) {
+ log() << "waiting till out of critical section" << endl;
+ shardingState.waitTillNotInCriticalSection(10);
}
-
- // Fall through to metadata reload below
+ errmsg = str::stream() << "shard global version for collection is higher "
+ << "than trying to set to '" << ns << "'";
+ result.append("ns", ns);
+ version.addToBSON(result, "version");
+ globalVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
+ return false;
}
- else {
- // Not Dropping
-
- // TODO: Refactor all of this
- if ( version < oldVersion && version.hasEqualEpoch( oldVersion ) ) {
- errmsg = str::stream() << "this connection already had a newer version "
- << "of collection '" << ns << "'";
- result.append( "ns" , ns );
- version.addToBSON( result, "newVersion" );
- globalVersion.addToBSON( result, "globalVersion" );
- return false;
- }
-
- // TODO: Refactor all of this
- if ( version < globalVersion && version.hasEqualEpoch( globalVersion ) ) {
- while ( shardingState.inCriticalMigrateSection() ) {
- log() << "waiting till out of critical section" << endl;
- shardingState.waitTillNotInCriticalSection( 10 );
- }
- errmsg = str::stream() << "shard global version for collection is higher "
- << "than trying to set to '" << ns << "'";
- result.append( "ns" , ns );
- version.addToBSON( result, "version" );
- globalVersion.addToBSON( result, "globalVersion" );
- result.appendBool( "reloadConfig" , true );
- return false;
- }
- if ( ! globalVersion.isSet() && ! authoritative ) {
- // Needed b/c when the last chunk is moved off a shard,
- // the version gets reset to zero, which should require a reload.
- while ( shardingState.inCriticalMigrateSection() ) {
- log() << "waiting till out of critical section" << endl;
- shardingState.waitTillNotInCriticalSection( 10 );
- }
-
- // need authoritative for first look
- result.append( "ns" , ns );
- result.appendBool( "need_authoritative" , true );
- errmsg = "first time for collection '" + ns + "'";
- return false;
+ if (!globalVersion.isSet() && !authoritative) {
+ // Needed b/c when the last chunk is moved off a shard,
+ // the version gets reset to zero, which should require a reload.
+ while (shardingState.inCriticalMigrateSection()) {
+ log() << "waiting till out of critical section" << endl;
+ shardingState.waitTillNotInCriticalSection(10);
}
- // Fall through to metadata reload below
+ // need authoritative for first look
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "first time for collection '" + ns + "'";
+ return false;
}
- ChunkVersion currVersion;
- Status status = shardingState.refreshMetadataIfNeeded(txn, ns, version, &currVersion);
+ // Fall through to metadata reload below
+ }
- if (!status.isOK()) {
+ ChunkVersion currVersion;
+ Status status = shardingState.refreshMetadataIfNeeded(txn, ns, version, &currVersion);
- // The reload itself was interrupted or confused here
+ if (!status.isOK()) {
+ // The reload itself was interrupted or confused here
- errmsg = str::stream() << "could not refresh metadata for " << ns
- << " with requested shard version " << version.toString()
- << ", stored shard version is " << currVersion.toString()
- << causedBy( status.reason() );
+ errmsg = str::stream() << "could not refresh metadata for " << ns
+ << " with requested shard version " << version.toString()
+ << ", stored shard version is " << currVersion.toString()
+ << causedBy(status.reason());
- warning() << errmsg << endl;
+ warning() << errmsg << endl;
- result.append( "ns" , ns );
- version.addToBSON( result, "version" );
- currVersion.addToBSON( result, "globalVersion" );
- result.appendBool( "reloadConfig", true );
+ result.append("ns", ns);
+ version.addToBSON(result, "version");
+ currVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
- return false;
+ return false;
+ } else if (!version.isWriteCompatibleWith(currVersion)) {
+ // We reloaded a version that doesn't match the version mongos was trying to
+ // set.
+
+ errmsg = str::stream() << "requested shard version differs from"
+ << " config shard version for " << ns
+ << ", requested version is " << version.toString()
+ << " but found version " << currVersion.toString();
+
+ OCCASIONALLY warning() << errmsg << endl;
+
+ // WARNING: the exact fields below are important for compatibility with mongos
+ // version reload.
+
+ result.append("ns", ns);
+ currVersion.addToBSON(result, "globalVersion");
+
+ // If this was a reset of a collection or the last chunk moved out, inform mongos to
+ // do a full reload.
+ if (currVersion.epoch() != version.epoch() || !currVersion.isSet()) {
+ result.appendBool("reloadConfig", true);
+ // Zero-version also needed to trigger full mongos reload, sadly
+ // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
+ ChunkVersion(0, 0, OID()).addToBSON(result, "version");
+ // For debugging
+ version.addToBSON(result, "origVersion");
+ } else {
+ version.addToBSON(result, "version");
}
- else if ( !version.isWriteCompatibleWith( currVersion ) ) {
-
- // We reloaded a version that doesn't match the version mongos was trying to
- // set.
- errmsg = str::stream() << "requested shard version differs from"
- << " config shard version for " << ns
- << ", requested version is " << version.toString()
- << " but found version " << currVersion.toString();
+ return false;
+ }
- OCCASIONALLY warning() << errmsg << endl;
+ info->setVersion(ns, version);
+ return true;
+ }
- // WARNING: the exact fields below are important for compatibility with mongos
- // version reload.
+} setShardVersionCmd;
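
For orientation, the request shape that run() above parses can be pictured as a single builder. This is a minimal illustrative sketch, not code from this change: the helper name and include paths are assumptions, while the field names and the ChunkVersion serialization mirror the cmdObj lookups in the handler.

#include <string>

#include "mongo/db/jsobj.h"          // BSONObj, BSONObjBuilder (path assumed)
#include "mongo/s/chunk_version.h"   // ChunkVersion (path assumed)

// Hypothetical helper: builds the command document setShardVersionCmd expects.
mongo::BSONObj makeSetShardVersionCmd(const std::string& ns,
                                      const std::string& configdb,
                                      const std::string& shardName,
                                      const std::string& shardHost,
                                      const mongo::ChunkVersion& version,
                                      bool authoritative) {
    mongo::BSONObjBuilder cmd;
    cmd.append("setShardVersion", ns);   // empty plus init:true for the initial handshake
    cmd.append("configdb", configdb);    // must match shardingState.getConfigServer()
    cmd.append("shard", shardName);
    cmd.append("shardHost", shardHost);
    cmd.appendBool("authoritative", authoritative);
    version.addToBSON(cmd, "version");   // omitted entirely when init:true is used
    return cmd.obj();
}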
- result.append( "ns" , ns );
- currVersion.addToBSON( result, "globalVersion" );
+class GetShardVersion : public MongodShardCommand {
+public:
+ GetShardVersion() : MongodShardCommand("getShardVersion") {}
- // If this was a reset of a collection or the last chunk moved out, inform mongos to
- // do a full reload.
- if (currVersion.epoch() != version.epoch() || !currVersion.isSet() ) {
- result.appendBool( "reloadConfig", true );
- // Zero-version also needed to trigger full mongos reload, sadly
- // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
- ChunkVersion( 0, 0, OID() ).addToBSON( result, "version" );
- // For debugging
- version.addToBSON( result, "origVersion" );
- }
- else {
- version.addToBSON( result, "version" );
- }
+ virtual void help(stringstream& help) const {
+ help << " example: { getShardVersion : 'alleyinsider.foo' } ";
+ }
- return false;
- }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
- info->setVersion( ns , version );
- return true;
+ virtual Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) {
+ if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+ ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
+ ActionType::getShardVersion)) {
+ return Status(ErrorCodes::Unauthorized, "Unauthorized");
}
+ return Status::OK();
+ }
+ virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ return parseNsFullyQualified(dbname, cmdObj);
+ }
- } setShardVersionCmd;
-
- class GetShardVersion : public MongodShardCommand {
- public:
- GetShardVersion() : MongodShardCommand("getShardVersion") {}
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ const string ns = cmdObj["getShardVersion"].valuestrsafe();
+ if (ns.size() == 0) {
+ errmsg = "need to specify full namespace";
+ return false;
+ }
- virtual void help( stringstream& help ) const {
- help << " example: { getShardVersion : 'alleyinsider.foo' } ";
+ if (shardingState.enabled()) {
+ result.append("configServer", shardingState.getConfigServer());
+ } else {
+ result.append("configServer", "");
}
- virtual bool isWriteCommandForConfigServer() const { return false; }
+ result.appendTimestamp("global", shardingState.getVersion(ns).toLong());
- virtual Status checkAuthForCommand(ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
- if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
- ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
- ActionType::getShardVersion)) {
- return Status(ErrorCodes::Unauthorized, "Unauthorized");
- }
- return Status::OK();
- }
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
- return parseNsFullyQualified(dbname, cmdObj);
+ ShardedConnectionInfo* const info = ShardedConnectionInfo::get(txn->getClient(), false);
+ result.appendBool("inShardedMode", info != NULL);
+ if (info) {
+ result.appendTimestamp("mine", info->getVersion(ns).toLong());
+ } else {
+ result.appendTimestamp("mine", 0);
}
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- const string ns = cmdObj["getShardVersion"].valuestrsafe();
- if (ns.size() == 0) {
- errmsg = "need to specify full namespace";
- return false;
- }
-
- if (shardingState.enabled()) {
- result.append("configServer", shardingState.getConfigServer());
- }
- else {
- result.append("configServer", "");
+ if (cmdObj["fullMetadata"].trueValue()) {
+ CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(ns);
+ if (metadata) {
+ result.append("metadata", metadata->toBSON());
+ } else {
+ result.append("metadata", BSONObj());
}
+ }
- result.appendTimestamp("global", shardingState.getVersion(ns).toLong());
+ return true;
+ }
- ShardedConnectionInfo* const info = ShardedConnectionInfo::get(txn->getClient(), false);
- result.appendBool("inShardedMode", info != NULL);
- if (info) {
- result.appendTimestamp("mine", info->getVersion(ns).toLong());
- }
- else {
- result.appendTimestamp("mine", 0);
- }
+} getShardVersion;
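
As a usage note, the reply assembled above carries the shard's view of the collection version alongside the connection's own. A caller-side sketch (helper name hypothetical, field names taken from the handler) might read it like this.

#include <iostream>

#include "mongo/db/jsobj.h"   // BSONObj (path assumed)

// Hypothetical: print the fields appended by getShardVersion::run() above.
void dumpGetShardVersionReply(const mongo::BSONObj& reply) {
    std::cout << "configServer:  " << reply["configServer"].str() << '\n'
              << "global:        " << reply["global"].toString(false) << '\n'
              << "inShardedMode: " << reply["inShardedMode"].trueValue() << '\n'
              << "mine:          " << reply["mine"].toString(false) << '\n';
    if (reply.hasField("metadata")) {
        // Only present when the request had fullMetadata:true.
        std::cout << "metadata: " << reply["metadata"].Obj().toString() << '\n';
    }
}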
- if (cmdObj["fullMetadata"].trueValue()) {
- CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(ns);
- if (metadata) {
- result.append("metadata", metadata->toBSON());
- }
- else {
- result.append("metadata", BSONObj());
- }
- }
+class ShardingStateCmd : public MongodShardCommand {
+public:
+ ShardingStateCmd() : MongodShardCommand("shardingState") {}
- return true;
- }
+ virtual bool isWriteCommandForConfigServer() const {
+ return true;
+ }
- } getShardVersion;
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::shardingState);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
- class ShardingStateCmd : public MongodShardCommand {
- public:
- ShardingStateCmd() : MongodShardCommand( "shardingState" ) {}
+ bool run(OperationContext* txn,
+ const string& dbname,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X);
+ OldClientContext ctx(txn, dbname);
+
+ shardingState.appendInfo(result);
+ return true;
+ }
- virtual bool isWriteCommandForConfigServer() const { return true; }
+} shardingStateCmd;
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::shardingState);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
+/**
+ * @return true if not in sharded mode
+ *         or if version for this client is ok
+ */
+static bool shardVersionOk(Client* client,
+ const string& ns,
+ string& errmsg,
+ ChunkVersion& received,
+ ChunkVersion& wanted) {
+ if (!shardingState.enabled())
+ return true;
- bool run(OperationContext* txn,
- const string& dbname,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X);
- OldClientContext ctx(txn, dbname);
-
- shardingState.appendInfo( result );
- return true;
- }
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsToDatabase(ns))) {
+ // right now connections to secondaries aren't versioned at all
+ return true;
+ }
- } shardingStateCmd;
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false);
- /**
- * @ return true if not in sharded mode
- or if version for this client is ok
- */
- static bool shardVersionOk(Client* client,
- const string& ns,
- string& errmsg,
- ChunkVersion& received,
- ChunkVersion& wanted) {
+ if (!info) {
+ // this means the client has nothing sharded,
+ // so this allows direct connections to do whatever they want,
+ // which I think is the correct behavior
+ return true;
+ }
- if ( ! shardingState.enabled() )
- return true;
+ if (info->inForceVersionOkMode()) {
+ return true;
+ }
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsToDatabase(ns))) {
- // right now connections to secondaries aren't versioned at all
- return true;
- }
+ // TODO: at some point all collections, sharded or not, will have a version
+ // (and a CollectionMetadata)
+ received = info->getVersion(ns);
- ShardedConnectionInfo* info = ShardedConnectionInfo::get( client, false );
+ if (ChunkVersion::isIgnoredVersion(received)) {
+ return true;
+ }
- if ( ! info ) {
- // this means the client has nothing sharded
- // so this allows direct connections to do whatever they want
- // which i think is the correct behavior
- return true;
- }
+ wanted = shardingState.getVersion(ns);
- if ( info->inForceVersionOkMode() ) {
- return true;
- }
+ if (received.isWriteCompatibleWith(wanted))
+ return true;
- // TODO : all collections at some point, be sharded or not, will have a version
- // (and a CollectionMetadata)
- received = info->getVersion( ns );
+ //
+ // Figure out exactly why not compatible, send appropriate error message
+ // The versions themselves are returned in the error, so not needed in messages here
+ //
- if (ChunkVersion::isIgnoredVersion(received)) {
- return true;
- }
+ // Check epoch first, to send more meaningful message, since other parameters probably
+ // won't match either
+ if (!wanted.hasEqualEpoch(received)) {
+ errmsg = str::stream() << "version epoch mismatch detected for " << ns << ", "
+ << "the collection may have been dropped and recreated";
+ return false;
+ }
- wanted = shardingState.getVersion( ns );
+ if (!wanted.isSet() && received.isSet()) {
+ errmsg = str::stream() << "this shard no longer contains chunks for " << ns << ", "
+ << "the collection may have been dropped";
+ return false;
+ }
- if( received.isWriteCompatibleWith( wanted ) ) return true;
+ if (wanted.isSet() && !received.isSet()) {
+ errmsg = str::stream() << "this shard contains versioned chunks for " << ns << ", "
+ << "but no version set in request";
+ return false;
+ }
+ if (wanted.majorVersion() != received.majorVersion()) {
//
- // Figure out exactly why not compatible, send appropriate error message
- // The versions themselves are returned in the error, so not needed in messages here
+ // Could be > or < - wanted is > if this is the source of a migration,
+ // wanted < if this is the target of a migration
//
- // Check epoch first, to send more meaningful message, since other parameters probably
- // won't match either
- if( ! wanted.hasEqualEpoch( received ) ){
- errmsg = str::stream() << "version epoch mismatch detected for " << ns << ", "
- << "the collection may have been dropped and recreated";
- return false;
- }
-
- if( ! wanted.isSet() && received.isSet() ){
- errmsg = str::stream() << "this shard no longer contains chunks for " << ns << ", "
- << "the collection may have been dropped";
- return false;
- }
-
- if( wanted.isSet() && ! received.isSet() ){
- errmsg = str::stream() << "this shard contains versioned chunks for " << ns << ", "
- << "but no version set in request";
- return false;
- }
-
- if( wanted.majorVersion() != received.majorVersion() ){
-
- //
- // Could be > or < - wanted is > if this is the source of a migration,
- // wanted < if this is the target of a migration
- //
-
- errmsg = str::stream() << "version mismatch detected for " << ns << ", "
- << "stored major version " << wanted.majorVersion()
- << " does not match received " << received.majorVersion();
- return false;
- }
-
- // Those are all the reasons the versions can mismatch
- verify( false );
-
+ errmsg = str::stream() << "version mismatch detected for " << ns << ", "
+ << "stored major version " << wanted.majorVersion()
+ << " does not match received " << received.majorVersion();
return false;
-
}
- void ensureShardVersionOKOrThrow(Client* client, const std::string& ns) {
- string errmsg;
- ChunkVersion received;
- ChunkVersion wanted;
- if (!shardVersionOk(client, ns, errmsg, received, wanted)) {
- StringBuilder sb;
- sb << "[" << ns << "] shard version not ok: " << errmsg;
- throw SendStaleConfigException(ns, sb.str(), received, wanted);
- }
- }
+ // Those are all the reasons the versions can mismatch
+ verify(false);
- void usingAShardConnection( const string& addr ) {
- }
+ return false;
+}
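
To make the mismatch branches above concrete, here is a small worked example. It is not part of this change; it assumes the ChunkVersion(major, minor, epoch) constructor and the comparison helpers already used elsewhere in this file, and the include paths are guesses.

#include "mongo/s/chunk_version.h"     // ChunkVersion, OID (path assumed)
#include "mongo/util/assert_util.h"    // invariant (path assumed)

void shardVersionOkWorkedExample() {
    const mongo::OID epoch = mongo::OID::gen();

    const mongo::ChunkVersion wanted(3, 0, epoch);
    const mongo::ChunkVersion received(3, 5, epoch);
    // Same epoch, same major version: write compatible, so shardVersionOk()
    // returns true before reaching any of the error branches.
    invariant(received.isWriteCompatibleWith(wanted));

    const mongo::ChunkVersion stale(2, 1, epoch);
    // Same epoch but a lower major version (e.g. a chunk has since migrated):
    // falls through to the "version mismatch detected" branch.
    invariant(!stale.isWriteCompatibleWith(wanted));

    const mongo::ChunkVersion recreated(3, 0, mongo::OID::gen());
    // Different epoch (collection dropped and recreated): caught first by the
    // hasEqualEpoch() check, which produces the more specific error message.
    invariant(!wanted.hasEqualEpoch(recreated));
}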
- void saveGLEStats(const BSONObj& result, StringData hostString) {
- // Declared in cluster_last_error_info.h.
- //
- // This can be called in mongod, which is unfortunate. To fix this,
- // we can redesign how connection pooling works on mongod for sharded operations.
+void ensureShardVersionOKOrThrow(Client* client, const std::string& ns) {
+ string errmsg;
+ ChunkVersion received;
+ ChunkVersion wanted;
+ if (!shardVersionOk(client, ns, errmsg, received, wanted)) {
+ StringBuilder sb;
+ sb << "[" << ns << "] shard version not ok: " << errmsg;
+ throw SendStaleConfigException(ns, sb.str(), received, wanted);
}
+}
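
A hedged caller-side sketch: operation code consults ensureShardVersionOKOrThrow() before touching sharded data and lets the resulting stale-config exception travel back to mongos, which refreshes its routing table and retries. The function below is hypothetical; only ensureShardVersionOKOrThrow() and SendStaleConfigException come from the code above, and the header path is assumed.

#include <string>

#include "mongo/s/d_state.h"   // ensureShardVersionOKOrThrow (path assumed)

// Hypothetical caller (not from this change).
void exampleVersionedOperation(mongo::Client* client, const std::string& ns) {
    // Throws mongo::SendStaleConfigException when the connection's version for
    // 'ns' is not compatible with this shard's version; mongos catches it,
    // reloads the chunk metadata, and retries the operation.
    mongo::ensureShardVersionOKOrThrow(client, ns);

    // ... safe to proceed: the shard version for 'ns' was compatible ...
}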
+
+void usingAShardConnection(const string& addr) {}
+void saveGLEStats(const BSONObj& result, StringData hostString) {
+ // Declared in cluster_last_error_info.h.
+ //
+ // This can be called in mongod, which is unfortunate. To fix this,
+ // we can redesign how connection pooling works on mongod for sharded operations.
+}
}