Diffstat (limited to 'src/mongo/s/chunk_manager_targeter.cpp')
-rw-r--r--  src/mongo/s/chunk_manager_targeter.cpp  1177
1 file changed, 560 insertions(+), 617 deletions(-)
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index 7ae106245e6..fce380ef8e9 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -37,759 +37,702 @@
namespace mongo {
- using std::endl;
- using std::map;
- using std::set;
- using std::string;
- using std::vector;
-
- using mongoutils::str::stream;
-
- /**
- * Helper to get the DBConfigPtr object in an exception-safe way.
- */
- static bool getDBConfigSafe( const StringData& db, DBConfigPtr& config, string* errMsg ) {
- try {
- config = grid.getDBConfig( db, true );
- if ( !config ) *errMsg = stream() << "could not load or create database " << db;
- }
- catch ( const DBException& ex ) {
- *errMsg = ex.toString();
- }
-
- return config.get();
- }
-
- ChunkManagerTargeter::ChunkManagerTargeter() :
- _needsTargetingRefresh( false ), _stats( new TargeterStats ) {
- }
-
- Status ChunkManagerTargeter::init( const NamespaceString& nss ) {
-
- _nss = nss;
-
- //
- // Get the latest metadata information from the cache
- //
+using std::endl;
+using std::map;
+using std::set;
+using std::string;
+using std::vector;
- DBConfigPtr config;
+using mongoutils::str::stream;
- string errMsg;
- if ( !getDBConfigSafe( _nss.db(), config, &errMsg ) ) {
- return Status( ErrorCodes::DatabaseNotFound, errMsg );
- }
-
- // Get either the chunk manager or primary shard
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
-
- return Status::OK();
- }
-
- const NamespaceString& ChunkManagerTargeter::getNS() const {
- return _nss;
+/**
+ * Helper to get the DBConfigPtr object in an exception-safe way.
+ */
+static bool getDBConfigSafe(const StringData& db, DBConfigPtr& config, string* errMsg) {
+ try {
+ config = grid.getDBConfig(db, true);
+ if (!config)
+ *errMsg = stream() << "could not load or create database " << db;
+ } catch (const DBException& ex) {
+ *errMsg = ex.toString();
}
- Status ChunkManagerTargeter::targetInsert( const BSONObj& doc,
- ShardEndpoint** endpoint ) const {
-
- BSONObj shardKey;
-
- if ( _manager ) {
-
- //
- // Sharded collections have the following requirements for targeting:
- //
- // Inserts must contain the exact shard key.
- //
+ return config.get();
+}
- shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc);
+ChunkManagerTargeter::ChunkManagerTargeter()
+ : _needsTargetingRefresh(false), _stats(new TargeterStats) {}
- // Check shard key exists
- if (shardKey.isEmpty()) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "document " << doc
- << " does not contain shard key for pattern "
- << _manager->getShardKeyPattern().toString());
- }
-
- // Check shard key size on insert
- Status status = ShardKeyPattern::checkShardKeySize(shardKey);
- if (!status.isOK())
- return status;
- }
+Status ChunkManagerTargeter::init(const NamespaceString& nss) {
+ _nss = nss;
- // Target the shard key or database primary
- if (!shardKey.isEmpty()) {
- return targetShardKey(shardKey, doc.objsize(), endpoint);
- }
- else {
+ //
+ // Get the latest metadata information from the cache
+ //
- if (!_primary) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target insert in collection "
- << getNS().ns() << "; no metadata found");
- }
+ DBConfigPtr config;
- *endpoint = new ShardEndpoint(_primary->getName(), ChunkVersion::UNSHARDED());
- return Status::OK();
- }
+ string errMsg;
+ if (!getDBConfigSafe(_nss.db(), config, &errMsg)) {
+ return Status(ErrorCodes::DatabaseNotFound, errMsg);
}
- namespace {
-
- // TODO: Expose these for unit testing via dbtests
-
- enum UpdateType {
- UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown
- };
-
- /**
- * There are two styles of update expressions:
- * coll.update({ x : 1 }, { y : 2 }) // Replacement style
- * coll.update({ x : 1 }, { $set : { y : 2 } }) // OpStyle
- */
- UpdateType getUpdateExprType( const BSONObj& updateExpr ) {
-
- UpdateType updateType = UpdateType_Unknown;
-
- // Empty update is replacement-style, by default
- if ( updateExpr.isEmpty() ) return UpdateType_Replacement;
-
- BSONObjIterator it( updateExpr );
- while ( it.more() ) {
- BSONElement next = it.next();
-
- if ( next.fieldName()[0] == '$' ) {
- if ( updateType == UpdateType_Unknown ) {
- updateType = UpdateType_OpStyle;
- }
- else if ( updateType == UpdateType_Replacement ) {
- return UpdateType_Unknown;
- }
- }
- else {
- if ( updateType == UpdateType_Unknown ) {
- updateType = UpdateType_Replacement;
- }
- else if ( updateType == UpdateType_OpStyle ) {
- return UpdateType_Unknown;
- }
- }
- }
+ // Get either the chunk manager or primary shard
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
- return updateType;
- }
+ return Status::OK();
+}
- /**
- * This returns "does the query have an _id field" and "is the _id field
- * querying for a direct value like _id : 3 and not _id : { $gt : 3 }"
- *
- * Ex: { _id : 1 } => true
- * { foo : <anything>, _id : 1 } => true
- * { _id : { $lt : 30 } } => false
- * { foo : <anything> } => false
- */
- bool isExactIdQuery(const BSONObj& query) {
- static const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));
- StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query);
- if (!status.isOK())
- return false;
- return !status.getValue()["_id"].eoo();
- }
- }
+const NamespaceString& ChunkManagerTargeter::getNS() const {
+ return _nss;
+}
- Status ChunkManagerTargeter::targetUpdate( const BatchedUpdateDocument& updateDoc,
- vector<ShardEndpoint*>* endpoints ) const {
+Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const {
+ BSONObj shardKey;
+ if (_manager) {
//
- // Update targeting may use either the query or the update. This is to support save-style
- // updates, of the form:
+ // Sharded collections have the following requirements for targeting:
//
- // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true })
- //
- // Because drivers do not know the shard key, they can't pull the shard key automatically
- // into the query doc, and to correctly support upsert we must target a single shard.
- //
- // The rule is simple - If the update is replacement style (no '$set'), we target using the
- // update. If the update is replacement style, we target using the query.
- //
- // If we have the exact shard key in either the query or replacement doc, we target using
- // that extracted key.
+ // Inserts must contain the exact shard key.
//
- BSONObj query = updateDoc.getQuery();
- BSONObj updateExpr = updateDoc.getUpdateExpr();
+ shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc);
- UpdateType updateType = getUpdateExprType( updateDoc.getUpdateExpr() );
-
- if ( updateType == UpdateType_Unknown ) {
- return Status( ErrorCodes::UnsupportedFormat,
- stream() << "update document " << updateExpr
- << " has mixed $operator and non-$operator style fields" );
+ // Check shard key exists
+ if (shardKey.isEmpty()) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ stream() << "document " << doc
+ << " does not contain shard key for pattern "
+ << _manager->getShardKeyPattern().toString());
}
- BSONObj shardKey;
-
- if ( _manager ) {
-
- //
- // Sharded collections have the following futher requirements for targeting:
- //
- // Upserts must be targeted exactly by shard key.
- // Non-multi updates must be targeted exactly by shard key *or* exact _id.
- //
-
- // Get the shard key
- if (updateType == UpdateType_OpStyle) {
-
- // Target using the query
- StatusWith<BSONObj> status =
- _manager->getShardKeyPattern().extractShardKeyFromQuery(query);
+ // Check shard key size on insert
+ Status status = ShardKeyPattern::checkShardKeySize(shardKey);
+ if (!status.isOK())
+ return status;
+ }
- // Bad query
- if (!status.isOK())
- return status.getStatus();
+ // Target the shard key or database primary
+ if (!shardKey.isEmpty()) {
+ return targetShardKey(shardKey, doc.objsize(), endpoint);
+ } else {
+ if (!_primary) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target insert in collection " << getNS().ns()
+ << "; no metadata found");
+ }
- shardKey = status.getValue();
- }
- else {
- // Target using the replacement document
- shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
- }
+ *endpoint = new ShardEndpoint(_primary->getName(), ChunkVersion::UNSHARDED());
+ return Status::OK();
+ }
+}
- //
- // Extra sharded update validation
- //
+namespace {
- if (updateDoc.getUpsert()) {
+// TODO: Expose these for unit testing via dbtests
- // Sharded upserts *always* need to be exactly targeted by shard key
- if (shardKey.isEmpty()) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "upsert " << updateDoc.toBSON()
- << " does not contain shard key for pattern "
- << _manager->getShardKeyPattern().toString());
- }
+enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };
- // Also check shard key size on upsert
- Status status = ShardKeyPattern::checkShardKeySize(shardKey);
- if (!status.isOK())
- return status;
+/**
+ * There are two styles of update expressions:
+ * coll.update({ x : 1 }, { y : 2 }) // Replacement style
+ * coll.update({ x : 1 }, { $set : { y : 2 } }) // OpStyle
+ */
+UpdateType getUpdateExprType(const BSONObj& updateExpr) {
+ UpdateType updateType = UpdateType_Unknown;
+
+ // Empty update is replacement-style, by default
+ if (updateExpr.isEmpty())
+ return UpdateType_Replacement;
+
+ BSONObjIterator it(updateExpr);
+ while (it.more()) {
+ BSONElement next = it.next();
+
+ if (next.fieldName()[0] == '$') {
+ if (updateType == UpdateType_Unknown) {
+ updateType = UpdateType_OpStyle;
+ } else if (updateType == UpdateType_Replacement) {
+ return UpdateType_Unknown;
}
-
- // Validate that single (non-multi) sharded updates are targeted by shard key or _id
- if (!updateDoc.getMulti() && shardKey.isEmpty()
- && !isExactIdQuery(updateDoc.getQuery())) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "update " << updateDoc.toBSON()
- << " does not contain _id or shard key for pattern "
- << _manager->getShardKeyPattern().toString());
+ } else {
+ if (updateType == UpdateType_Unknown) {
+ updateType = UpdateType_Replacement;
+ } else if (updateType == UpdateType_OpStyle) {
+ return UpdateType_Unknown;
}
}
-
- // Target the shard key, query, or replacement doc
- if (!shardKey.isEmpty()) {
- // We can't rely on our query targeting to be exact
- ShardEndpoint* endpoint = NULL;
- Status result = targetShardKey(shardKey,
- (query.objsize() + updateExpr.objsize()),
- &endpoint);
- endpoints->push_back(endpoint);
- return result;
- }
- else if (updateType == UpdateType_OpStyle) {
- return targetQuery(query, endpoints);
- }
- else {
- return targetDoc(updateExpr, endpoints);
- }
}
- Status ChunkManagerTargeter::targetDelete( const BatchedDeleteDocument& deleteDoc,
- vector<ShardEndpoint*>* endpoints ) const {
+ return updateType;
+}
- BSONObj shardKey;
+/**
+ * This returns "does the query have an _id field" and "is the _id field
+ * querying for a direct value like _id : 3 and not _id : { $gt : 3 }"
+ *
+ * Ex: { _id : 1 } => true
+ * { foo : <anything>, _id : 1 } => true
+ * { _id : { $lt : 30 } } => false
+ * { foo : <anything> } => false
+ */
+bool isExactIdQuery(const BSONObj& query) {
+ static const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));
+ StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query);
+ if (!status.isOK())
+ return false;
+ return !status.getValue()["_id"].eoo();
+}
+}
+
+Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc,
+ vector<ShardEndpoint*>* endpoints) const {
+ //
+ // Update targeting may use either the query or the update. This is to support save-style
+ // updates, of the form:
+ //
+ // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true })
+ //
+ // Because drivers do not know the shard key, they can't pull the shard key automatically
+ // into the query doc, and to correctly support upsert we must target a single shard.
+ //
+ // The rule is simple - If the update is replacement style (no '$set'), we target using the
+ // update. If the update is op style (has '$'-operator fields), we target using the query.
+ //
+ // If we have the exact shard key in either the query or replacement doc, we target using
+ // that extracted key.
+ //
+
+ BSONObj query = updateDoc.getQuery();
+ BSONObj updateExpr = updateDoc.getUpdateExpr();
+
+ UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr());
+
+ if (updateType == UpdateType_Unknown) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ stream() << "update document " << updateExpr
+ << " has mixed $operator and non-$operator style fields");
+ }
- if ( _manager ) {
+ BSONObj shardKey;
- //
- // Sharded collections have the following further requirements for targeting:
- //
- // Limit-1 deletes must be targeted exactly by shard key *or* exact _id
- //
+ if (_manager) {
+ //
+ // Sharded collections have the following further requirements for targeting:
+ //
+ // Upserts must be targeted exactly by shard key.
+ // Non-multi updates must be targeted exactly by shard key *or* exact _id.
+ //
- // Get the shard key
+ // Get the shard key
+ if (updateType == UpdateType_OpStyle) {
+ // Target using the query
StatusWith<BSONObj> status =
- _manager->getShardKeyPattern().extractShardKeyFromQuery(deleteDoc.getQuery());
+ _manager->getShardKeyPattern().extractShardKeyFromQuery(query);
// Bad query
if (!status.isOK())
return status.getStatus();
shardKey = status.getValue();
+ } else {
+ // Target using the replacement document
+ shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
+ }
- // Validate that single (limit-1) sharded deletes are targeted by shard key or _id
- if (deleteDoc.getLimit() == 1 && shardKey.isEmpty()
- && !isExactIdQuery(deleteDoc.getQuery())) {
+ //
+ // Extra sharded update validation
+ //
+
+ if (updateDoc.getUpsert()) {
+ // Sharded upserts *always* need to be exactly targeted by shard key
+ if (shardKey.isEmpty()) {
return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "delete " << deleteDoc.toBSON()
- << " does not contain _id or shard key for pattern "
+ stream() << "upsert " << updateDoc.toBSON()
+ << " does not contain shard key for pattern "
<< _manager->getShardKeyPattern().toString());
}
- }
- // Target the shard key or delete query
- if (!shardKey.isEmpty()) {
- // We can't rely on our query targeting to be exact
- ShardEndpoint* endpoint = NULL;
- Status result = targetShardKey(shardKey, 0, &endpoint);
- endpoints->push_back(endpoint);
- return result;
+ // Also check shard key size on upsert
+ Status status = ShardKeyPattern::checkShardKeySize(shardKey);
+ if (!status.isOK())
+ return status;
}
- else {
- return targetQuery(deleteDoc.getQuery(), endpoints);
+
+ // Validate that single (non-multi) sharded updates are targeted by shard key or _id
+ if (!updateDoc.getMulti() && shardKey.isEmpty() && !isExactIdQuery(updateDoc.getQuery())) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ stream() << "update " << updateDoc.toBSON()
+ << " does not contain _id or shard key for pattern "
+ << _manager->getShardKeyPattern().toString());
}
}
- Status ChunkManagerTargeter::targetDoc(const BSONObj& doc,
- vector<ShardEndpoint*>* endpoints) const {
- // NOTE: This is weird and fragile, but it's the way our language works right now -
- // documents are either A) invalid or B) valid equality queries over themselves.
- return targetQuery(doc, endpoints);
+ // Target the shard key, query, or replacement doc
+ if (!shardKey.isEmpty()) {
+ // We can't rely on our query targeting to be exact
+ ShardEndpoint* endpoint = NULL;
+ Status result =
+ targetShardKey(shardKey, (query.objsize() + updateExpr.objsize()), &endpoint);
+ endpoints->push_back(endpoint);
+ return result;
+ } else if (updateType == UpdateType_OpStyle) {
+ return targetQuery(query, endpoints);
+ } else {
+ return targetDoc(updateExpr, endpoints);
}
+}
- Status ChunkManagerTargeter::targetQuery( const BSONObj& query,
- vector<ShardEndpoint*>* endpoints ) const {
+Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc,
+ vector<ShardEndpoint*>* endpoints) const {
+ BSONObj shardKey;
- if ( !_primary && !_manager ) {
- return Status( ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target query in "
- << getNS().ns()
- << "; no metadata found" );
- }
+ if (_manager) {
+ //
+ // Sharded collections have the following further requirements for targeting:
+ //
+ // Limit-1 deletes must be targeted exactly by shard key *or* exact _id
+ //
- set<Shard> shards;
- if ( _manager ) {
- try {
- _manager->getShardsForQuery( shards, query );
- } catch ( const DBException& ex ) {
- return ex.toStatus();
- }
- }
- else {
- shards.insert( *_primary );
- }
+ // Get the shard key
+ StatusWith<BSONObj> status =
+ _manager->getShardKeyPattern().extractShardKeyFromQuery(deleteDoc.getQuery());
- for ( set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) {
- endpoints->push_back(new ShardEndpoint(it->getName(),
- _manager ?
- _manager->getVersion(it->getName()) :
- ChunkVersion::UNSHARDED()));
+ // Bad query
+ if (!status.isOK())
+ return status.getStatus();
+
+ shardKey = status.getValue();
+
+ // Validate that single (limit-1) sharded deletes are targeted by shard key or _id
+ if (deleteDoc.getLimit() == 1 && shardKey.isEmpty() &&
+ !isExactIdQuery(deleteDoc.getQuery())) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ stream() << "delete " << deleteDoc.toBSON()
+ << " does not contain _id or shard key for pattern "
+ << _manager->getShardKeyPattern().toString());
}
+ }
- return Status::OK();
+ // Target the shard key or delete query
+ if (!shardKey.isEmpty()) {
+ // We can't rely on our query targeting to be exact
+ ShardEndpoint* endpoint = NULL;
+ Status result = targetShardKey(shardKey, 0, &endpoint);
+ endpoints->push_back(endpoint);
+ return result;
+ } else {
+ return targetQuery(deleteDoc.getQuery(), endpoints);
+ }
+}
+
+Status ChunkManagerTargeter::targetDoc(const BSONObj& doc,
+ vector<ShardEndpoint*>* endpoints) const {
+ // NOTE: This is weird and fragile, but it's the way our language works right now -
+ // documents are either A) invalid or B) valid equality queries over themselves.
+ return targetQuery(doc, endpoints);
+}
+
+Status ChunkManagerTargeter::targetQuery(const BSONObj& query,
+ vector<ShardEndpoint*>* endpoints) const {
+ if (!_primary && !_manager) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target query in " << getNS().ns()
+ << "; no metadata found");
}
- Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
- long long estDataSize,
- ShardEndpoint** endpoint) const {
- invariant(NULL != _manager);
+ set<Shard> shards;
+ if (_manager) {
+ try {
+ _manager->getShardsForQuery(shards, query);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ } else {
+ shards.insert(*_primary);
+ }
- ChunkPtr chunk = _manager->findIntersectingChunk(shardKey);
+ for (set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it) {
+ endpoints->push_back(new ShardEndpoint(it->getName(),
+ _manager ? _manager->getVersion(it->getName())
+ : ChunkVersion::UNSHARDED()));
+ }
- // Track autosplit stats for sharded collections
- // Note: this is only best effort accounting and is not accurate.
- if (estDataSize > 0)
- _stats->chunkSizeDelta[chunk->getMin()] += estDataSize;
+ return Status::OK();
+}
- Shard shard = chunk->getShard();
- *endpoint = new ShardEndpoint(shard.getName(),
- _manager->getVersion(shard.getName()));
+Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
+ long long estDataSize,
+ ShardEndpoint** endpoint) const {
+ invariant(NULL != _manager);
- return Status::OK();
- }
+ ChunkPtr chunk = _manager->findIntersectingChunk(shardKey);
- Status ChunkManagerTargeter::targetCollection( vector<ShardEndpoint*>* endpoints ) const {
+ // Track autosplit stats for sharded collections
+ // Note: this is only best effort accounting and is not accurate.
+ if (estDataSize > 0)
+ _stats->chunkSizeDelta[chunk->getMin()] += estDataSize;
- if ( !_primary && !_manager ) {
- return Status( ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target full range of "
- << getNS().ns()
- << "; metadata not found" );
- }
+ Shard shard = chunk->getShard();
+ *endpoint = new ShardEndpoint(shard.getName(), _manager->getVersion(shard.getName()));
- set<Shard> shards;
- if ( _manager ) {
- _manager->getAllShards( shards );
- }
- else {
- shards.insert( *_primary );
- }
+ return Status::OK();
+}
- for ( set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) {
- endpoints->push_back(new ShardEndpoint(it->getName(),
- _manager ?
- _manager->getVersion(it->getName()) :
- ChunkVersion::UNSHARDED()));
- }
+Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints) const {
+ if (!_primary && !_manager) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target full range of " << getNS().ns()
+ << "; metadata not found");
+ }
- return Status::OK();
+ set<Shard> shards;
+ if (_manager) {
+ _manager->getAllShards(shards);
+ } else {
+ shards.insert(*_primary);
}
- Status ChunkManagerTargeter::targetAllShards( vector<ShardEndpoint*>* endpoints ) const {
+ for (set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it) {
+ endpoints->push_back(new ShardEndpoint(it->getName(),
+ _manager ? _manager->getVersion(it->getName())
+ : ChunkVersion::UNSHARDED()));
+ }
- if ( !_primary && !_manager ) {
- return Status( ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target every shard with versions for "
- << getNS().ns()
- << "; metadata not found" );
- }
+ return Status::OK();
+}
- vector<Shard> shards;
- Shard::getAllShards( shards );
+Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints) const {
+ if (!_primary && !_manager) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target every shard with versions for "
+ << getNS().ns() << "; metadata not found");
+ }
- for ( vector<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) {
- endpoints->push_back(new ShardEndpoint(it->getName(),
- _manager ?
- _manager->getVersion(it->getName()) :
- ChunkVersion::UNSHARDED()));
- }
+ vector<Shard> shards;
+ Shard::getAllShards(shards);
- return Status::OK();
+ for (vector<Shard>::iterator it = shards.begin(); it != shards.end(); ++it) {
+ endpoints->push_back(new ShardEndpoint(it->getName(),
+ _manager ? _manager->getVersion(it->getName())
+ : ChunkVersion::UNSHARDED()));
}
- namespace {
-
- //
- // Utilities to compare shard versions
- //
+ return Status::OK();
+}
- enum CompareResult {
- CompareResult_Unknown, CompareResult_GTE, CompareResult_LT
- };
-
- /**
- * Returns the relationship of two shard versions. Shard versions of a collection that has
- * not been dropped and recreated and where there is at least one chunk on a shard are
- * comparable, otherwise the result is ambiguous.
- */
- CompareResult compareShardVersions( const ChunkVersion& shardVersionA,
- const ChunkVersion& shardVersionB ) {
-
- // Collection may have been dropped
- if ( !shardVersionA.hasEqualEpoch( shardVersionB ) ) return CompareResult_Unknown;
-
- // Zero shard versions are only comparable to themselves
- if ( !shardVersionA.isSet() || !shardVersionB.isSet() ) {
- // If both are zero...
- if ( !shardVersionA.isSet() && !shardVersionB.isSet() ) return CompareResult_GTE;
- // Otherwise...
- return CompareResult_Unknown;
- }
+namespace {
- if ( shardVersionA < shardVersionB ) return CompareResult_LT;
- else return CompareResult_GTE;
- }
+//
+// Utilities to compare shard versions
+//
- ChunkVersion getShardVersion( const StringData& shardName,
- const ChunkManagerPtr& manager,
- const ShardPtr& primary ) {
+enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };
- dassert( !( manager && primary ) );
- dassert( manager || primary );
+/**
+ * Returns the relationship of two shard versions. Shard versions of a collection that has
+ * not been dropped and recreated and where there is at least one chunk on a shard are
+ * comparable; otherwise the result is ambiguous.
+ */
+CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
+ const ChunkVersion& shardVersionB) {
+ // Collection may have been dropped
+ if (!shardVersionA.hasEqualEpoch(shardVersionB))
+ return CompareResult_Unknown;
+
+ // Zero shard versions are only comparable to themselves
+ if (!shardVersionA.isSet() || !shardVersionB.isSet()) {
+ // If both are zero...
+ if (!shardVersionA.isSet() && !shardVersionB.isSet())
+ return CompareResult_GTE;
+ // Otherwise...
+ return CompareResult_Unknown;
+ }
- if ( primary ) return ChunkVersion::UNSHARDED();
+ if (shardVersionA < shardVersionB)
+ return CompareResult_LT;
+ else
+ return CompareResult_GTE;
+}
- return manager->getVersion(shardName.toString());
- }
+ChunkVersion getShardVersion(const StringData& shardName,
+ const ChunkManagerPtr& manager,
+ const ShardPtr& primary) {
+ dassert(!(manager && primary));
+ dassert(manager || primary);
- /**
- * Returns the relationship between two maps of shard versions. As above, these maps are
- * often comparable when the collection has not been dropped and there is at least one
- * chunk on the shards.
- *
- * If any versions in the maps are not comparable, the result is _Unknown.
- *
- * If any versions in the first map (cached) are _LT the versions in the second map
- * (remote), the first (cached) versions are _LT the second (remote) versions.
- *
- * Note that the signature here is weird since our cached map of chunk versions is
- * stored in a ChunkManager or is implicit in the primary shard of the collection.
- */
- CompareResult //
- compareAllShardVersions( const ChunkManagerPtr& cachedShardVersions,
- const ShardPtr& cachedPrimary,
- const map<string, ChunkVersion>& remoteShardVersions ) {
-
- CompareResult finalResult = CompareResult_GTE;
-
- for ( map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin();
- it != remoteShardVersions.end(); ++it ) {
-
- //
- // Get the remote and cached version for the next shard
- //
-
- const string& shardName = it->first;
- const ChunkVersion& remoteShardVersion = it->second;
- ChunkVersion cachedShardVersion;
-
- try {
- // Throws b/c shard constructor throws
- cachedShardVersion = getShardVersion( shardName,
- cachedShardVersions,
- cachedPrimary );
- }
- catch ( const DBException& ex ) {
-
- warning() << "could not lookup shard " << shardName
- << " in local cache, shard metadata may have changed"
- << " or be unavailable" << causedBy( ex ) << endl;
-
- return CompareResult_Unknown;
- }
-
- //
- // Compare the remote and cached versions
- //
-
- CompareResult result = compareShardVersions( cachedShardVersion,
- remoteShardVersion );
-
- if ( result == CompareResult_Unknown ) return result;
- if ( result == CompareResult_LT ) finalResult = CompareResult_LT;
-
- // Note that we keep going after _LT b/c there could be more _Unknowns.
- }
+ if (primary)
+ return ChunkVersion::UNSHARDED();
- return finalResult;
- }
+ return manager->getVersion(shardName.toString());
+}
- /**
- * Whether or not the manager/primary pair is different from the other manager/primary pair
- */
- bool isMetadataDifferent( const ChunkManagerPtr& managerA,
- const ShardPtr& primaryA,
- const ChunkManagerPtr& managerB,
- const ShardPtr& primaryB ) {
+/**
+ * Returns the relationship between two maps of shard versions. As above, these maps are
+ * often comparable when the collection has not been dropped and there is at least one
+ * chunk on the shards.
+ *
+ * If any versions in the maps are not comparable, the result is _Unknown.
+ *
+ * If any versions in the first map (cached) are _LT the versions in the second map
+ * (remote), the first (cached) versions are _LT the second (remote) versions.
+ *
+ * Note that the signature here is weird since our cached map of chunk versions is
+ * stored in a ChunkManager or is implicit in the primary shard of the collection.
+ */
+CompareResult //
+ compareAllShardVersions(const ChunkManagerPtr& cachedShardVersions,
+ const ShardPtr& cachedPrimary,
+ const map<string, ChunkVersion>& remoteShardVersions) {
+ CompareResult finalResult = CompareResult_GTE;
+
+ for (map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin();
+ it != remoteShardVersions.end();
+ ++it) {
+ //
+ // Get the remote and cached version for the next shard
+ //
- if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB )
- || ( !primaryA && primaryB ) ) return true;
+ const string& shardName = it->first;
+ const ChunkVersion& remoteShardVersion = it->second;
+ ChunkVersion cachedShardVersion;
- if ( managerA ) {
- return !managerA->getVersion().isStrictlyEqualTo( managerB->getVersion() );
- }
+ try {
+ // Throws b/c shard constructor throws
+ cachedShardVersion = getShardVersion(shardName, cachedShardVersions, cachedPrimary);
+ } catch (const DBException& ex) {
+ warning() << "could not lookup shard " << shardName
+ << " in local cache, shard metadata may have changed"
+ << " or be unavailable" << causedBy(ex) << endl;
- dassert( NULL != primaryA.get() );
- return primaryA->getName() != primaryB->getName();
+ return CompareResult_Unknown;
}
- /**
- * Whether or not the manager/primary pair was changed or refreshed from a previous version
- * of the metadata.
- */
- bool wasMetadataRefreshed( const ChunkManagerPtr& managerA,
- const ShardPtr& primaryA,
- const ChunkManagerPtr& managerB,
- const ShardPtr& primaryB ) {
-
- if ( isMetadataDifferent( managerA, primaryA, managerB, primaryB ) )
- return true;
-
- if ( managerA ) {
- dassert( managerB.get() ); // otherwise metadata would be different
- return managerA->getSequenceNumber() != managerB->getSequenceNumber();
- }
+ //
+ // Compare the remote and cached versions
+ //
- return false;
- }
+ CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion);
+
+ if (result == CompareResult_Unknown)
+ return result;
+ if (result == CompareResult_LT)
+ finalResult = CompareResult_LT;
+
+ // Note that we keep going after _LT b/c there could be more _Unknowns.
}
- void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint,
- const BSONObj& staleInfo ) {
- dassert( !_needsTargetingRefresh );
+ return finalResult;
+}
- ChunkVersion remoteShardVersion;
- if ( staleInfo["vWanted"].eoo() ) {
- // If we don't have a vWanted sent, assume the version is higher than our current
- // version.
- remoteShardVersion = getShardVersion( endpoint.shardName, _manager, _primary );
- remoteShardVersion.incMajor();
- }
- else {
- remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" );
- }
+/**
+ * Whether or not the manager/primary pair is different from the other manager/primary pair
+ */
+bool isMetadataDifferent(const ChunkManagerPtr& managerA,
+ const ShardPtr& primaryA,
+ const ChunkManagerPtr& managerB,
+ const ShardPtr& primaryB) {
+ if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
+ (!primaryA && primaryB))
+ return true;
+
+ if (managerA) {
+ return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion());
+ }
- ShardVersionMap::iterator it = _remoteShardVersions.find( endpoint.shardName );
- if ( it == _remoteShardVersions.end() ) {
- _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) );
- }
- else {
- ChunkVersion& previouslyNotedVersion = it->second;
- if ( previouslyNotedVersion.hasEqualEpoch( remoteShardVersion )) {
- if ( previouslyNotedVersion.isOlderThan( remoteShardVersion )) {
- remoteShardVersion.cloneTo( &previouslyNotedVersion );
- }
- }
- else {
- // Epoch changed midway while applying the batch so set the version to
- // something unique and non-existent to force a reload when
- // refreshIsNeeded is called.
- ChunkVersion::IGNORED().cloneTo( &previouslyNotedVersion );
- }
- }
+ dassert(NULL != primaryA.get());
+ return primaryA->getName() != primaryB->getName();
+}
+
+/**
+ * Whether or not the manager/primary pair was changed or refreshed from a previous version
+ * of the metadata.
+ */
+bool wasMetadataRefreshed(const ChunkManagerPtr& managerA,
+ const ShardPtr& primaryA,
+ const ChunkManagerPtr& managerB,
+ const ShardPtr& primaryB) {
+ if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
+ return true;
+
+ if (managerA) {
+ dassert(managerB.get()); // otherwise metadata would be different
+ return managerA->getSequenceNumber() != managerB->getSequenceNumber();
}
- void ChunkManagerTargeter::noteCouldNotTarget() {
- dassert( _remoteShardVersions.empty() );
- _needsTargetingRefresh = true;
+ return false;
+}
+}
+
+void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
+ const BSONObj& staleInfo) {
+ dassert(!_needsTargetingRefresh);
+
+ ChunkVersion remoteShardVersion;
+ if (staleInfo["vWanted"].eoo()) {
+ // If the stale response did not include a vWanted version, assume the remote version is
+ // higher than our current version.
+ remoteShardVersion = getShardVersion(endpoint.shardName, _manager, _primary);
+ remoteShardVersion.incMajor();
+ } else {
+ remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted");
}
- const TargeterStats* ChunkManagerTargeter::getStats() const {
- return _stats.get();
+ ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName);
+ if (it == _remoteShardVersions.end()) {
+ _remoteShardVersions.insert(make_pair(endpoint.shardName, remoteShardVersion));
+ } else {
+ ChunkVersion& previouslyNotedVersion = it->second;
+ if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) {
+ if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) {
+ remoteShardVersion.cloneTo(&previouslyNotedVersion);
+ }
+ } else {
+ // Epoch changed midway while applying the batch, so set the version to
+ // something unique and non-existent to force a reload when
+ // refreshIfNeeded is called.
+ ChunkVersion::IGNORED().cloneTo(&previouslyNotedVersion);
+ }
}
+}
- Status ChunkManagerTargeter::refreshIfNeeded( bool *wasChanged ) {
+void ChunkManagerTargeter::noteCouldNotTarget() {
+ dassert(_remoteShardVersions.empty());
+ _needsTargetingRefresh = true;
+}
- bool dummy;
- if ( !wasChanged )
- wasChanged = &dummy;
+const TargeterStats* ChunkManagerTargeter::getStats() const {
+ return _stats.get();
+}
- *wasChanged = false;
+Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
+ bool dummy;
+ if (!wasChanged)
+ wasChanged = &dummy;
- //
- // Did we have any stale config or targeting errors at all?
- //
+ *wasChanged = false;
- if ( !_needsTargetingRefresh && _remoteShardVersions.empty() ) return Status::OK();
+ //
+ // Did we have any stale config or targeting errors at all?
+ //
- //
- // Get the latest metadata information from the cache if there were issues
- //
+ if (!_needsTargetingRefresh && _remoteShardVersions.empty())
+ return Status::OK();
- ChunkManagerPtr lastManager = _manager;
- ShardPtr lastPrimary = _primary;
+ //
+ // Get the latest metadata information from the cache if there were issues
+ //
- DBConfigPtr config;
+ ChunkManagerPtr lastManager = _manager;
+ ShardPtr lastPrimary = _primary;
- string errMsg;
- if ( !getDBConfigSafe( _nss.db(), config, &errMsg ) ) {
- return Status( ErrorCodes::DatabaseNotFound, errMsg );
- }
+ DBConfigPtr config;
- // Get either the chunk manager or primary shard
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
-
- // We now have the latest metadata from the cache.
+ string errMsg;
+ if (!getDBConfigSafe(_nss.db(), config, &errMsg)) {
+ return Status(ErrorCodes::DatabaseNotFound, errMsg);
+ }
- //
- // See if and how we need to do a remote refresh.
- // Either we couldn't target at all, or we have stale versions, but not both.
- //
+ // Get either the chunk manager or primary shard
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
- dassert( !( _needsTargetingRefresh && !_remoteShardVersions.empty() ) );
+ // We now have the latest metadata from the cache.
- if ( _needsTargetingRefresh ) {
+ //
+ // See if and how we need to do a remote refresh.
+ // Either we couldn't target at all, or we have stale versions, but not both.
+ //
- // Reset the field
- _needsTargetingRefresh = false;
+ dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty()));
- // If we couldn't target, we might need to refresh if we haven't remotely refreshed the
- // metadata since we last got it from the cache.
+ if (_needsTargetingRefresh) {
+ // Reset the field
+ _needsTargetingRefresh = false;
- bool alreadyRefreshed = wasMetadataRefreshed( lastManager,
- lastPrimary,
- _manager,
- _primary );
+ // If we couldn't target, we might need to refresh if we haven't remotely refreshed the
+ // metadata since we last got it from the cache.
- // If didn't already refresh the targeting information, refresh it
- if ( !alreadyRefreshed ) {
- // To match previous behavior, we just need an incremental refresh here
- return refreshNow( RefreshType_RefreshChunkManager );
- }
+ bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary);
- *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary );
- return Status::OK();
+ // If we didn't already refresh the targeting information, refresh it now
+ if (!alreadyRefreshed) {
+ // To match previous behavior, we just need an incremental refresh here
+ return refreshNow(RefreshType_RefreshChunkManager);
}
- else if ( !_remoteShardVersions.empty() ) {
-
- // If we got stale shard versions from remote shards, we may need to refresh
- // NOTE: Not sure yet if this can happen simultaneously with targeting issues
- CompareResult result = compareAllShardVersions( _manager,
- _primary,
- _remoteShardVersions );
- // Reset the versions
- _remoteShardVersions.clear();
+ *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
+ return Status::OK();
+ } else if (!_remoteShardVersions.empty()) {
+ // If we got stale shard versions from remote shards, we may need to refresh
+ // NOTE: Not sure yet if this can happen simultaneously with targeting issues
- if ( result == CompareResult_Unknown ) {
- // Our current shard versions aren't all comparable to the old versions, maybe drop
- return refreshNow( RefreshType_ReloadDatabase );
- }
- else if ( result == CompareResult_LT ) {
- // Our current shard versions are less than the remote versions, but no drop
- return refreshNow( RefreshType_RefreshChunkManager );
- }
+ CompareResult result = compareAllShardVersions(_manager, _primary, _remoteShardVersions);
+ // Reset the versions
+ _remoteShardVersions.clear();
- *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary );
- return Status::OK();
+ if (result == CompareResult_Unknown) {
+ // Our current shard versions aren't all comparable to the old versions; the collection
+ // may have been dropped
+ return refreshNow(RefreshType_ReloadDatabase);
+ } else if (result == CompareResult_LT) {
+ // Our current shard versions are less than the remote versions, but no drop
+ return refreshNow(RefreshType_RefreshChunkManager);
}
- // unreachable
- dassert( false );
+ *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
return Status::OK();
}
- // To match legacy reload behavior, we have to backoff on config reload per-thread
- // TODO: Centralize this behavior better by refactoring config reload in mongos
- static const int maxWaitMillis = 500;
- static boost::thread_specific_ptr<Backoff> perThreadBackoff;
-
- static void refreshBackoff() {
- if ( !perThreadBackoff.get() )
- perThreadBackoff.reset( new Backoff( maxWaitMillis, maxWaitMillis * 2 ) );
- perThreadBackoff.get()->nextSleepMillis();
- }
+ // unreachable
+ dassert(false);
+ return Status::OK();
+}
- Status ChunkManagerTargeter::refreshNow( RefreshType refreshType ) {
+// To match legacy reload behavior, we have to back off on config reloads per thread
+// TODO: Centralize this behavior better by refactoring config reload in mongos
+static const int maxWaitMillis = 500;
+static boost::thread_specific_ptr<Backoff> perThreadBackoff;
- DBConfigPtr config;
+static void refreshBackoff() {
+ if (!perThreadBackoff.get())
+ perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2));
+ perThreadBackoff.get()->nextSleepMillis();
+}
- string errMsg;
- if ( !getDBConfigSafe( _nss.db(), config, &errMsg ) ) {
- return Status( ErrorCodes::DatabaseNotFound, errMsg );
- }
+Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) {
+ DBConfigPtr config;
- // Try not to spam the configs
- refreshBackoff();
+ string errMsg;
+ if (!getDBConfigSafe(_nss.db(), config, &errMsg)) {
+ return Status(ErrorCodes::DatabaseNotFound, errMsg);
+ }
- // TODO: Improve synchronization and make more explicit
- if ( refreshType == RefreshType_RefreshChunkManager ) {
- try {
- // Forces a remote check of the collection info, synchronization between threads
- // happens internally.
- config->getChunkManagerIfExists( _nss.ns(), true );
- }
- catch ( const DBException& ex ) {
- return Status( ErrorCodes::UnknownError, ex.toString() );
- }
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
- }
- else if ( refreshType == RefreshType_ReloadDatabase ) {
- try {
- // Dumps the db info, reloads it all, synchronization between threads happens
- // internally.
- config->reload();
- config->getChunkManagerIfExists( _nss.ns(), true, true );
- }
- catch ( const DBException& ex ) {
- return Status( ErrorCodes::UnknownError, ex.toString() );
- }
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
- }
+ // Try not to spam the configs
+ refreshBackoff();
- return Status::OK();
+ // TODO: Improve synchronization and make more explicit
+ if (refreshType == RefreshType_RefreshChunkManager) {
+ try {
+ // Forces a remote check of the collection info, synchronization between threads
+ // happens internally.
+ config->getChunkManagerIfExists(_nss.ns(), true);
+ } catch (const DBException& ex) {
+ return Status(ErrorCodes::UnknownError, ex.toString());
+ }
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
+ } else if (refreshType == RefreshType_ReloadDatabase) {
+ try {
+ // Dumps the db info, reloads it all, synchronization between threads happens
+ // internally.
+ config->reload();
+ config->getChunkManagerIfExists(_nss.ns(), true, true);
+ } catch (const DBException& ex) {
+ return Status(ErrorCodes::UnknownError, ex.toString());
+ }
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
}
-} // namespace mongo
+ return Status::OK();
+}
+} // namespace mongo
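
For orientation, the rule in the comment above getUpdateExprType() (op-style updates have only '$'-prefixed top-level fields, replacement-style updates have none, and mixed documents are rejected) can be illustrated with a minimal, self-contained sketch. It uses a plain list of top-level field names in place of BSONObj; classifyUpdate and topLevelFields are illustrative names that do not appear in the file above.

#include <cassert>
#include <string>
#include <vector>

enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };

// Stand-in for getUpdateExprType(): classify an update by its top-level field names.
UpdateType classifyUpdate(const std::vector<std::string>& topLevelFields) {
    UpdateType updateType = UpdateType_Unknown;

    // An empty update document counts as replacement-style, as in the code above.
    if (topLevelFields.empty())
        return UpdateType_Replacement;

    for (const std::string& field : topLevelFields) {
        const bool isOperator = !field.empty() && field[0] == '$';
        const UpdateType fieldType = isOperator ? UpdateType_OpStyle : UpdateType_Replacement;

        if (updateType == UpdateType_Unknown) {
            updateType = fieldType;     // the first field decides the style
        } else if (updateType != fieldType) {
            return UpdateType_Unknown;  // mixed styles cannot be targeted
        }
    }

    return updateType;
}

int main() {
    assert(classifyUpdate({"$set", "$inc"}) == UpdateType_OpStyle);       // coll.update({x:1}, {$set:{y:2}})
    assert(classifyUpdate({"_id", "x", "y"}) == UpdateType_Replacement);  // coll.update({x:1}, {y:2})
    assert(classifyUpdate({"$set", "y"}) == UpdateType_Unknown);          // mixed -> UnsupportedFormat
    return 0;
}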
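
The refresh decision in refreshIfNeeded() rests on the comparison rules documented above compareShardVersions(): versions with different epochs (e.g. after a drop and recreate) are not comparable, zero versions compare only to themselves, and otherwise the cached version is either behind (_LT) or up to date (_GTE). Below is a minimal, self-contained sketch of those rules; MiniChunkVersion and compare are hypothetical stand-ins that model only the fields the comparison uses.

#include <cassert>
#include <string>

enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };

// Simplified stand-in for ChunkVersion: an epoch plus a combined major/minor counter.
struct MiniChunkVersion {
    std::string epoch;            // changes when the collection is dropped and recreated
    unsigned long long combined;  // 0 means unset (unsharded)

    bool isSet() const { return combined != 0; }
};

CompareResult compare(const MiniChunkVersion& cached, const MiniChunkVersion& remote) {
    // Different epochs: the collection may have been dropped, so the versions are not comparable.
    if (cached.epoch != remote.epoch)
        return CompareResult_Unknown;

    // Zero (unset) versions are only comparable to themselves.
    if (!cached.isSet() || !remote.isSet())
        return (!cached.isSet() && !remote.isSet()) ? CompareResult_GTE : CompareResult_Unknown;

    return cached.combined < remote.combined ? CompareResult_LT : CompareResult_GTE;
}

int main() {
    MiniChunkVersion cached{"epochA", 5};
    assert(compare(cached, {"epochA", 7}) == CompareResult_LT);       // stale cache -> refresh the chunk manager
    assert(compare(cached, {"epochA", 3}) == CompareResult_GTE);      // cache is current -> nothing to reload
    assert(compare(cached, {"epochB", 7}) == CompareResult_Unknown);  // epoch changed -> reload the database
    return 0;
}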