Diffstat (limited to 'src/mongo/s/chunk_manager_targeter.cpp')
-rw-r--r--  src/mongo/s/chunk_manager_targeter.cpp | 1106
1 file changed, 523 insertions(+), 583 deletions(-)
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index bac563fc027..446d4a5fafd 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -43,738 +43,678 @@
namespace mongo {
- using std::shared_ptr;
- using mongoutils::str::stream;
- using std::map;
- using std::set;
- using std::string;
- using std::vector;
+using std::shared_ptr;
+using mongoutils::str::stream;
+using std::map;
+using std::set;
+using std::string;
+using std::vector;
namespace {
- enum UpdateType {
- UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown
- };
-
- enum CompareResult {
- CompareResult_Unknown, CompareResult_GTE, CompareResult_LT
- };
-
- const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));
-
- // To match legacy reload behavior, we have to backoff on config reload per-thread
- // TODO: Centralize this behavior better by refactoring config reload in mongos
- boost::thread_specific_ptr<Backoff> perThreadBackoff;
- const int maxWaitMillis = 500;
-
- /**
- * There are two styles of update expressions:
- *
- * Replacement style: coll.update({ x : 1 }, { y : 2 })
- * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } })
- */
- UpdateType getUpdateExprType(const BSONObj& updateExpr) {
- // Empty update is replacement-style, by default
- if (updateExpr.isEmpty()) {
- return UpdateType_Replacement;
- }
+enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };
- UpdateType updateType = UpdateType_Unknown;
+enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };
- BSONObjIterator it(updateExpr);
- while (it.more()) {
- BSONElement next = it.next();
+const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));
- if (next.fieldName()[0] == '$') {
- if (updateType == UpdateType_Unknown) {
- updateType = UpdateType_OpStyle;
- }
- else if (updateType == UpdateType_Replacement) {
- return UpdateType_Unknown;
- }
- }
- else {
- if (updateType == UpdateType_Unknown) {
- updateType = UpdateType_Replacement;
- }
- else if (updateType == UpdateType_OpStyle) {
- return UpdateType_Unknown;
- }
- }
- }
+// To match legacy reload behavior, we have to backoff on config reload per-thread
+// TODO: Centralize this behavior better by refactoring config reload in mongos
+boost::thread_specific_ptr<Backoff> perThreadBackoff;
+const int maxWaitMillis = 500;
- return updateType;
- }
-
- /**
- * This returns "does the query have an _id field" and "is the _id field querying for a direct
- * value like _id : 3 and not _id : { $gt : 3 }"
- *
- * Ex: { _id : 1 } => true
- * { foo : <anything>, _id : 1 } => true
- * { _id : { $lt : 30 } } => false
- * { foo : <anything> } => false
- */
- bool isExactIdQuery(const BSONObj& query) {
- StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query);
- if (!status.isOK()) {
- return false;
- }
-
- return !status.getValue()["_id"].eoo();
+/**
+ * There are two styles of update expressions:
+ *
+ * Replacement style: coll.update({ x : 1 }, { y : 2 })
+ * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } })
+ */
+UpdateType getUpdateExprType(const BSONObj& updateExpr) {
+ // Empty update is replacement-style, by default
+ if (updateExpr.isEmpty()) {
+ return UpdateType_Replacement;
}
- void refreshBackoff() {
- if (!perThreadBackoff.get()) {
- perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2));
- }
-
- perThreadBackoff.get()->nextSleepMillis();
- }
+ UpdateType updateType = UpdateType_Unknown;
+ BSONObjIterator it(updateExpr);
+ while (it.more()) {
+ BSONElement next = it.next();
- //
- // Utilities to compare shard versions
- //
-
- /**
- * Returns the relationship of two shard versions. Shard versions of a collection that has not
- * been dropped and recreated and where there is at least one chunk on a shard are comparable,
- * otherwise the result is ambiguous.
- */
- CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
- const ChunkVersion& shardVersionB) {
-
- // Collection may have been dropped
- if (!shardVersionA.hasEqualEpoch(shardVersionB)) {
- return CompareResult_Unknown;
- }
-
- // Zero shard versions are only comparable to themselves
- if (!shardVersionA.isSet() || !shardVersionB.isSet()) {
- // If both are zero...
- if (!shardVersionA.isSet() && !shardVersionB.isSet()) {
- return CompareResult_GTE;
+ if (next.fieldName()[0] == '$') {
+ if (updateType == UpdateType_Unknown) {
+ updateType = UpdateType_OpStyle;
+ } else if (updateType == UpdateType_Replacement) {
+ return UpdateType_Unknown;
+ }
+ } else {
+ if (updateType == UpdateType_Unknown) {
+ updateType = UpdateType_Replacement;
+ } else if (updateType == UpdateType_OpStyle) {
+ return UpdateType_Unknown;
}
-
- return CompareResult_Unknown;
}
+ }
- if (shardVersionA < shardVersionB) {
- return CompareResult_LT;
- }
+ return updateType;
+}
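For illustration only (not part of this commit), a hypothetical sketch of how getUpdateExprType classifies the styles documented above:

```cpp
// Hypothetical examples; assumes the BSON macro and the enum values above.
UpdateType replacement = getUpdateExprType(BSON("y" << 2));              // UpdateType_Replacement
UpdateType opStyle = getUpdateExprType(BSON("$set" << BSON("y" << 2)));  // UpdateType_OpStyle
UpdateType mixed = getUpdateExprType(
    BSON("y" << 2 << "$set" << BSON("z" << 3)));  // UpdateType_Unknown: mixed $ and non-$ fields
UpdateType empty = getUpdateExprType(BSONObj());  // UpdateType_Replacement by default
```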
- else return CompareResult_GTE;
+/**
+ * This returns "does the query have an _id field" and "is the _id field querying for a direct
+ * value like _id : 3 and not _id : { $gt : 3 }"
+ *
+ * Ex: { _id : 1 } => true
+ * { foo : <anything>, _id : 1 } => true
+ * { _id : { $lt : 30 } } => false
+ * { foo : <anything> } => false
+ */
+bool isExactIdQuery(const BSONObj& query) {
+ StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query);
+ if (!status.isOK()) {
+ return false;
}
- ChunkVersion getShardVersion(StringData shardName,
- const ChunkManager* manager,
- const Shard* primary) {
+ return !status.getValue()["_id"].eoo();
+}
- dassert(!(manager && primary));
- dassert(manager || primary);
+void refreshBackoff() {
+ if (!perThreadBackoff.get()) {
+ perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2));
+ }
- if (primary) {
- return ChunkVersion::UNSHARDED();
- }
+ perThreadBackoff.get()->nextSleepMillis();
+}
- return manager->getVersion(shardName.toString());
- }
-
- /**
- * Returns the relationship between two maps of shard versions. As above, these maps are often
- * comparable when the collection has not been dropped and there is at least one chunk on the
- * shards. If any versions in the maps are not comparable, the result is _Unknown.
- *
- * If any versions in the first map (cached) are _LT the versions in the second map (remote),
- * the first (cached) versions are _LT the second (remote) versions.
- *
- * Note that the signature here is weird since our cached map of chunk versions is stored in a
- * ChunkManager or is implicit in the primary shard of the collection.
- */
- CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager,
- const Shard* cachedPrimary,
- const map<string, ChunkVersion>& remoteShardVersions) {
-
- CompareResult finalResult = CompareResult_GTE;
-
- for (map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin();
- it != remoteShardVersions.end();
- ++it) {
-
- // Get the remote and cached version for the next shard
- const string& shardName = it->first;
- const ChunkVersion& remoteShardVersion = it->second;
-
- ChunkVersion cachedShardVersion;
-
- try {
- // Throws b/c shard constructor throws
- cachedShardVersion = getShardVersion(shardName,
- cachedChunkManager,
- cachedPrimary);
- }
- catch (const DBException& ex) {
- warning() << "could not look up shard " << shardName
- << " in local cache, shard metadata may have changed"
- << " or be unavailable" << causedBy(ex);
- return CompareResult_Unknown;
- }
+//
+// Utilities to compare shard versions
+//
- // Compare the remote and cached versions
- CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion);
-
- if (result == CompareResult_Unknown) return result;
- if (result == CompareResult_LT) finalResult = CompareResult_LT;
+/**
+ * Returns the relationship of two shard versions. Shard versions of a collection that has not
+ * been dropped and recreated and where there is at least one chunk on a shard are comparable,
+ * otherwise the result is ambiguous.
+ */
+CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
+ const ChunkVersion& shardVersionB) {
+ // Collection may have been dropped
+ if (!shardVersionA.hasEqualEpoch(shardVersionB)) {
+ return CompareResult_Unknown;
+ }
- // Note that we keep going after _LT b/c there could be more _Unknowns.
+ // Zero shard versions are only comparable to themselves
+ if (!shardVersionA.isSet() || !shardVersionB.isSet()) {
+ // If both are zero...
+ if (!shardVersionA.isSet() && !shardVersionB.isSet()) {
+ return CompareResult_GTE;
}
- return finalResult;
+ return CompareResult_Unknown;
}
- /**
- * Whether or not the manager/primary pair is different from the other manager/primary pair.
- */
- bool isMetadataDifferent(const ChunkManagerPtr& managerA,
- const ShardPtr& primaryA,
- const ChunkManagerPtr& managerB,
- const ShardPtr& primaryB) {
+ if (shardVersionA < shardVersionB) {
+ return CompareResult_LT;
+ }
- if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || (!primaryA && primaryB)) return true;
+ else
+ return CompareResult_GTE;
+}
- if (managerA) {
- return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion());
- }
+ChunkVersion getShardVersion(StringData shardName,
+ const ChunkManager* manager,
+ const Shard* primary) {
+ dassert(!(manager && primary));
+ dassert(manager || primary);
- dassert(NULL != primaryA.get());
- return primaryA->getId() != primaryB->getId();
+ if (primary) {
+ return ChunkVersion::UNSHARDED();
}
- /**
- * Whether or not the manager/primary pair was changed or refreshed from a previous version
- * of the metadata.
- */
- bool wasMetadataRefreshed(const ChunkManagerPtr& managerA,
- const ShardPtr& primaryA,
- const ChunkManagerPtr& managerB,
- const ShardPtr& primaryB) {
+ return manager->getVersion(shardName.toString());
+}
- if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
- return true;
+/**
+ * Returns the relationship between two maps of shard versions. As above, these maps are often
+ * comparable when the collection has not been dropped and there is at least one chunk on the
+ * shards. If any versions in the maps are not comparable, the result is _Unknown.
+ *
+ * If any versions in the first map (cached) are _LT the versions in the second map (remote),
+ * the first (cached) versions are _LT the second (remote) versions.
+ *
+ * Note that the signature here is weird since our cached map of chunk versions is stored in a
+ * ChunkManager or is implicit in the primary shard of the collection.
+ */
+CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager,
+ const Shard* cachedPrimary,
+ const map<string, ChunkVersion>& remoteShardVersions) {
+ CompareResult finalResult = CompareResult_GTE;
+
+ for (map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin();
+ it != remoteShardVersions.end();
+ ++it) {
+ // Get the remote and cached version for the next shard
+ const string& shardName = it->first;
+ const ChunkVersion& remoteShardVersion = it->second;
+
+ ChunkVersion cachedShardVersion;
+
+ try {
+ // Throws b/c shard constructor throws
+ cachedShardVersion = getShardVersion(shardName, cachedChunkManager, cachedPrimary);
+ } catch (const DBException& ex) {
+ warning() << "could not look up shard " << shardName
+ << " in local cache, shard metadata may have changed"
+ << " or be unavailable" << causedBy(ex);
- if (managerA) {
- dassert(managerB.get()); // otherwise metadata would be different
- return managerA->getSequenceNumber() != managerB->getSequenceNumber();
+ return CompareResult_Unknown;
}
- return false;
- }
-
-} // namespace
+ // Compare the remote and cached versions
+ CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion);
- ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss)
- : _nss(nss),
- _needsTargetingRefresh(false) {
+ if (result == CompareResult_Unknown)
+ return result;
+ if (result == CompareResult_LT)
+ finalResult = CompareResult_LT;
+ // Note that we keep going after _LT b/c there could be more _Unknowns.
}
- Status ChunkManagerTargeter::init() {
- auto status = grid.implicitCreateDb(_nss.db().toString());
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- shared_ptr<DBConfig> config = status.getValue();
- config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
-
- return Status::OK();
- }
+ return finalResult;
+}
- const NamespaceString& ChunkManagerTargeter::getNS() const {
- return _nss;
+/**
+ * Whether or not the manager/primary pair is different from the other manager/primary pair.
+ */
+bool isMetadataDifferent(const ChunkManagerPtr& managerA,
+ const ShardPtr& primaryA,
+ const ChunkManagerPtr& managerB,
+ const ShardPtr& primaryB) {
+ if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
+ (!primaryA && primaryB))
+ return true;
+
+ if (managerA) {
+ return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion());
}
- Status ChunkManagerTargeter::targetInsert( const BSONObj& doc,
- ShardEndpoint** endpoint ) const {
+ dassert(NULL != primaryA.get());
+ return primaryA->getId() != primaryB->getId();
+}
- BSONObj shardKey;
+/**
+ * Whether or not the manager/primary pair was changed or refreshed from a previous version
+ * of the metadata.
+ */
+bool wasMetadataRefreshed(const ChunkManagerPtr& managerA,
+ const ShardPtr& primaryA,
+ const ChunkManagerPtr& managerB,
+ const ShardPtr& primaryB) {
+ if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
+ return true;
+
+ if (managerA) {
+ dassert(managerB.get()); // otherwise metadata would be different
+ return managerA->getSequenceNumber() != managerB->getSequenceNumber();
+ }
- if ( _manager ) {
+ return false;
+}
- //
- // Sharded collections have the following requirements for targeting:
- //
- // Inserts must contain the exact shard key.
- //
+} // namespace
- shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc);
+ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss)
+ : _nss(nss), _needsTargetingRefresh(false) {}
- // Check shard key exists
- if (shardKey.isEmpty()) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "document " << doc
- << " does not contain shard key for pattern "
- << _manager->getShardKeyPattern().toString());
- }
-
- // Check shard key size on insert
- Status status = ShardKeyPattern::checkShardKeySize(shardKey);
- if (!status.isOK())
- return status;
- }
+Status ChunkManagerTargeter::init() {
+ auto status = grid.implicitCreateDb(_nss.db().toString());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
- // Target the shard key or database primary
- if (!shardKey.isEmpty()) {
- return targetShardKey(shardKey, doc.objsize(), endpoint);
- }
- else {
+ shared_ptr<DBConfig> config = status.getValue();
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
- if (!_primary) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target insert in collection "
- << getNS().ns() << "; no metadata found");
- }
+ return Status::OK();
+}
- *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED());
- return Status::OK();
- }
- }
+const NamespaceString& ChunkManagerTargeter::getNS() const {
+ return _nss;
+}
- Status ChunkManagerTargeter::targetUpdate( const BatchedUpdateDocument& updateDoc,
- vector<ShardEndpoint*>* endpoints ) const {
+Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const {
+ BSONObj shardKey;
+ if (_manager) {
//
- // Update targeting may use either the query or the update. This is to support save-style
- // updates, of the form:
- //
- // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true })
- //
- // Because drivers do not know the shard key, they can't pull the shard key automatically
- // into the query doc, and to correctly support upsert we must target a single shard.
+ // Sharded collections have the following requirements for targeting:
//
- // The rule is simple - If the update is replacement style (no '$set'), we target using the
- // update. If the update is op style, we target using the query.
+ // Inserts must contain the exact shard key.
//
- // If we have the exact shard key in either the query or replacement doc, we target using
- // that extracted key.
- //
-
- BSONObj query = updateDoc.getQuery();
- BSONObj updateExpr = updateDoc.getUpdateExpr();
- UpdateType updateType = getUpdateExprType( updateDoc.getUpdateExpr() );
+ shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc);
- if ( updateType == UpdateType_Unknown ) {
- return Status( ErrorCodes::UnsupportedFormat,
- stream() << "update document " << updateExpr
- << " has mixed $operator and non-$operator style fields" );
+ // Check shard key exists
+ if (shardKey.isEmpty()) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ stream() << "document " << doc
+ << " does not contain shard key for pattern "
+ << _manager->getShardKeyPattern().toString());
}
- BSONObj shardKey;
-
- if ( _manager ) {
-
- //
- // Sharded collections have the following further requirements for targeting:
- //
- // Upserts must be targeted exactly by shard key.
- // Non-multi updates must be targeted exactly by shard key *or* exact _id.
- //
-
- // Get the shard key
- if (updateType == UpdateType_OpStyle) {
-
- // Target using the query
- StatusWith<BSONObj> status =
- _manager->getShardKeyPattern().extractShardKeyFromQuery(query);
-
- // Bad query
- if (!status.isOK())
- return status.getStatus();
-
- shardKey = status.getValue();
- }
- else {
- // Target using the replacement document
- shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
- }
+ // Check shard key size on insert
+ Status status = ShardKeyPattern::checkShardKeySize(shardKey);
+ if (!status.isOK())
+ return status;
+ }
- //
- // Extra sharded update validation
- //
+ // Target the shard key or database primary
+ if (!shardKey.isEmpty()) {
+ return targetShardKey(shardKey, doc.objsize(), endpoint);
+ } else {
+ if (!_primary) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target insert in collection " << getNS().ns()
+ << "; no metadata found");
+ }
- if (updateDoc.getUpsert()) {
+ *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED());
+ return Status::OK();
+ }
+}
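As a hedged caller sketch of the insert rule above (the namespace, the shard key pattern { x : 1 }, and an initialized mongos grid are assumptions, not part of this diff):

```cpp
// Hypothetical caller; assumes "db.coll" is sharded on { x : 1 }.
ChunkManagerTargeter targeter(NamespaceString("db.coll"));
uassertStatusOK(targeter.init());

ShardEndpoint* endpoint = NULL;

// Document carries the exact shard key -> targets the single owning chunk.
Status ok = targeter.targetInsert(BSON("x" << 5 << "y" << 1), &endpoint);

// Document is missing the shard key -> Status(ErrorCodes::ShardKeyNotFound, ...).
Status bad = targeter.targetInsert(BSON("y" << 1), &endpoint);
```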
- // Sharded upserts *always* need to be exactly targeted by shard key
- if (shardKey.isEmpty()) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "upsert " << updateDoc.toBSON()
- << " does not contain shard key for pattern "
- << _manager->getShardKeyPattern().toString());
- }
+Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc,
+ vector<ShardEndpoint*>* endpoints) const {
+ //
+ // Update targeting may use either the query or the update. This is to support save-style
+ // updates, of the form:
+ //
+ // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true })
+ //
+ // Because drivers do not know the shard key, they can't pull the shard key automatically
+ // into the query doc, and to correctly support upsert we must target a single shard.
+ //
+ // The rule is simple - If the update is replacement style (no '$set'), we target using the
+ // update. If the update is op style, we target using the query.
+ //
+ // If we have the exact shard key in either the query or replacement doc, we target using
+ // that extracted key.
+ //
- // Also check shard key size on upsert
- Status status = ShardKeyPattern::checkShardKeySize(shardKey);
- if (!status.isOK())
- return status;
- }
+ BSONObj query = updateDoc.getQuery();
+ BSONObj updateExpr = updateDoc.getUpdateExpr();
- // Validate that single (non-multi) sharded updates are targeted by shard key or _id
- if (!updateDoc.getMulti() && shardKey.isEmpty()
- && !isExactIdQuery(updateDoc.getQuery())) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "update " << updateDoc.toBSON()
- << " does not contain _id or shard key for pattern "
- << _manager->getShardKeyPattern().toString());
- }
- }
+ UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr());
- // Target the shard key, query, or replacement doc
- if (!shardKey.isEmpty()) {
- // We can't rely on our query targeting to be exact
- ShardEndpoint* endpoint = NULL;
- Status result = targetShardKey(shardKey,
- (query.objsize() + updateExpr.objsize()),
- &endpoint);
- endpoints->push_back(endpoint);
- return result;
- }
- else if (updateType == UpdateType_OpStyle) {
- return targetQuery(query, endpoints);
- }
- else {
- return targetDoc(updateExpr, endpoints);
- }
+ if (updateType == UpdateType_Unknown) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ stream() << "update document " << updateExpr
+ << " has mixed $operator and non-$operator style fields");
}
- Status ChunkManagerTargeter::targetDelete( const BatchedDeleteDocument& deleteDoc,
- vector<ShardEndpoint*>* endpoints ) const {
-
- BSONObj shardKey;
-
- if ( _manager ) {
+ BSONObj shardKey;
- //
- // Sharded collections have the following further requirements for targeting:
- //
- // Limit-1 deletes must be targeted exactly by shard key *or* exact _id
- //
+ if (_manager) {
+ //
+ // Sharded collections have the following further requirements for targeting:
+ //
+ // Upserts must be targeted exactly by shard key.
+ // Non-multi updates must be targeted exactly by shard key *or* exact _id.
+ //
- // Get the shard key
+ // Get the shard key
+ if (updateType == UpdateType_OpStyle) {
+ // Target using the query
StatusWith<BSONObj> status =
- _manager->getShardKeyPattern().extractShardKeyFromQuery(deleteDoc.getQuery());
+ _manager->getShardKeyPattern().extractShardKeyFromQuery(query);
// Bad query
if (!status.isOK())
return status.getStatus();
shardKey = status.getValue();
+ } else {
+ // Target using the replacement document
+ shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
+ }
- // Validate that single (limit-1) sharded deletes are targeted by shard key or _id
- if (deleteDoc.getLimit() == 1 && shardKey.isEmpty()
- && !isExactIdQuery(deleteDoc.getQuery())) {
+ //
+ // Extra sharded update validation
+ //
+
+ if (updateDoc.getUpsert()) {
+ // Sharded upserts *always* need to be exactly targeted by shard key
+ if (shardKey.isEmpty()) {
return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "delete " << deleteDoc.toBSON()
- << " does not contain _id or shard key for pattern "
+ stream() << "upsert " << updateDoc.toBSON()
+ << " does not contain shard key for pattern "
<< _manager->getShardKeyPattern().toString());
}
- }
- // Target the shard key or delete query
- if (!shardKey.isEmpty()) {
- // We can't rely on our query targeting to be exact
- ShardEndpoint* endpoint = NULL;
- Status result = targetShardKey(shardKey, 0, &endpoint);
- endpoints->push_back(endpoint);
- return result;
+ // Also check shard key size on upsert
+ Status status = ShardKeyPattern::checkShardKeySize(shardKey);
+ if (!status.isOK())
+ return status;
}
- else {
- return targetQuery(deleteDoc.getQuery(), endpoints);
+
+ // Validate that single (non-multi) sharded updates are targeted by shard key or _id
+ if (!updateDoc.getMulti() && shardKey.isEmpty() && !isExactIdQuery(updateDoc.getQuery())) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ stream() << "update " << updateDoc.toBSON()
+ << " does not contain _id or shard key for pattern "
+ << _manager->getShardKeyPattern().toString());
}
}
- Status ChunkManagerTargeter::targetDoc(const BSONObj& doc,
- vector<ShardEndpoint*>* endpoints) const {
- // NOTE: This is weird and fragile, but it's the way our language works right now -
- // documents are either A) invalid or B) valid equality queries over themselves.
- return targetQuery(doc, endpoints);
+ // Target the shard key, query, or replacement doc
+ if (!shardKey.isEmpty()) {
+ // We can't rely on our query targeting to be exact
+ ShardEndpoint* endpoint = NULL;
+ Status result =
+ targetShardKey(shardKey, (query.objsize() + updateExpr.objsize()), &endpoint);
+ endpoints->push_back(endpoint);
+ return result;
+ } else if (updateType == UpdateType_OpStyle) {
+ return targetQuery(query, endpoints);
+ } else {
+ return targetDoc(updateExpr, endpoints);
}
+}
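A hedged sketch of the two targeting paths described in the long comment above, reusing the assumed targeter and a shard key pattern of { shardKey : 1 }:

```cpp
// Save-style upsert: replacement-style update, so the shard key is
// extracted from the update document rather than the query.
BatchedUpdateDocument upsert;
upsert.setQuery(BSON("_id" << 1));
upsert.setUpdateExpr(BSON("_id" << 1 << "shardKey" << 1 << "foo"
                                << "bar"));
upsert.setUpsert(true);

// Op-style update: the shard key is extracted from the query instead.
BatchedUpdateDocument opStyle;
opStyle.setQuery(BSON("shardKey" << 1));
opStyle.setUpdateExpr(BSON("$set" << BSON("foo"
                                          << "baz")));

vector<ShardEndpoint*> endpoints;
Status status = targeter.targetUpdate(upsert, &endpoints);  // exactly one endpoint
```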
- Status ChunkManagerTargeter::targetQuery( const BSONObj& query,
- vector<ShardEndpoint*>* endpoints ) const {
-
- if ( !_primary && !_manager ) {
- return Status(ErrorCodes::NamespaceNotFound,
- stream() << "could not target query in "
- << getNS().ns() << "; no metadata found");
- }
-
- set<ShardId> shardIds;
- if ( _manager ) {
- try {
- _manager->getShardIdsForQuery(shardIds, query);
- } catch ( const DBException& ex ) {
- return ex.toStatus();
- }
- }
- else {
- shardIds.insert(_primary->getId());
- }
+Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc,
+ vector<ShardEndpoint*>* endpoints) const {
+ BSONObj shardKey;
- for (const ShardId& shardId : shardIds) {
- endpoints->push_back(new ShardEndpoint(shardId,
- _manager ? _manager->getVersion(shardId) :
- ChunkVersion::UNSHARDED()));
- }
+ if (_manager) {
+ //
+ // Sharded collections have the following further requirements for targeting:
+ //
+ // Limit-1 deletes must be targeted exactly by shard key *or* exact _id
+ //
- return Status::OK();
- }
+ // Get the shard key
+ StatusWith<BSONObj> status =
+ _manager->getShardKeyPattern().extractShardKeyFromQuery(deleteDoc.getQuery());
- Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
- long long estDataSize,
- ShardEndpoint** endpoint) const {
- invariant(NULL != _manager);
+ // Bad query
+ if (!status.isOK())
+ return status.getStatus();
- ChunkPtr chunk = _manager->findIntersectingChunk(shardKey);
+ shardKey = status.getValue();
- // Track autosplit stats for sharded collections
- // Note: this is only best effort accounting and is not accurate.
- if (estDataSize > 0) {
- _stats.chunkSizeDelta[chunk->getMin()] += estDataSize;
+ // Validate that single (limit-1) sharded deletes are targeted by shard key or _id
+ if (deleteDoc.getLimit() == 1 && shardKey.isEmpty() &&
+ !isExactIdQuery(deleteDoc.getQuery())) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ stream() << "delete " << deleteDoc.toBSON()
+ << " does not contain _id or shard key for pattern "
+ << _manager->getShardKeyPattern().toString());
}
+ }
- *endpoint = new ShardEndpoint(chunk->getShardId(),
- _manager->getVersion(chunk->getShardId()));
+ // Target the shard key or delete query
+ if (!shardKey.isEmpty()) {
+ // We can't rely on our query targeting to be exact
+ ShardEndpoint* endpoint = NULL;
+ Status result = targetShardKey(shardKey, 0, &endpoint);
+ endpoints->push_back(endpoint);
+ return result;
+ } else {
+ return targetQuery(deleteDoc.getQuery(), endpoints);
+ }
+}
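Similarly, a hypothetical sketch of the limit-1 delete rule (same assumed targeter):

```cpp
// Limit-1 delete by exact _id: valid even without the shard key, per
// isExactIdQuery() above.
BatchedDeleteDocument deleteDoc;
deleteDoc.setQuery(BSON("_id" << 1));
deleteDoc.setLimit(1);

vector<ShardEndpoint*> endpoints;
Status status = targeter.targetDelete(deleteDoc, &endpoints);

// A limit-1 delete with neither shard key nor exact _id would instead
// fail with ShardKeyNotFound.
```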
+
+Status ChunkManagerTargeter::targetDoc(const BSONObj& doc,
+ vector<ShardEndpoint*>* endpoints) const {
+ // NOTE: This is weird and fragile, but it's the way our language works right now -
+ // documents are either A) invalid or B) valid equality queries over themselves.
+ return targetQuery(doc, endpoints);
+}
+
+Status ChunkManagerTargeter::targetQuery(const BSONObj& query,
+ vector<ShardEndpoint*>* endpoints) const {
+ if (!_primary && !_manager) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ stream() << "could not target query in " << getNS().ns()
+ << "; no metadata found");
+ }
- return Status::OK();
+ set<ShardId> shardIds;
+ if (_manager) {
+ try {
+ _manager->getShardIdsForQuery(shardIds, query);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ } else {
+ shardIds.insert(_primary->getId());
}
- Status ChunkManagerTargeter::targetCollection( vector<ShardEndpoint*>* endpoints ) const {
+ for (const ShardId& shardId : shardIds) {
+ endpoints->push_back(new ShardEndpoint(
+ shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
+ }
- if ( !_primary && !_manager ) {
- return Status( ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target full range of "
- << getNS().ns()
- << "; metadata not found" );
- }
+ return Status::OK();
+}
- set<ShardId> shardIds;
- if ( _manager ) {
- _manager->getAllShardIds(&shardIds);
- }
- else {
- shardIds.insert(_primary->getId());
- }
+Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
+ long long estDataSize,
+ ShardEndpoint** endpoint) const {
+ invariant(NULL != _manager);
- for (const ShardId& shardId : shardIds) {
- endpoints->push_back(new ShardEndpoint(shardId,
- _manager ? _manager->getVersion(shardId) :
- ChunkVersion::UNSHARDED()));
- }
+ ChunkPtr chunk = _manager->findIntersectingChunk(shardKey);
- return Status::OK();
+ // Track autosplit stats for sharded collections
+ // Note: this is only best effort accounting and is not accurate.
+ if (estDataSize > 0) {
+ _stats.chunkSizeDelta[chunk->getMin()] += estDataSize;
}
- Status ChunkManagerTargeter::targetAllShards( vector<ShardEndpoint*>* endpoints ) const {
+ *endpoint = new ShardEndpoint(chunk->getShardId(), _manager->getVersion(chunk->getShardId()));
- if ( !_primary && !_manager ) {
- return Status( ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target every shard with versions for "
- << getNS().ns()
- << "; metadata not found" );
- }
-
- vector<ShardId> shardIds;
- grid.shardRegistry()->getAllShardIds(&shardIds);
+ return Status::OK();
+}
- for (const ShardId& shardId : shardIds) {
- endpoints->push_back(new ShardEndpoint(shardId,
- _manager ?
- _manager->getVersion(shardId) :
- ChunkVersion::UNSHARDED()));
- }
+Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints) const {
+ if (!_primary && !_manager) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target full range of " << getNS().ns()
+ << "; metadata not found");
+ }
- return Status::OK();
+ set<ShardId> shardIds;
+ if (_manager) {
+ _manager->getAllShardIds(&shardIds);
+ } else {
+ shardIds.insert(_primary->getId());
}
- void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint,
- const BSONObj& staleInfo ) {
- dassert( !_needsTargetingRefresh );
+ for (const ShardId& shardId : shardIds) {
+ endpoints->push_back(new ShardEndpoint(
+ shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
+ }
- ChunkVersion remoteShardVersion;
- if ( staleInfo["vWanted"].eoo() ) {
- // If no vWanted version was sent, assume the remote version is higher than
- // our current version.
- remoteShardVersion =
- getShardVersion(endpoint.shardName, _manager.get(), _primary.get());
- remoteShardVersion.incMajor();
- }
- else {
- remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" );
- }
+ return Status::OK();
+}
- ShardVersionMap::iterator it = _remoteShardVersions.find( endpoint.shardName );
- if ( it == _remoteShardVersions.end() ) {
- _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) );
- }
- else {
- ChunkVersion& previouslyNotedVersion = it->second;
- if ( previouslyNotedVersion.hasEqualEpoch( remoteShardVersion )) {
- if ( previouslyNotedVersion.isOlderThan( remoteShardVersion )) {
- remoteShardVersion.cloneTo( &previouslyNotedVersion );
- }
- }
- else {
- // Epoch changed midway while applying the batch so set the version to
- // something unique and non-existent to force a reload when
- // refreshIsNeeded is called.
- ChunkVersion::IGNORED().cloneTo( &previouslyNotedVersion );
- }
- }
+Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints) const {
+ if (!_primary && !_manager) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target every shard with versions for "
+ << getNS().ns() << "; metadata not found");
}
- void ChunkManagerTargeter::noteCouldNotTarget() {
- dassert( _remoteShardVersions.empty() );
- _needsTargetingRefresh = true;
- }
+ vector<ShardId> shardIds;
+ grid.shardRegistry()->getAllShardIds(&shardIds);
- const TargeterStats* ChunkManagerTargeter::getStats() const {
- return &_stats;
+ for (const ShardId& shardId : shardIds) {
+ endpoints->push_back(new ShardEndpoint(
+ shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
}
- Status ChunkManagerTargeter::refreshIfNeeded( bool *wasChanged ) {
+ return Status::OK();
+}
+
+void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
+ const BSONObj& staleInfo) {
+ dassert(!_needsTargetingRefresh);
+
+ ChunkVersion remoteShardVersion;
+ if (staleInfo["vWanted"].eoo()) {
+ // If no vWanted version was sent, assume the remote version is higher than
+ // our current version.
+ remoteShardVersion = getShardVersion(endpoint.shardName, _manager.get(), _primary.get());
+ remoteShardVersion.incMajor();
+ } else {
+ remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted");
+ }
- bool dummy;
- if (!wasChanged) {
- wasChanged = &dummy;
+ ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName);
+ if (it == _remoteShardVersions.end()) {
+ _remoteShardVersions.insert(make_pair(endpoint.shardName, remoteShardVersion));
+ } else {
+ ChunkVersion& previouslyNotedVersion = it->second;
+ if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) {
+ if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) {
+ remoteShardVersion.cloneTo(&previouslyNotedVersion);
+ }
+ } else {
+ // Epoch changed midway while applying the batch so set the version to
+ // something unique and non-existent to force a reload when
+ // refreshIsNeeded is called.
+ ChunkVersion::IGNORED().cloneTo(&previouslyNotedVersion);
}
+ }
+}
- *wasChanged = false;
+void ChunkManagerTargeter::noteCouldNotTarget() {
+ dassert(_remoteShardVersions.empty());
+ _needsTargetingRefresh = true;
+}
- //
- // Did we have any stale config or targeting errors at all?
- //
+const TargeterStats* ChunkManagerTargeter::getStats() const {
+ return &_stats;
+}
- if (!_needsTargetingRefresh && _remoteShardVersions.empty()) {
- return Status::OK();
- }
+Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
+ bool dummy;
+ if (!wasChanged) {
+ wasChanged = &dummy;
+ }
- //
- // Get the latest metadata information from the cache if there were issues
- //
+ *wasChanged = false;
- ChunkManagerPtr lastManager = _manager;
- ShardPtr lastPrimary = _primary;
+ //
+ // Did we have any stale config or targeting errors at all?
+ //
- auto status = grid.implicitCreateDb(_nss.db().toString());
- if (!status.isOK()) {
- return status.getStatus();
- }
+ if (!_needsTargetingRefresh && _remoteShardVersions.empty()) {
+ return Status::OK();
+ }
- shared_ptr<DBConfig> config = status.getValue();
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
+ //
+ // Get the latest metadata information from the cache if there were issues
+ //
- // We now have the latest metadata from the cache.
+ ChunkManagerPtr lastManager = _manager;
+ ShardPtr lastPrimary = _primary;
- //
- // See if and how we need to do a remote refresh.
- // Either we couldn't target at all, or we have stale versions, but not both.
- //
+ auto status = grid.implicitCreateDb(_nss.db().toString());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
- dassert( !( _needsTargetingRefresh && !_remoteShardVersions.empty() ) );
+ shared_ptr<DBConfig> config = status.getValue();
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
- if ( _needsTargetingRefresh ) {
+ // We now have the latest metadata from the cache.
- // Reset the field
- _needsTargetingRefresh = false;
+ //
+ // See if and how we need to do a remote refresh.
+ // Either we couldn't target at all, or we have stale versions, but not both.
+ //
- // If we couldn't target, we might need to refresh if we haven't remotely refreshed the
- // metadata since we last got it from the cache.
+ dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty()));
- bool alreadyRefreshed = wasMetadataRefreshed( lastManager,
- lastPrimary,
- _manager,
- _primary );
+ if (_needsTargetingRefresh) {
+ // Reset the field
+ _needsTargetingRefresh = false;
- // If we didn't already refresh the targeting information, refresh it
- if ( !alreadyRefreshed ) {
- // To match previous behavior, we just need an incremental refresh here
- return refreshNow( RefreshType_RefreshChunkManager );
- }
+ // If we couldn't target, we might need to refresh if we haven't remotely refreshed the
+ // metadata since we last got it from the cache.
- *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary );
- return Status::OK();
- }
- else if ( !_remoteShardVersions.empty() ) {
+ bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary);
- // If we got stale shard versions from remote shards, we may need to refresh
- // NOTE: Not sure yet if this can happen simultaneously with targeting issues
+ // If we didn't already refresh the targeting information, refresh it
+ if (!alreadyRefreshed) {
+ // To match previous behavior, we just need an incremental refresh here
+ return refreshNow(RefreshType_RefreshChunkManager);
+ }
- CompareResult result = compareAllShardVersions(_manager.get(),
- _primary.get(),
- _remoteShardVersions);
- // Reset the versions
- _remoteShardVersions.clear();
+ *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
+ return Status::OK();
+ } else if (!_remoteShardVersions.empty()) {
+ // If we got stale shard versions from remote shards, we may need to refresh
+ // NOTE: Not sure yet if this can happen simultaneously with targeting issues
- if ( result == CompareResult_Unknown ) {
- // Our current shard versions aren't all comparable to the old versions, maybe drop
- return refreshNow( RefreshType_ReloadDatabase );
- }
- else if ( result == CompareResult_LT ) {
- // Our current shard versions are less than the remote versions, but no drop
- return refreshNow( RefreshType_RefreshChunkManager );
- }
+ CompareResult result =
+ compareAllShardVersions(_manager.get(), _primary.get(), _remoteShardVersions);
+ // Reset the versions
+ _remoteShardVersions.clear();
- *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary );
- return Status::OK();
+ if (result == CompareResult_Unknown) {
+ // Our current shard versions aren't all comparable to the old versions, maybe drop
+ return refreshNow(RefreshType_ReloadDatabase);
+ } else if (result == CompareResult_LT) {
+ // Our current shard versions are less than the remote versions, but no drop
+ return refreshNow(RefreshType_RefreshChunkManager);
}
- // unreachable
- dassert( false );
+ *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
return Status::OK();
}
- Status ChunkManagerTargeter::refreshNow( RefreshType refreshType ) {
- auto status = grid.implicitCreateDb(_nss.db().toString());
- if (!status.isOK()) {
- return status.getStatus();
- }
+ // unreachable
+ dassert(false);
+ return Status::OK();
+}
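For context, a hedged sketch of how a write path might drive this refresh loop; the retry structure is an assumption, only refreshIfNeeded's contract comes from this file:

```cpp
// After noteStaleResponse()/noteCouldNotTarget(), refresh and learn
// whether the cached metadata actually changed.
bool wasChanged = false;
Status refreshStatus = targeter.refreshIfNeeded(&wasChanged);
if (!refreshStatus.isOK()) {
    // Surface the error to the batch executor.
} else if (wasChanged) {
    // Re-run targeting against the refreshed ChunkManager/primary.
}
```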
- shared_ptr<DBConfig> config = status.getValue();
+Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) {
+ auto status = grid.implicitCreateDb(_nss.db().toString());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
- // Try not to spam the configs
- refreshBackoff();
+ shared_ptr<DBConfig> config = status.getValue();
- // TODO: Improve synchronization and make more explicit
- if ( refreshType == RefreshType_RefreshChunkManager ) {
- try {
- // Forces a remote check of the collection info, synchronization between threads
- // happens internally.
- config->getChunkManagerIfExists( _nss.ns(), true );
- }
- catch ( const DBException& ex ) {
- return Status( ErrorCodes::UnknownError, ex.toString() );
- }
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
- }
- else if ( refreshType == RefreshType_ReloadDatabase ) {
- try {
- // Dumps the db info, reloads it all, synchronization between threads happens
- // internally.
- config->reload();
- config->getChunkManagerIfExists( _nss.ns(), true, true );
- }
- catch ( const DBException& ex ) {
- return Status( ErrorCodes::UnknownError, ex.toString() );
- }
+ // Try not to spam the configs
+ refreshBackoff();
- config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary );
+ // TODO: Improve synchronization and make more explicit
+ if (refreshType == RefreshType_RefreshChunkManager) {
+ try {
+ // Forces a remote check of the collection info, synchronization between threads
+ // happens internally.
+ config->getChunkManagerIfExists(_nss.ns(), true);
+ } catch (const DBException& ex) {
+ return Status(ErrorCodes::UnknownError, ex.toString());
+ }
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
+ } else if (refreshType == RefreshType_ReloadDatabase) {
+ try {
+ // Dumps the db info, reloads it all, synchronization between threads happens
+ // internally.
+ config->reload();
+ config->getChunkManagerIfExists(_nss.ns(), true, true);
+ } catch (const DBException& ex) {
+ return Status(ErrorCodes::UnknownError, ex.toString());
}
- return Status::OK();
+ config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary);
}
-} // namespace mongo
+ return Status::OK();
+}
+} // namespace mongo