author    | David Storch <david.storch@10gen.com> | 2014-08-01 13:15:36 -0400
committer | David Storch <david.storch@10gen.com> | 2014-08-11 16:51:47 -0400
commit    | c184143fa4d8a4fdf4fdc684404d4aad3e55794b (patch)
tree      | 2948d8aa90ccff4528a7a441ec6ad0e8df6e07cf
parent    | 2a90d1e20230800f8dc5a01ea066873354fcd938 (diff)
download  | mongo-c184143fa4d8a4fdf4fdc684404d4aad3e55794b.tar.gz
SERVER-14497 UpdateStage
-rw-r--r-- | src/mongo/db/exec/SConscript              |   1
-rw-r--r-- | src/mongo/db/exec/plan_stats.h            |  35
-rw-r--r-- | src/mongo/db/exec/update.cpp              | 818
-rw-r--r-- | src/mongo/db/exec/update.h                | 164
-rw-r--r-- | src/mongo/db/ops/update.cpp               | 734
-rw-r--r-- | src/mongo/db/query/get_executor.cpp       |  83
-rw-r--r-- | src/mongo/db/query/get_executor.h         |  49
-rw-r--r-- | src/mongo/db/query/stage_types.h          |   2
-rw-r--r-- | src/mongo/dbtests/query_stage_update.cpp  | 364
9 files changed, 1556 insertions(+), 694 deletions(-)
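
For orientation before the full diff: this commit moves the update/upsert loop out of src/mongo/db/ops/update.cpp and into a new UPDATE plan stage (src/mongo/db/exec/update.cpp), so updates run through the same PlanStage/PlanExecutor machinery as reads. The sketch below is not code from the commit; it is an illustration assembled from the signatures the diff introduces (UpdateStageParams, UpdateStage), and the helper name wrapRootInUpdateStage is invented for the example. It shows roughly how get_executor.cpp layers the new stage on top of the query plan's root stage.

```cpp
// Illustrative sketch only -- built from the constructor signatures introduced in
// this commit. Error handling and the surrounding PlanExecutor plumbing are elided.
#include "mongo/db/exec/update.h"

namespace mongo {

    PlanStage* wrapRootInUpdateStage(const UpdateRequest* request,
                                     UpdateDriver* driver,
                                     OpDebug* opDebug,
                                     CanonicalQuery* cq,
                                     WorkingSet* ws,
                                     Database* db,
                                     PlanStage* queryRoot) {
        // The stage's dependencies: the parsed update request, the driver that applies
        // the mods, and the OpDebug used for stats. None of these are owned by the stage.
        UpdateStageParams params(request, driver, opDebug);
        params.canonicalQuery = cq;  // may be NULL for idhack updates

        // UpdateStage consumes DiskLoc/document pairs from 'queryRoot', applies the mods
        // via transformAndUpdate(), and returns NEED_TIME after each update. If nothing
        // matched and the request is an upsert, it performs the insert once the child
        // reaches EOF.
        return new UpdateStage(params, ws, db, queryRoot);
    }

} // namespace mongo
```
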
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index 01b57a9ff2e..0d5ab3e2b29 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -64,6 +64,7 @@ env.Library( "stagedebug_cmd.cpp", "subplan.cpp", "text.cpp", + "update.cpp", "working_set_common.cpp", ], LIBDEPS = [ diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index adbc0b6d986..113be7af2a1 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -559,6 +559,41 @@ namespace mongo { BSONObj keyPattern; }; + struct UpdateStats : public SpecificStats { + UpdateStats() + : nMatched(0), + nModified(0), + fastmod(false), + fastmodinsert(false), + inserted(false) { } + + virtual SpecificStats* clone() const { + return new UpdateStats(*this); + } + + // The number of documents which match the query part of the update. + size_t nMatched; + + // The number of documents modified by this update. + size_t nModified; + + // A 'fastmod' update is an in-place update that does not have to modify + // any indices. It's "fast" because the only work needed is changing the bits + // inside the document. + bool fastmod; + + // A 'fastmodinsert' is an insert resulting from an {upsert: true} update + // which is a doc-replacement style update. It's "fast" because we don't need + // to compute the document to insert based on the modifiers. + bool fastmodinsert; + + // Is this an {upsert: true} update that did an insert? + bool inserted; + + // The object that was inserted. This is an empty document if no insert was performed. + BSONObj objInserted; + }; + struct TextStats : public SpecificStats { TextStats() : keysExamined(0), fetches(0), parsedTextQuery() { } diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp new file mode 100644 index 00000000000..83a89262f46 --- /dev/null +++ b/src/mongo/db/exec/update.cpp @@ -0,0 +1,818 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/update.h" + +#include "mongo/bson/mutable/algorithm.h" +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/ops/update_lifecycle.h" +#include "mongo/db/repl/repl_coordinator_global.h" +#include "mongo/db/repl/oplog.h" + +namespace mongo { + + namespace mb = mutablebson; + + namespace { + + const char idFieldName[] = "_id"; + const FieldRef idFieldRef(idFieldName); + + Status storageValid(const mb::Document&, const bool); + Status storageValid(const mb::ConstElement&, const bool); + Status storageValidChildren(const mb::ConstElement&, const bool); + + /** + * mutable::document storageValid check -- like BSONObj::_okForStorage + */ + Status storageValid(const mb::Document& doc, const bool deep = true) { + mb::ConstElement currElem = doc.root().leftChild(); + while (currElem.ok()) { + if (currElem.getFieldName() == idFieldName) { + switch (currElem.getType()) { + case RegEx: + case Array: + case Undefined: + return Status(ErrorCodes::InvalidIdField, + str::stream() << "The '_id' value cannot be of type " + << typeName(currElem.getType())); + default: + break; + } + } + Status s = storageValid(currElem, deep); + if (!s.isOK()) + return s; + currElem = currElem.rightSibling(); + } + + return Status::OK(); + } + + /** + * Validates an element that has a field name which starts with a dollar sign ($). + * In the case of a DBRef field ($id, $ref, [$db]) these fields may be valid in + * the correct order/context only. + */ + Status validateDollarPrefixElement(const mb::ConstElement elem, const bool deep) { + mb::ConstElement curr = elem; + StringData currName = elem.getFieldName(); + + // Found a $db field + if (currName == "$db") { + if (curr.getType() != String) { + return Status(ErrorCodes::InvalidDBRef, + str::stream() << "The DBRef $db field must be a String, not a " + << typeName(curr.getType())); + } + curr = curr.leftSibling(); + + if (!curr.ok() || (curr.getFieldName() != "$id")) + return Status(ErrorCodes::InvalidDBRef, + "Found $db field without a $id before it, which is invalid."); + + currName = curr.getFieldName(); + } + + // Found a $id field + if (currName == "$id") { + Status s = storageValidChildren(curr, deep); + if (!s.isOK()) + return s; + + curr = curr.leftSibling(); + if (!curr.ok() || (curr.getFieldName() != "$ref")) { + return Status(ErrorCodes::InvalidDBRef, + "Found $id field without a $ref before it, which is invalid."); + } + + currName = curr.getFieldName(); + } + + if (currName == "$ref") { + if (curr.getType() != String) { + return Status(ErrorCodes::InvalidDBRef, + str::stream() << "The DBRef $ref field must be a String, not a " + << typeName(curr.getType())); + } + + if (!curr.rightSibling().ok() || curr.rightSibling().getFieldName() != "$id") + return Status(ErrorCodes::InvalidDBRef, + str::stream() << "The DBRef $ref field must be " + "following by a $id field"); + } + else { + // not an okay, $ prefixed field name. + return Status(ErrorCodes::DollarPrefixedFieldName, + str::stream() << "The dollar ($) prefixed field '" + << elem.getFieldName() << "' in '" + << mb::getFullName(elem) + << "' is not valid for storage."); + + } + + return Status::OK(); + } + + Status storageValid(const mb::ConstElement& elem, const bool deep = true) { + if (!elem.ok()) + return Status(ErrorCodes::BadValue, "Invalid elements cannot be stored."); + + // Field names of elements inside arrays are not meaningful in mutable bson, + // so we do not want to validate them. 
+ // + // TODO: Revisit how mutable handles array field names. We going to need to make + // this better if we ever want to support ordered updates that can alter the same + // element repeatedly; see SERVER-12848. + const bool childOfArray = elem.parent().ok() ? + (elem.parent().getType() == mongo::Array) : false; + + if (!childOfArray) { + StringData fieldName = elem.getFieldName(); + // Cannot start with "$", unless dbref + if (fieldName[0] == '$') { + Status status = validateDollarPrefixElement(elem, deep); + if (!status.isOK()) + return status; + } + else if (fieldName.find(".") != string::npos) { + // Field name cannot have a "." in it. + return Status(ErrorCodes::DottedFieldName, + str::stream() << "The dotted field '" + << elem.getFieldName() << "' in '" + << mb::getFullName(elem) + << "' is not valid for storage."); + } + } + + // Check children if there are any. + Status s = storageValidChildren(elem, deep); + if (!s.isOK()) + return s; + + return Status::OK(); + } + + Status storageValidChildren(const mb::ConstElement& elem, const bool deep) { + if (!elem.hasChildren()) + return Status::OK(); + + mb::ConstElement curr = elem.leftChild(); + while (curr.ok()) { + Status s = storageValid(curr, deep); + if (!s.isOK()) + return s; + curr = curr.rightSibling(); + } + + return Status::OK(); + } + + /** + * This will verify that all updated fields are + * 1.) Valid for storage (checking parent to make sure things like DBRefs are valid) + * 2.) Compare updated immutable fields do not change values + * + * If updateFields is empty then it was replacement and/or we need to check all fields + */ + inline Status validate(const BSONObj& original, + const FieldRefSet& updatedFields, + const mb::Document& updated, + const std::vector<FieldRef*>* immutableAndSingleValueFields, + const ModifierInterface::Options& opts) { + + LOG(3) << "update validate options -- " + << " updatedFields: " << updatedFields + << " immutableAndSingleValueFields.size:" + << (immutableAndSingleValueFields ? immutableAndSingleValueFields->size() : 0) + << " fromRepl: " << opts.fromReplication + << " validate:" << opts.enforceOkForStorage; + + // 1.) Loop through each updated field and validate for storage + // and detect immutable field updates + + // The set of possibly changed immutable fields -- we will need to check their vals + FieldRefSet changedImmutableFields; + + // Check to see if there were no fields specified or if we are not validating + // The case if a range query, or query that didn't result in saved fields + if (updatedFields.empty() || !opts.enforceOkForStorage) { + if (opts.enforceOkForStorage) { + // No specific fields were updated so the whole doc must be checked + Status s = storageValid(updated, true); + if (!s.isOK()) + return s; + } + + // Check all immutable fields + if (immutableAndSingleValueFields) + changedImmutableFields.fillFrom(*immutableAndSingleValueFields); + } + else { + + // TODO: Change impl so we don't need to create a new FieldRefSet + // -- move all conflict logic into static function on FieldRefSet? + FieldRefSet immutableFieldRef; + if (immutableAndSingleValueFields) + immutableFieldRef.fillFrom(*immutableAndSingleValueFields); + + FieldRefSet::const_iterator where = updatedFields.begin(); + const FieldRefSet::const_iterator end = updatedFields.end(); + for( ; where != end; ++where) { + const FieldRef& current = **where; + + // Find the updated field in the updated document. 
+ mutablebson::ConstElement newElem = updated.root(); + size_t currentPart = 0; + while (newElem.ok() && currentPart < current.numParts()) + newElem = newElem[current.getPart(currentPart++)]; + + // newElem might be missing if $unset/$renamed-away + if (newElem.ok()) { + Status s = storageValid(newElem, true); + if (!s.isOK()) + return s; + } + // Check if the updated field conflicts with immutable fields + immutableFieldRef.findConflicts(¤t, &changedImmutableFields); + } + } + + const bool checkIdField = (updatedFields.empty() && !original.isEmpty()) || + updatedFields.findConflicts(&idFieldRef, NULL); + + // Add _id to fields to check since it too is immutable + if (checkIdField) + changedImmutableFields.keepShortest(&idFieldRef); + else if (changedImmutableFields.empty()) { + // Return early if nothing changed which is immutable + return Status::OK(); + } + + LOG(4) << "Changed immutable fields: " << changedImmutableFields; + // 2.) Now compare values of the changed immutable fields (to make sure they haven't) + + const mutablebson::ConstElement newIdElem = updated.root()[idFieldName]; + + FieldRefSet::const_iterator where = changedImmutableFields.begin(); + const FieldRefSet::const_iterator end = changedImmutableFields.end(); + for( ; where != end; ++where ) { + const FieldRef& current = **where; + + // Find the updated field in the updated document. + mutablebson::ConstElement newElem = updated.root(); + size_t currentPart = 0; + while (newElem.ok() && currentPart < current.numParts()) + newElem = newElem[current.getPart(currentPart++)]; + + if (!newElem.ok()) { + if (original.isEmpty()) { + // If the _id is missing and not required, then skip this check + if (!(current.dottedField() == idFieldName)) + return Status(ErrorCodes::NoSuchKey, + mongoutils::str::stream() + << "After applying the update, the new" + << " document was missing the '" + << current.dottedField() + << "' (required and immutable) field."); + + } + else { + if (current.dottedField() != idFieldName) + return Status(ErrorCodes::ImmutableField, + mongoutils::str::stream() + << "After applying the update to the document with " + << newIdElem.toString() + << ", the '" << current.dottedField() + << "' (required and immutable) field was " + "found to have been removed --" + << original); + } + } + else { + + // Find the potentially affected field in the original document. + const BSONElement oldElem = original.getFieldDotted(current.dottedField()); + const BSONElement oldIdElem = original.getField(idFieldName); + + // Ensure no arrays since neither _id nor shard keys can be in an array, or one. + mb::ConstElement currElem = newElem; + while (currElem.ok()) { + if (currElem.getType() == Array) { + return Status(ErrorCodes::NotSingleValueField, + mongoutils::str::stream() + << "After applying the update to the document {" + << (oldIdElem.ok() ? oldIdElem.toString() : + newIdElem.toString()) + << " , ...}, the (immutable) field '" + << current.dottedField() + << "' was found to be an array or array descendant."); + } + currElem = currElem.parent(); + } + + // If we have both (old and new), compare them. If we just have new we are good + if (oldElem.ok() && newElem.compareWithBSONElement(oldElem, false) != 0) { + return Status(ErrorCodes::ImmutableField, + mongoutils::str::stream() + << "After applying the update to the document {" + << (oldIdElem.ok() ? 
oldIdElem.toString() : + newIdElem.toString()) + << " , ...}, the (immutable) field '" << current.dottedField() + << "' was found to have been altered to " + << newElem.toString()); + } + } + } + + return Status::OK(); + } + + Status ensureIdAndFirst(mb::Document& doc) { + mb::Element idElem = mb::findFirstChildNamed(doc.root(), idFieldName); + + // Move _id as first element if it exists + if (idElem.ok()) { + if (idElem.leftSibling().ok()) { + Status s = idElem.remove(); + if (!s.isOK()) + return s; + s = doc.root().pushFront(idElem); + if (!s.isOK()) + return s; + } + } + else { + // Create _id if the document does not currently have one. + idElem = doc.makeElementNewOID(idFieldName); + if (!idElem.ok()) + return Status(ErrorCodes::BadValue, + "Could not create new _id ObjectId element.", + 17268); + Status s = doc.root().pushFront(idElem); + if (!s.isOK()) + return s; + } + + return Status::OK(); + + } + } // namespace + + // static + const char* UpdateStage::kStageType = "UPDATE"; + + UpdateStage::UpdateStage(const UpdateStageParams& params, + WorkingSet* ws, + Database* db, + PlanStage* child) + : _params(params), + _ws(ws), + _db(db), + _child(child), + _commonStats(kStageType), + _updatedLocs(params.request->isMulti() ? new DiskLocSet() : NULL), + _doc(params.driver->getDocument()) { + // We are an update until we fall into the insert case. + params.driver->setContext(ModifierInterface::ExecInfo::UPDATE_CONTEXT); + + _collection = db->getCollection(params.request->getOpCtx(), + params.request->getNamespaceString().ns()); + } + + void UpdateStage::transformAndUpdate(BSONObj& oldObj, DiskLoc& loc) { + const UpdateRequest* request = _params.request; + UpdateDriver* driver = _params.driver; + CanonicalQuery* cq = _params.canonicalQuery; + UpdateLifecycle* lifecycle = request->getLifecycle(); + + // Ask the driver to apply the mods. It may be that the driver can apply those "in + // place", that is, some values of the old document just get adjusted without any + // change to the binary layout on the bson layer. It may be that a whole new + // document is needed to accomodate the new bson layout of the resulting document. + _doc.reset(oldObj, mutablebson::Document::kInPlaceEnabled); + BSONObj logObj; + + FieldRefSet updatedFields; + + Status status = Status::OK(); + if (!driver->needMatchDetails()) { + // If we don't need match details, avoid doing the rematch + status = driver->update(StringData(), &_doc, &logObj, &updatedFields); + } + else { + // If there was a matched field, obtain it. + MatchDetails matchDetails; + matchDetails.requestElemMatchKey(); + + dassert(cq); + verify(cq->root()->matchesBSON(oldObj, &matchDetails)); + + string matchedField; + if (matchDetails.hasElemMatchKey()) + matchedField = matchDetails.elemMatchKey(); + + // TODO: Right now, each mod checks in 'prepare' that if it needs positional + // data, that a non-empty StringData() was provided. In principle, we could do + // that check here in an else clause to the above conditional and remove the + // checks from the mods. + + status = driver->update(matchedField, &_doc, &logObj, &updatedFields); + } + + if (!status.isOK()) { + uasserted(16837, status.reason()); + } + + // Ensure _id exists and is first + uassertStatusOK(ensureIdAndFirst(_doc)); + + // If the driver applied the mods in place, we can ask the mutable for what + // changed. We call those changes "damages". :) We use the damages to inform the + // journal what was changed, and then apply them to the original document + // ourselves. 
If, however, the driver applied the mods out of place, we ask it to + // generate a new, modified document for us. In that case, the file manager will + // take care of the journaling details for us. + // + // This code flow is admittedly odd. But, right now, journaling is baked in the file + // manager. And if we aren't using the file manager, we have to do jounaling + // ourselves. + bool docWasModified = false; + BSONObj newObj; + const char* source = NULL; + bool inPlace = _doc.getInPlaceUpdates(&_damages, &source); + + // If something changed in the document, verify that no immutable fields were changed + // and data is valid for storage. + if ((!inPlace || !_damages.empty()) ) { + if (!(request->isFromReplication() || request->isFromMigration())) { + const std::vector<FieldRef*>* immutableFields = NULL; + if (lifecycle) + immutableFields = lifecycle->getImmutableFields(); + + uassertStatusOK(validate(oldObj, + updatedFields, + _doc, + immutableFields, + driver->modOptions()) ); + } + } + + WriteUnitOfWork wunit(request->getOpCtx()->recoveryUnit()); + + // Save state before making changes + saveState(); + + if (inPlace && !driver->modsAffectIndices()) { + // If a set of modifiers were all no-ops, we are still 'in place', but there is + // no work to do, in which case we want to consider the object unchanged. + if (!_damages.empty() ) { + _collection->updateDocumentWithDamages(request->getOpCtx(), loc, source, _damages); + docWasModified = true; + _specificStats.fastmod = true; + } + + newObj = oldObj; + } + else { + // The updates were not in place. Apply them through the file manager. + + // XXX: With experimental document-level locking, we do not hold the sufficient + // locks, so this would cause corruption. + fassert(18516, !useExperimentalDocLocking); + + newObj = _doc.getObject(); + uassert(17419, + str::stream() << "Resulting document after update is larger than " + << BSONObjMaxUserSize, + newObj.objsize() <= BSONObjMaxUserSize); + StatusWith<DiskLoc> res = _collection->updateDocument(request->getOpCtx(), + loc, + newObj, + true, + _params.opDebug); + uassertStatusOK(res.getStatus()); + DiskLoc newLoc = res.getValue(); + docWasModified = true; + + // If the document moved, we might see it again in a collection scan (maybe it's + // a document after our current document). + // + // If the document is indexed and the mod changes an indexed value, we might see it + // again. For an example, see the comment above near declaration of updatedLocs. + if (_updatedLocs && (newLoc != loc || driver->modsAffectIndices())) { + _updatedLocs->insert(newLoc); + } + } + + // Restore state after modification + restoreState(request->getOpCtx()); + + // Call logOp if requested. + if (request->shouldCallLogOp() && !logObj.isEmpty()) { + BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request->isMulti()); + repl::logOp(request->getOpCtx(), + "u", + request->getNamespaceString().ns().c_str(), + logObj, + &idQuery, + NULL, + request->isFromMigration()); + } + + wunit.commit(); + + // Only record doc modifications if they wrote (exclude no-ops) + if (docWasModified) { + _specificStats.nModified++; + } + + // Opportunity for journaling to write during the update. 
+ request->getOpCtx()->recoveryUnit()->commitIfNeeded(); + } + + void UpdateStage::doInsert() { + _specificStats.inserted = true; + + const UpdateRequest* request = _params.request; + UpdateDriver* driver = _params.driver; + CanonicalQuery* cq = _params.canonicalQuery; + UpdateLifecycle* lifecycle = request->getLifecycle(); + + // Since this is an insert (no docs found and upsert:true), we will be logging it + // as an insert in the oplog. We don't need the driver's help to build the + // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT. + // Some mods may only work in that context (e.g. $setOnInsert). + driver->setLogOp(false); + driver->setContext(ModifierInterface::ExecInfo::INSERT_CONTEXT); + + // Reset the document we will be writing to + _doc.reset(); + + // This remains the empty object in the case of an object replacement, but in the case + // of an upsert where we are creating a base object from the query and applying mods, + // we capture the query as the original so that we can detect immutable field mutations. + BSONObj original = BSONObj(); + + // Calling populateDocumentWithQueryFields will populate the '_doc' with fields from the + // query which creates the base of the update for the inserted doc (because upsert + // was true). + if (cq) { + uassertStatusOK(driver->populateDocumentWithQueryFields(cq, _doc)); + if (!driver->isDocReplacement()) { + _specificStats.fastmodinsert = true; + // We need all the fields from the query to compare against for validation below. + original = _doc.getObject(); + } + else { + original = request->getQuery(); + } + } + else { + fassert(17354, CanonicalQuery::isSimpleIdQuery(request->getQuery())); + BSONElement idElt = request->getQuery()[idFieldName]; + original = idElt.wrap(); + fassert(17352, _doc.root().appendElement(idElt)); + } + + // Apply the update modifications and then log the update as an insert manually. + Status status = driver->update(StringData(), &_doc); + if (!status.isOK()) { + uasserted(16836, status.reason()); + } + + // Ensure _id exists and is first + uassertStatusOK(ensureIdAndFirst(_doc)); + + // Validate that the object replacement or modifiers resulted in a document + // that contains all the immutable keys and can be stored if it isn't coming + // from a migration or via replication. + if (!(request->isFromReplication() || request->isFromMigration())){ + const std::vector<FieldRef*>* immutableFields = NULL; + if (lifecycle) + immutableFields = lifecycle->getImmutableFields(); + + FieldRefSet noFields; + // This will only validate the modified fields if not a replacement. + uassertStatusOK(validate(original, + noFields, + _doc, + immutableFields, + driver->modOptions()) ); + } + + // Insert the doc + BSONObj newObj = _doc.getObject(); + uassert(17420, + str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize, + newObj.objsize() <= BSONObjMaxUserSize); + + WriteUnitOfWork wunit(request->getOpCtx()->recoveryUnit()); + // Only create the collection if the doc will be inserted. 
+ if (!_collection) { + _collection = _db->getCollection(request->getOpCtx(), + request->getNamespaceString().ns()); + if (!_collection) { + _collection = _db->createCollection(request->getOpCtx(), + request->getNamespaceString().ns()); + } + } + + + StatusWith<DiskLoc> newLoc = _collection->insertDocument(request->getOpCtx(), + newObj, + !request->isGod()/*enforceQuota*/); + uassertStatusOK(newLoc.getStatus()); + if (request->shouldCallLogOp()) { + repl::logOp(request->getOpCtx(), + "i", + request->getNamespaceString().ns().c_str(), + newObj, + NULL, + NULL, + request->isFromMigration()); + } + + wunit.commit(); + + _specificStats.objInserted = newObj; + } + + bool UpdateStage::doneUpdating() { + // We're done updating if either the child has no more results to give us, or we've + // already gotten a result back and we're not a multi-update. + return _child->isEOF() || (_specificStats.nMatched > 0 && !_params.request->isMulti()); + } + + bool UpdateStage::needInsert() { + // We need to insert if + // 1) we haven't inserted already, + // 2) the child stage returned zero matches, and + // 3) the user asked for an upsert. + return !_specificStats.inserted + && _specificStats.nMatched == 0 + && _params.request->isUpsert(); + } + + bool UpdateStage::isEOF() { + return doneUpdating() && !needInsert(); + } + + PlanStage::StageState UpdateStage::work(WorkingSetID* out) { + ++_commonStats.works; + + // Adds the amount of time taken by work() to executionTimeMillis. + ScopedTimer timer(&_commonStats.executionTimeMillis); + + if (isEOF()) { return PlanStage::IS_EOF; } + + if (doneUpdating()) { + // Even if we're done updating, we may have some inserting left to do. + if (needInsert()) { + doInsert(); + } + + // At this point either we're done updating and there was no insert to do, + // or we're done updating and we're done inserting. Either way, we're EOF. + invariant(isEOF()); + return PlanStage::IS_EOF; + } + + // If we're here, then we still have to ask for results from the child and apply + // updates to them. We should only get here if the collection exists. + invariant(_collection); + + WorkingSetID id = WorkingSet::INVALID_ID; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + // Need to get these things from the result returned by the child. + DiskLoc loc; + BSONObj oldObj; + + WorkingSetMember* member = _ws->get(id); + + if (!member->hasLoc()) { + _ws->free(id); + const std::string errmsg = "update stage failed to read member w/ loc from child"; + *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError, + errmsg)); + return PlanStage::FAILURE; + } + loc = member->loc; + + // Updates can't have projections. This means that covering analysis will always add + // a fetch. We should always get fetched data, and never just key data. + invariant(member->hasObj()); + oldObj = member->obj; + + // If we're here, then we have retrieved both a DiskLoc and the corresponding + // unowned object from the child stage. Since we have the object and the diskloc, + // we can free the WSM. + _ws->free(id); + + // We fill this with the new locs of moved doc so we don't double-update. + if (_updatedLocs && _updatedLocs->count(loc) > 0) { + // Found a loc that we already updated. + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + + ++_specificStats.nMatched; + + // Do the update and return. 
+ transformAndUpdate(oldObj, loc); + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + else if (PlanStage::IS_EOF == status) { + // The child is out of results, but we might not be done yet because we still might + // have to do an insert. + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + else if (PlanStage::FAILURE == status) { + *out = id; + // If a stage fails, it may create a status WSM to indicate why it failed, in which case + // 'id' is valid. If ID is invalid, we create our own error message. + if (WorkingSet::INVALID_ID == id) { + const std::string errmsg = "delete stage failed to read in results from child"; + *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError, + errmsg)); + return PlanStage::FAILURE; + } + return status; + } + else { + if (PlanStage::NEED_TIME == status) { + ++_commonStats.needTime; + } + return status; + } + } + + void UpdateStage::saveState() { + ++_commonStats.yields; + _child->saveState(); + } + + void UpdateStage::restoreState(OperationContext* opCtx) { + ++_commonStats.unyields; + _child->restoreState(opCtx); + } + + void UpdateStage::invalidate(const DiskLoc& dl, InvalidationType type) { + ++_commonStats.invalidates; + _child->invalidate(dl, type); + } + + vector<PlanStage*> UpdateStage::getChildren() const { + vector<PlanStage*> children; + children.push_back(_child.get()); + return children; + } + + PlanStageStats* UpdateStage::getStats() { + _commonStats.isEOF = isEOF(); + auto_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_UPDATE)); + ret->specific.reset(new UpdateStats(_specificStats)); + ret->children.push_back(_child->getStats()); + return ret.release(); + } + + const CommonStats* UpdateStage::getCommonStats() { + return &_commonStats; + } + + const SpecificStats* UpdateStage::getSpecificStats() { + return &_specificStats; + } + +} // namespace mongo diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h new file mode 100644 index 00000000000..862074e4001 --- /dev/null +++ b/src/mongo/db/exec/update.h @@ -0,0 +1,164 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/catalog/database.h" +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/ops/update_driver.h" +#include "mongo/db/ops/update_request.h" + +namespace mongo { + + class OperationContext; + + struct UpdateStageParams { + + UpdateStageParams(const UpdateRequest* r, + UpdateDriver* d, + OpDebug* o) + : request(r), + driver(d), + opDebug(o), + canonicalQuery(NULL) { } + + // Contains update parameters like whether it's a multi update or an upsert. Not owned. + // Must outlive the UpdateStage. + const UpdateRequest* request; + + // Contains the logic for applying mods to documents. Not owned. Must outlive + // the UpdateStage. + UpdateDriver* driver; + + // Needed to pass to Collection::updateDocument(...). + OpDebug* opDebug; + + // Not owned here. + CanonicalQuery* canonicalQuery; + + private: + // Default constructor not allowed. + UpdateStageParams(); + }; + + /** + * Execution stage responsible for updates to documents and upserts. NEED_TIME is returned + * after performing an update or an insert. + * + * Callers of work() must be holding a write lock. + */ + class UpdateStage : public PlanStage { + MONGO_DISALLOW_COPYING(UpdateStage); + public: + UpdateStage(const UpdateStageParams& params, + WorkingSet* ws, + Database* db, + PlanStage* child); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void saveState(); + virtual void restoreState(OperationContext* opCtx); + virtual void invalidate(const DiskLoc& dl, InvalidationType type); + + virtual std::vector<PlanStage*> getChildren() const; + + virtual StageType stageType() const { return STAGE_UPDATE; } + + virtual PlanStageStats* getStats(); + + virtual const CommonStats* getCommonStats(); + + virtual const SpecificStats* getSpecificStats(); + + static const char* kStageType; + + private: + /** + * Computes the result of applying mods to the document 'oldObj' at DiskLoc 'loc' in + * memory, then commits these changes to the database. + */ + void transformAndUpdate(BSONObj& oldObj, DiskLoc& loc); + + /** + * Computes the document to insert and inserts it into the collection. Used if the + * user requested an upsert and no matching documents were found. + */ + void doInsert(); + + /** + * Have we performed all necessary updates? Even if this is true, we might not be EOF, + * as we might still have to do an insert. + */ + bool doneUpdating(); + + /** + * Examines the stats / update request and returns whether there is still an insert left + * to do. If so then this stage is not EOF yet. + */ + bool needInsert(); + + UpdateStageParams _params; + + // Not owned by us. + WorkingSet* _ws; + + // Not owned by us. + Database* _db; + Collection* _collection; + + // Owned by us. + scoped_ptr<PlanStage> _child; + + // Stats + CommonStats _commonStats; + UpdateStats _specificStats; + + // If the update was in-place, we may see it again. This only matters if we're doing + // a multi-update; if we're not doing a multi-update we stop after one update and we + // won't see any more docs. + // + // For example: If we're scanning an index {x:1} and performing {$inc:{x:5}}, we'll keep + // moving the document forward and it will continue to reappear in our index scan. + // Unless the index is multikey, the underlying query machinery won't de-dup. + // + // If the update wasn't in-place we may see it again. Our query may return the new + // document and we wouldn't want to update that. 
+ // + // So, no matter what, we keep track of where the doc wound up. + typedef unordered_set<DiskLoc, DiskLoc::Hasher> DiskLocSet; + const boost::scoped_ptr<DiskLocSet> _updatedLocs; + + // These get reused for each update. + mutablebson::Document& _doc; + mutablebson::DamageVector _damages; + }; + +} // namespace mongo diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index 921f7fd02e6..f1af1023d86 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -32,39 +32,26 @@ #include "mongo/db/ops/update.h" -#include <cstring> // for memcpy - -#include "mongo/bson/mutable/algorithm.h" -#include "mongo/bson/mutable/document.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/clientcursor.h" +#include "mongo/db/exec/update.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/ops/update_driver.h" #include "mongo/db/ops/update_executor.h" #include "mongo/db/ops/update_lifecycle.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/query/lite_parsed_query.h" -#include "mongo/db/query/query_planner_common.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/update_index_data.h" -#include "mongo/platform/unordered_set.h" #include "mongo/util/log.h" namespace mongo { MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kQuery); - namespace mb = mutablebson; - namespace { - const char idFieldName[] = "_id"; - const FieldRef idFieldRef(idFieldName); - // TODO: Make this a function on NamespaceString, or make it cleaner. inline void validateUpdate(const char* ns , const BSONObj& updateobj, @@ -80,345 +67,6 @@ namespace mongo { } } - Status storageValid(const mb::Document&, const bool); - Status storageValid(const mb::ConstElement&, const bool); - Status storageValidChildren(const mb::ConstElement&, const bool); - - /** - * mutable::document storageValid check -- like BSONObj::_okForStorage - */ - Status storageValid(const mb::Document& doc, const bool deep = true) { - mb::ConstElement currElem = doc.root().leftChild(); - while (currElem.ok()) { - if (currElem.getFieldName() == idFieldName) { - switch (currElem.getType()) { - case RegEx: - case Array: - case Undefined: - return Status(ErrorCodes::InvalidIdField, - str::stream() << "The '_id' value cannot be of type " - << typeName(currElem.getType())); - default: - break; - } - } - Status s = storageValid(currElem, deep); - if (!s.isOK()) - return s; - currElem = currElem.rightSibling(); - } - - return Status::OK(); - } - - /** - * Validates an element that has a field name which starts with a dollar sign ($). - * In the case of a DBRef field ($id, $ref, [$db]) these fields may be valid in - * the correct order/context only. 
- */ - Status validateDollarPrefixElement(const mb::ConstElement elem, const bool deep) { - mb::ConstElement curr = elem; - StringData currName = elem.getFieldName(); - - // Found a $db field - if (currName == "$db") { - if (curr.getType() != String) { - return Status(ErrorCodes::InvalidDBRef, - str::stream() << "The DBRef $db field must be a String, not a " - << typeName(curr.getType())); - } - curr = curr.leftSibling(); - - if (!curr.ok() || (curr.getFieldName() != "$id")) - return Status(ErrorCodes::InvalidDBRef, - "Found $db field without a $id before it, which is invalid."); - - currName = curr.getFieldName(); - } - - // Found a $id field - if (currName == "$id") { - Status s = storageValidChildren(curr, deep); - if (!s.isOK()) - return s; - - curr = curr.leftSibling(); - if (!curr.ok() || (curr.getFieldName() != "$ref")) { - return Status(ErrorCodes::InvalidDBRef, - "Found $id field without a $ref before it, which is invalid."); - } - - currName = curr.getFieldName(); - } - - if (currName == "$ref") { - if (curr.getType() != String) { - return Status(ErrorCodes::InvalidDBRef, - str::stream() << "The DBRef $ref field must be a String, not a " - << typeName(curr.getType())); - } - - if (!curr.rightSibling().ok() || curr.rightSibling().getFieldName() != "$id") - return Status(ErrorCodes::InvalidDBRef, - str::stream() << "The DBRef $ref field must be " - "following by a $id field"); - } - else { - // not an okay, $ prefixed field name. - return Status(ErrorCodes::DollarPrefixedFieldName, - str::stream() << "The dollar ($) prefixed field '" - << elem.getFieldName() << "' in '" - << mb::getFullName(elem) - << "' is not valid for storage."); - - } - - return Status::OK(); - } - - Status storageValid(const mb::ConstElement& elem, const bool deep = true) { - if (!elem.ok()) - return Status(ErrorCodes::BadValue, "Invalid elements cannot be stored."); - - // Field names of elements inside arrays are not meaningful in mutable bson, - // so we do not want to validate them. - // - // TODO: Revisit how mutable handles array field names. We going to need to make - // this better if we ever want to support ordered updates that can alter the same - // element repeatedly; see SERVER-12848. - const bool childOfArray = elem.parent().ok() ? - (elem.parent().getType() == mongo::Array) : false; - - if (!childOfArray) { - StringData fieldName = elem.getFieldName(); - // Cannot start with "$", unless dbref - if (fieldName[0] == '$') { - Status status = validateDollarPrefixElement(elem, deep); - if (!status.isOK()) - return status; - } - else if (fieldName.find(".") != string::npos) { - // Field name cannot have a "." in it. - return Status(ErrorCodes::DottedFieldName, - str::stream() << "The dotted field '" - << elem.getFieldName() << "' in '" - << mb::getFullName(elem) - << "' is not valid for storage."); - } - } - - // Check children if there are any. - Status s = storageValidChildren(elem, deep); - if (!s.isOK()) - return s; - - return Status::OK(); - } - - Status storageValidChildren(const mb::ConstElement& elem, const bool deep) { - if (!elem.hasChildren()) - return Status::OK(); - - mb::ConstElement curr = elem.leftChild(); - while (curr.ok()) { - Status s = storageValid(curr, deep); - if (!s.isOK()) - return s; - curr = curr.rightSibling(); - } - - return Status::OK(); - } - - /** - * This will verify that all updated fields are - * 1.) Valid for storage (checking parent to make sure things like DBRefs are valid) - * 2.) 
Compare updated immutable fields do not change values - * - * If updateFields is empty then it was replacement and/or we need to check all fields - */ - inline Status validate(const BSONObj& original, - const FieldRefSet& updatedFields, - const mb::Document& updated, - const std::vector<FieldRef*>* immutableAndSingleValueFields, - const ModifierInterface::Options& opts) { - - LOG(3) << "update validate options -- " - << " updatedFields: " << updatedFields - << " immutableAndSingleValueFields.size:" - << (immutableAndSingleValueFields ? immutableAndSingleValueFields->size() : 0) - << " fromRepl: " << opts.fromReplication - << " validate:" << opts.enforceOkForStorage; - - // 1.) Loop through each updated field and validate for storage - // and detect immutable field updates - - // The set of possibly changed immutable fields -- we will need to check their vals - FieldRefSet changedImmutableFields; - - // Check to see if there were no fields specified or if we are not validating - // The case if a range query, or query that didn't result in saved fields - if (updatedFields.empty() || !opts.enforceOkForStorage) { - if (opts.enforceOkForStorage) { - // No specific fields were updated so the whole doc must be checked - Status s = storageValid(updated, true); - if (!s.isOK()) - return s; - } - - // Check all immutable fields - if (immutableAndSingleValueFields) - changedImmutableFields.fillFrom(*immutableAndSingleValueFields); - } - else { - - // TODO: Change impl so we don't need to create a new FieldRefSet - // -- move all conflict logic into static function on FieldRefSet? - FieldRefSet immutableFieldRef; - if (immutableAndSingleValueFields) - immutableFieldRef.fillFrom(*immutableAndSingleValueFields); - - FieldRefSet::const_iterator where = updatedFields.begin(); - const FieldRefSet::const_iterator end = updatedFields.end(); - for( ; where != end; ++where) { - const FieldRef& current = **where; - - // Find the updated field in the updated document. - mutablebson::ConstElement newElem = updated.root(); - size_t currentPart = 0; - while (newElem.ok() && currentPart < current.numParts()) - newElem = newElem[current.getPart(currentPart++)]; - - // newElem might be missing if $unset/$renamed-away - if (newElem.ok()) { - Status s = storageValid(newElem, true); - if (!s.isOK()) - return s; - } - // Check if the updated field conflicts with immutable fields - immutableFieldRef.findConflicts(¤t, &changedImmutableFields); - } - } - - const bool checkIdField = (updatedFields.empty() && !original.isEmpty()) || - updatedFields.findConflicts(&idFieldRef, NULL); - - // Add _id to fields to check since it too is immutable - if (checkIdField) - changedImmutableFields.keepShortest(&idFieldRef); - else if (changedImmutableFields.empty()) { - // Return early if nothing changed which is immutable - return Status::OK(); - } - - LOG(4) << "Changed immutable fields: " << changedImmutableFields; - // 2.) Now compare values of the changed immutable fields (to make sure they haven't) - - const mutablebson::ConstElement newIdElem = updated.root()[idFieldName]; - - FieldRefSet::const_iterator where = changedImmutableFields.begin(); - const FieldRefSet::const_iterator end = changedImmutableFields.end(); - for( ; where != end; ++where ) { - const FieldRef& current = **where; - - // Find the updated field in the updated document. 
- mutablebson::ConstElement newElem = updated.root(); - size_t currentPart = 0; - while (newElem.ok() && currentPart < current.numParts()) - newElem = newElem[current.getPart(currentPart++)]; - - if (!newElem.ok()) { - if (original.isEmpty()) { - // If the _id is missing and not required, then skip this check - if (!(current.dottedField() == idFieldName)) - return Status(ErrorCodes::NoSuchKey, - mongoutils::str::stream() - << "After applying the update, the new" - << " document was missing the '" - << current.dottedField() - << "' (required and immutable) field."); - - } - else { - if (current.dottedField() != idFieldName) - return Status(ErrorCodes::ImmutableField, - mongoutils::str::stream() - << "After applying the update to the document with " - << newIdElem.toString() - << ", the '" << current.dottedField() - << "' (required and immutable) field was " - "found to have been removed --" - << original); - } - } - else { - - // Find the potentially affected field in the original document. - const BSONElement oldElem = original.getFieldDotted(current.dottedField()); - const BSONElement oldIdElem = original.getField(idFieldName); - - // Ensure no arrays since neither _id nor shard keys can be in an array, or one. - mb::ConstElement currElem = newElem; - while (currElem.ok()) { - if (currElem.getType() == Array) { - return Status(ErrorCodes::NotSingleValueField, - mongoutils::str::stream() - << "After applying the update to the document {" - << (oldIdElem.ok() ? oldIdElem.toString() : - newIdElem.toString()) - << " , ...}, the (immutable) field '" - << current.dottedField() - << "' was found to be an array or array descendant."); - } - currElem = currElem.parent(); - } - - // If we have both (old and new), compare them. If we just have new we are good - if (oldElem.ok() && newElem.compareWithBSONElement(oldElem, false) != 0) { - return Status(ErrorCodes::ImmutableField, - mongoutils::str::stream() - << "After applying the update to the document {" - << (oldIdElem.ok() ? oldIdElem.toString() : - newIdElem.toString()) - << " , ...}, the (immutable) field '" << current.dottedField() - << "' was found to have been altered to " - << newElem.toString()); - } - } - } - - return Status::OK(); - } - - Status ensureIdAndFirst(mb::Document& doc) { - mb::Element idElem = mb::findFirstChildNamed(doc.root(), idFieldName); - - // Move _id as first element if it exists - if (idElem.ok()) { - if (idElem.leftSibling().ok()) { - Status s = idElem.remove(); - if (!s.isOK()) - return s; - s = doc.root().pushFront(idElem); - if (!s.isOK()) - return s; - } - } - else { - // Create _id if the document does not currently have one. - idElem = doc.makeElementNewOID(idFieldName); - if (!idElem.ok()) - return Status(ErrorCodes::BadValue, - "Could not create new _id ObjectId element.", - 17268); - Status s = doc.root().pushFront(idElem); - if (!s.isOK()) - return s; - } - - return Status::OK(); - - } } // namespace UpdateResult update(Database* db, @@ -455,245 +103,53 @@ namespace mongo { } PlanExecutor* rawExec; - Status status = cq ? - getExecutor(request.getOpCtx(), collection, cqHolder.release(), &rawExec) : - getExecutor(request.getOpCtx(), collection, nsString.ns(), request.getQuery(), - &rawExec); + Status status = Status::OK(); + if (cq) { + // This is the regular path for when we have a CanonicalQuery. 
+ status = getExecutorUpdate(request.getOpCtx(), db, cqHolder.release(), &request, driver, + opDebug, &rawExec); + } + else { + // This is the idhack fast-path for getting a PlanExecutor without doing the work + // to create a CanonicalQuery. + status = getExecutorUpdate(request.getOpCtx(), db, nsString.ns(), &request, driver, + opDebug, &rawExec); + } uassert(17243, "could not get executor" + request.getQuery().toString() + "; " + causedBy(status), status.isOK()); // Create the plan executor and setup all deps. - auto_ptr<PlanExecutor> exec(rawExec); + scoped_ptr<PlanExecutor> exec(rawExec); // Register executor with the collection cursor cache. const ScopedExecutorRegistration safety(exec.get()); - // Get the canonical query which the underlying executor is using. This may be NULL in - // the case of idhack updates. - cq = exec->getCanonicalQuery(); - - // - // We'll start assuming we have one or more documents for this update. (Otherwise, - // we'll fall-back to insert case (if upsert is true).) - // - - // We are an update until we fall into the insert case below. - driver->setContext(ModifierInterface::ExecInfo::UPDATE_CONTEXT); - - int numMatched = 0; - - // If the update was in-place, we may see it again. This only matters if we're doing - // a multi-update; if we're not doing a multi-update we stop after one update and we - // won't see any more docs. - // - // For example: If we're scanning an index {x:1} and performing {$inc:{x:5}}, we'll keep - // moving the document forward and it will continue to reappear in our index scan. - // Unless the index is multikey, the underlying query machinery won't de-dup. - // - // If the update wasn't in-place we may see it again. Our query may return the new - // document and we wouldn't want to update that. - // - // So, no matter what, we keep track of where the doc wound up. - typedef unordered_set<DiskLoc, DiskLoc::Hasher> DiskLocSet; - const scoped_ptr<DiskLocSet> updatedLocs(request.isMulti() ? new DiskLocSet : NULL); - - // Reset these counters on each call. We might re-enter this function to retry this - // update if we throw a page fault exception below, and we rely on these counters - // reflecting only the actions taken locally. In particlar, we must have the no-op - // counter reset so that we can meaningfully comapre it with numMatched above. - opDebug->nModified = 0; - - // -1 for these fields means that we don't have a value. Once the update completes - // we request these values from the plan executor. - opDebug->nscanned = -1; - opDebug->nscannedObjects = -1; - - // Get the cached document from the update driver. 
- mutablebson::Document& doc = driver->getDocument(); - mutablebson::DamageVector damages; - - // Used during iteration of docs - BSONObj oldObj; - - // Get first doc, and location - PlanExecutor::ExecState state = PlanExecutor::ADVANCED; - - uassert(ErrorCodes::NotMaster, - mongoutils::str::stream() << "Not primary while updating " << nsString.ns(), - !request.shouldCallLogOp() - || repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - nsString.db())); - - while (true) { - // Get next doc, and location - DiskLoc loc; - state = exec->getNext(&oldObj, &loc); - - if (state != PlanExecutor::ADVANCED) { - if (state == PlanExecutor::IS_EOF) { - // We have reached the logical end of the loop, so do yielding recovery - break; - } - else { - uassertStatusOK(Status(ErrorCodes::InternalError, - str::stream() << " Update query failed -- " - << PlanExecutor::statestr(state))); - } - } - - // We fill this with the new locs of moved doc so we don't double-update. - if (updatedLocs && updatedLocs->count(loc) > 0) { - continue; - } - - numMatched++; - - // Ask the driver to apply the mods. It may be that the driver can apply those "in - // place", that is, some values of the old document just get adjusted without any - // change to the binary layout on the bson layer. It may be that a whole new - // document is needed to accomodate the new bson layout of the resulting document. - doc.reset(oldObj, mutablebson::Document::kInPlaceEnabled); - BSONObj logObj; - - - FieldRefSet updatedFields; - - Status status = Status::OK(); - if (!driver->needMatchDetails()) { - // If we don't need match details, avoid doing the rematch - status = driver->update(StringData(), &doc, &logObj, &updatedFields); - } - else { - // If there was a matched field, obtain it. - MatchDetails matchDetails; - matchDetails.requestElemMatchKey(); - - dassert(cq); - verify(cq->root()->matchesBSON(oldObj, &matchDetails)); - - string matchedField; - if (matchDetails.hasElemMatchKey()) - matchedField = matchDetails.elemMatchKey(); - - // TODO: Right now, each mod checks in 'prepare' that if it needs positional - // data, that a non-empty StringData() was provided. In principle, we could do - // that check here in an else clause to the above conditional and remove the - // checks from the mods. - - status = driver->update(matchedField, &doc, &logObj, &updatedFields); - } + // Run the plan (don't need to collect results because UpdateStage always returns + // NEED_TIME). + uassertStatusOK(exec->executePlan()); - if (!status.isOK()) { - uasserted(16837, status.reason()); - } - - // Ensure _id exists and is first - uassertStatusOK(ensureIdAndFirst(doc)); - - // If the driver applied the mods in place, we can ask the mutable for what - // changed. We call those changes "damages". :) We use the damages to inform the - // journal what was changed, and then apply them to the original document - // ourselves. If, however, the driver applied the mods out of place, we ask it to - // generate a new, modified document for us. In that case, the file manager will - // take care of the journaling details for us. - // - // This code flow is admittedly odd. But, right now, journaling is baked in the file - // manager. And if we aren't using the file manager, we have to do jounaling - // ourselves. 
- bool docWasModified = false; - BSONObj newObj; - const char* source = NULL; - bool inPlace = doc.getInPlaceUpdates(&damages, &source); - - // If something changed in the document, verify that no immutable fields were changed - // and data is valid for storage. - if ((!inPlace || !damages.empty()) ) { - if (!(request.isFromReplication() || request.isFromMigration())) { - const std::vector<FieldRef*>* immutableFields = NULL; - if (lifecycle) - immutableFields = lifecycle->getImmutableFields(); - - uassertStatusOK(validate(oldObj, - updatedFields, - doc, - immutableFields, - driver->modOptions()) ); - } - } - - WriteUnitOfWork wunit(request.getOpCtx()->recoveryUnit()); - - // Save state before making changes - exec->saveState(); - - if (inPlace && !driver->modsAffectIndices()) { - // If a set of modifiers were all no-ops, we are still 'in place', but there is - // no work to do, in which case we want to consider the object unchanged. - if (!damages.empty() ) { - collection->updateDocumentWithDamages(request.getOpCtx(), loc, source, damages); - docWasModified = true; - opDebug->fastmod = true; - } - - newObj = oldObj; - } - else { - // The updates were not in place. Apply them through the file manager. - - // XXX: With experimental document-level locking, we do not hold the sufficient - // locks, so this would cause corruption. - fassert(18516, !useExperimentalDocLocking); - - newObj = doc.getObject(); - uassert(17419, - str::stream() << "Resulting document after update is larger than " - << BSONObjMaxUserSize, - newObj.objsize() <= BSONObjMaxUserSize); - StatusWith<DiskLoc> res = collection->updateDocument(request.getOpCtx(), - loc, - newObj, - true, - opDebug); - uassertStatusOK(res.getStatus()); - DiskLoc newLoc = res.getValue(); - docWasModified = true; - - // If the document moved, we might see it again in a collection scan (maybe it's - // a document after our current document). - // - // If the document is indexed and the mod changes an indexed value, we might see it - // again. For an example, see the comment above near declaration of updatedLocs. - if (updatedLocs && (newLoc != loc || driver->modsAffectIndices())) { - updatedLocs->insert(newLoc); - } - } - - // Restore state after modification - uassert(17278, - "Update could not restore plan executor state after updating a document.", - exec->restoreState(request.getOpCtx())); - - // Call logOp if requested. - if (request.shouldCallLogOp() && !logObj.isEmpty()) { - BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request.isMulti()); - repl::logOp(request.getOpCtx(), "u", nsString.ns().c_str(), logObj, &idQuery, - NULL, request.isFromMigration()); - } + // Get stats from the root stage. + invariant(exec->getRootStage()->stageType() == STAGE_UPDATE); + UpdateStage* updateStage = static_cast<UpdateStage*>(exec->getRootStage()); + const UpdateStats* updateStats = + static_cast<const UpdateStats*>(updateStage->getSpecificStats()); - wunit.commit(); + // Use stats from the root stage to fill out opDebug. + opDebug->nMatched = updateStats->nMatched; + opDebug->nModified = updateStats->nModified; + opDebug->upsert = updateStats->inserted; + opDebug->fastmodinsert = updateStats->fastmodinsert; + opDebug->fastmod = updateStats->fastmod; - // Only record doc modifications if they wrote (exclude no-ops) - if (docWasModified) - opDebug->nModified++; - - if (!request.isMulti()) { - break; - } - - // Opportunity for journaling to write during the update. 
- request.getOpCtx()->recoveryUnit()->commitIfNeeded(); + // Historically, 'opDebug' considers 'nMatched' and 'nModified' to be 1 (rather than 0) if + // there is an upsert that inserts a document. The UpdateStage does not participate in this + // madness in order to have saner stats reporting for explain. This means that we have to + // set these values "manually" in the case of an insert. + if (updateStats->inserted) { + opDebug->nMatched = 1; + opDebug->nModified = 1; } // Get summary information about the plan. @@ -702,121 +158,11 @@ namespace mongo { opDebug->nscanned = stats.totalKeysExamined; opDebug->nscannedObjects = stats.totalDocsExamined; - // TODO: Can this be simplified? - if ((numMatched > 0) || (numMatched == 0 && !request.isUpsert()) ) { - opDebug->nMatched = numMatched; - return UpdateResult(numMatched > 0 /* updated existing object(s) */, - !driver->isDocReplacement() /* $mod or obj replacement */, - opDebug->nModified /* number of modified docs, no no-ops */, - numMatched /* # of docs matched/updated, even no-ops */, - BSONObj()); - } - - // - // We haven't found any existing document so an insert is done - // (upsert is true). - // - opDebug->upsert = true; - - // Since this is an insert (no docs found and upsert:true), we will be logging it - // as an insert in the oplog. We don't need the driver's help to build the - // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT. - // Some mods may only work in that context (e.g. $setOnInsert). - driver->setLogOp(false); - driver->setContext(ModifierInterface::ExecInfo::INSERT_CONTEXT); - - // Reset the document we will be writing to - doc.reset(); - - // This remains the empty object in the case of an object replacement, but in the case - // of an upsert where we are creating a base object from the query and applying mods, - // we capture the query as the original so that we can detect immutable field mutations. - BSONObj original = BSONObj(); - - // Calling createFromQuery will populate the 'doc' with fields from the query which - // creates the base of the update for the inserterd doc (because upsert was true) - if (cq) { - uassertStatusOK(driver->populateDocumentWithQueryFields(cq, doc)); - if (!driver->isDocReplacement()) { - opDebug->fastmodinsert = true; - // We need all the fields from the query to compare against for validation below. - original = doc.getObject(); - } - else { - original = request.getQuery(); - } - } - else { - fassert(17354, CanonicalQuery::isSimpleIdQuery(request.getQuery())); - BSONElement idElt = request.getQuery()["_id"]; - original = idElt.wrap(); - fassert(17352, doc.root().appendElement(idElt)); - } - - // Apply the update modifications and then log the update as an insert manually. - status = driver->update(StringData(), &doc); - if (!status.isOK()) { - uasserted(16836, status.reason()); - } - - // Ensure _id exists and is first - uassertStatusOK(ensureIdAndFirst(doc)); - - // Validate that the object replacement or modifiers resulted in a document - // that contains all the immutable keys and can be stored if it isn't coming - // from a migration or via replication. - if (!(request.isFromReplication() || request.isFromMigration())){ - const std::vector<FieldRef*>* immutableFields = NULL; - if (lifecycle) - immutableFields = lifecycle->getImmutableFields(); - - FieldRefSet noFields; - // This will only validate the modified fields if not a replacement. 
- uassertStatusOK(validate(original, - noFields, - doc, - immutableFields, - driver->modOptions()) ); - } - - // Insert the doc - BSONObj newObj = doc.getObject(); - uassert(17420, - str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize, - newObj.objsize() <= BSONObjMaxUserSize); - - WriteUnitOfWork wunit(request.getOpCtx()->recoveryUnit()); - // Only create the collection if the doc will be inserted. - if (!collection) { - collection = db->getCollection(request.getOpCtx(), request.getNamespaceString().ns()); - if (!collection) { - collection = db->createCollection(request.getOpCtx(), request.getNamespaceString().ns()); - } - } - - - StatusWith<DiskLoc> newLoc = collection->insertDocument(request.getOpCtx(), - newObj, - !request.isGod() /*enforceQuota*/); - uassertStatusOK(newLoc.getStatus()); - if (request.shouldCallLogOp()) { - repl::logOp(request.getOpCtx(), - "i", - nsString.ns().c_str(), - newObj, - NULL, - NULL, - request.isFromMigration()); - } - - wunit.commit(); - - opDebug->nMatched = 1; - return UpdateResult(false /* updated a non existing document */, - !driver->isDocReplacement() /* $mod or obj replacement? */, - 1 /* docs written*/, - 1 /* count of updated documents */, - newObj /* object that was upserted */ ); + return UpdateResult(updateStats->nMatched > 0 /* Did we update at least one obj? */, + !driver->isDocReplacement() /* $mod or obj replacement */, + opDebug->nModified /* number of modified docs, no no-ops */, + opDebug->nMatched /* # of docs matched/updated, even no-ops */, + updateStats->objInserted); } BSONObj applyUpdateOperators(const BSONObj& from, const BSONObj& operators) { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 7a347c203d8..b2a84a18eb0 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -42,6 +42,7 @@ #include "mongo/db/exec/projection.h" #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/subplan.h" +#include "mongo/db/exec/update.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/query_settings.h" @@ -458,6 +459,10 @@ namespace mongo { return Status::OK(); } + // + // Delete + // + Status getExecutorDelete(OperationContext* txn, Collection* collection, CanonicalQuery* rawCanonicalQuery, @@ -529,6 +534,84 @@ namespace mongo { } // + // Update + // + + Status getExecutorUpdate(OperationContext* txn, + Database* db, + CanonicalQuery* rawCanonicalQuery, + const UpdateRequest* request, + UpdateDriver* driver, + OpDebug* opDebug, + PlanExecutor** execOut) { + auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery); + auto_ptr<WorkingSet> ws(new WorkingSet()); + Collection* collection = db->getCollection(request->getOpCtx(), + request->getNamespaceString().ns()); + + PlanStage* root; + QuerySolution* querySolution; + Status status = prepareExecution(txn, collection, ws.get(), canonicalQuery.get(), 0, &root, + &querySolution); + if (!status.isOK()) { + return status; + } + invariant(root); + UpdateStageParams updateStageParams(request, driver, opDebug); + updateStageParams.canonicalQuery = rawCanonicalQuery; + root = new UpdateStage(updateStageParams, ws.get(), db, root); + // We must have a tree of stages in order to have a valid plan executor, but the query + // solution may be null. Takes ownership of all args other than 'collection'. 
+ *execOut = new PlanExecutor(ws.release(), root, querySolution, canonicalQuery.release(), + collection); + return Status::OK(); + } + + Status getExecutorUpdate(OperationContext* txn, + Database* db, + const std::string& ns, + const UpdateRequest* request, + UpdateDriver* driver, + OpDebug* opDebug, + PlanExecutor** execOut) { + auto_ptr<WorkingSet> ws(new WorkingSet()); + Collection* collection = db->getCollection(request->getOpCtx(), + request->getNamespaceString().ns()); + const BSONObj& unparsedQuery = request->getQuery(); + + UpdateStageParams updateStageParams(request, driver, opDebug); + if (!collection) { + LOG(2) << "Collection " << ns << " does not exist." + << " Using EOF stage: " << unparsedQuery.toString(); + UpdateStage* updateStage = new UpdateStage(updateStageParams, ws.get(), db, + new EOFStage()); + *execOut = new PlanExecutor(ws.release(), updateStage, ns); + return Status::OK(); + } + + if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) && + collection->getIndexCatalog()->findIdIndex()) { + LOG(2) << "Using idhack: " << unparsedQuery.toString(); + + PlanStage* idHackStage = new IDHackStage(txn, collection, unparsedQuery["_id"].wrap(), + ws.get()); + UpdateStage* root = new UpdateStage(updateStageParams, ws.get(), db, idHackStage); + *execOut = new PlanExecutor(ws.release(), root, collection); + return Status::OK(); + } + + const WhereCallbackReal whereCallback(txn, collection->ns().db()); + CanonicalQuery* cq; + Status status = CanonicalQuery::canonicalize(collection->ns(), unparsedQuery, &cq, + whereCallback); + if (!status.isOK()) + return status; + + // Takes ownership of 'cq'. + return getExecutorUpdate(txn, db, cq, request, driver, opDebug, execOut); + } + + // // Count hack // diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 3e8cb781bd0..cb49007260f 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -31,10 +31,13 @@ #include "mongo/db/query/query_planner_params.h" #include "mongo/db/query/query_settings.h" #include "mongo/db/query/query_solution.h" +#include "mongo/db/ops/update_driver.h" +#include "mongo/db/ops/update_request.h" namespace mongo { class Collection; + class Database; /** * Filter indexes retrieved from index catalog by @@ -121,6 +124,10 @@ namespace mongo { const BSONObj& hintObj, PlanExecutor** execOut); + // + // Delete + // + /** * Get a PlanExecutor for a delete operation. 'rawCanonicalQuery' describes the predicate for * the documents to be deleted. A write lock is required to execute the returned plan. @@ -156,4 +163,46 @@ namespace mongo { bool shouldCallLogOp, PlanExecutor** execOut); + // + // Update + // + + /** + * Get a PlanExecutor for an update operation. 'rawCanonicalQuery' describes the predicate for + * the documents to be deleted. A write lock is required to execute the returned plan. + * + * Takes ownership of 'rawCanonicalQuery'. Does not take ownership of other args. + * + * If the query is valid and an executor could be created, returns Status::OK() and populates + * *out with the PlanExecutor. + * + * If the query cannot be executed, returns a Status indicating why. + */ + Status getExecutorUpdate(OperationContext* txn, + Database* db, + CanonicalQuery* rawCanonicalQuery, + const UpdateRequest* request, + UpdateDriver* driver, + OpDebug* opDebug, + PlanExecutor** execOut); + + /** + * Overload of getExecutorUpdate() above, for when a canonicalQuery is not available. Used to + * support idhack-powered updates. 
+ * + * If the query is valid and an executor could be created, returns Status::OK() and populates + * *out with the PlanExecutor. + * + * Does not take ownership of its arguments. + * + * If the query cannot be executed, returns a Status indicating why. + */ + Status getExecutorUpdate(OperationContext* txn, + Database* db, + const std::string& ns, + const UpdateRequest* request, + UpdateDriver* driver, + OpDebug* opDebug, + PlanExecutor** execOut); + } // namespace mongo diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index 791145cce91..a2635ef8d22 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -88,6 +88,8 @@ namespace mongo { STAGE_SUBPLAN, STAGE_TEXT, STAGE_UNKNOWN, + + STAGE_UPDATE, }; } // namespace mongo diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp new file mode 100644 index 00000000000..a8975e40d05 --- /dev/null +++ b/src/mongo/dbtests/query_stage_update.cpp @@ -0,0 +1,364 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +/** + * This file tests db/exec/update.cpp (UpdateStage). 
+ */ + +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/eof.h" +#include "mongo/db/exec/update.h" +#include "mongo/db/exec/working_set.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/json.h" +#include "mongo/db/operation_context_impl.h" +#include "mongo/db/ops/update_driver.h" +#include "mongo/db/ops/update_lifecycle_impl.h" +#include "mongo/db/ops/update_request.h" +#include "mongo/db/query/canonical_query.h" +#include "mongo/dbtests/dbtests.h" + +namespace QueryStageUpdate { + + class QueryStageUpdateBase { + public: + QueryStageUpdateBase() + : _client(&_txn), + _ns("unittests.QueryStageUpdate"), + _nsString(StringData(ns())) { + Client::WriteContext ctx(&_txn, ns()); + _client.dropCollection(ns()); + ctx.commit(); + } + + virtual ~QueryStageUpdateBase() { + Client::WriteContext ctx(&_txn, ns()); + _client.dropCollection(ns()); + ctx.commit(); + } + + void insert(const BSONObj& doc) { + _client.insert(ns(), doc); + } + + void remove(const BSONObj& obj) { + _client.remove(ns(), obj); + } + + size_t count(const BSONObj& query) { + return _client.count(ns(), query, 0, 0, 0); + } + + CanonicalQuery* canonicalize(const BSONObj& query) { + CanonicalQuery* cq; + Status status = CanonicalQuery::canonicalize(ns(), query, &cq); + ASSERT_OK(status); + return cq; + } + + /** + * Runs the update operation by calling work until EOF. Asserts that + * the update stage always returns NEED_TIME. + */ + void runUpdate(UpdateStage* updateStage) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = PlanStage::NEED_TIME; + while (PlanStage::IS_EOF != state) { + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + state = updateStage->work(&id); + } + } + + /** + * Returns a vector of all of the documents currently in 'collection'. + * + * Uses a forward collection scan stage to get the docs, and populates 'out' with + * the results. + */ + void getCollContents(Collection* collection, vector<BSONObj>* out) { + WorkingSet ws; + + CollectionScanParams params; + params.collection = collection; + params.direction = CollectionScanParams::FORWARD; + params.tailable = false; + + scoped_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); + while (!scan->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = scan->work(&id); + if (PlanStage::ADVANCED == state) { + WorkingSetMember* member = ws.get(id); + verify(member->hasObj()); + out->push_back(member->obj); + } + } + } + + void getLocs(Collection* collection, + CollectionScanParams::Direction direction, + vector<DiskLoc>* out) { + WorkingSet ws; + + CollectionScanParams params; + params.collection = collection; + params.direction = direction; + params.tailable = false; + + scoped_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); + while (!scan->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = scan->work(&id); + if (PlanStage::ADVANCED == state) { + WorkingSetMember* member = ws.get(id); + verify(member->hasLoc()); + out->push_back(member->loc); + } + } + } + + /** + * Asserts that 'objs' contains 'expectedDoc'. 
+ */ + void assertHasDoc(const vector<BSONObj>& objs, const BSONObj& expectedDoc) { + bool foundDoc = false; + for (size_t i = 0; i < objs.size(); i++) { + if (0 == objs[i].woCompare(expectedDoc)) { + foundDoc = true; + break; + } + } + ASSERT(foundDoc); + } + + const char* ns() { return _ns.c_str(); } + + const NamespaceString& nsString() { return _nsString; } + + protected: + OperationContextImpl _txn; + + private: + DBDirectClient _client; + + std::string _ns; + NamespaceString _nsString; + }; + + /** + * Test an upsert into an empty collection. + */ + class QueryStageUpdateUpsertEmptyColl : public QueryStageUpdateBase { + public: + void run() { + // Run the update. + { + Client::WriteContext ctx(&_txn, ns()); + Client& c = cc(); + CurOp& curOp = *c.curop(); + OpDebug* opDebug = &curOp.debug(); + UpdateDriver driver( (UpdateDriver::Options()) ); + Database* db = ctx.ctx().db(); + + // Collection should be empty. + ASSERT_EQUALS(0U, count(BSONObj())); + + UpdateRequest request(&_txn, nsString()); + UpdateLifecycleImpl updateLifecycle(false, nsString()); + request.setLifecycle(&updateLifecycle); + + // Update is the upsert {_id: 0, x: 1}, {$set: {y: 2}}. + BSONObj query = fromjson("{_id: 0, x: 1}"); + BSONObj updates = fromjson("{$set: {y: 2}}"); + + request.setUpsert(); + request.setQuery(query); + request.setUpdates(updates); + + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + + // Setup update params. + UpdateStageParams params(&request, &driver, opDebug); + scoped_ptr<CanonicalQuery> cq(canonicalize(query)); + params.canonicalQuery = cq.get(); + + scoped_ptr<WorkingSet> ws(new WorkingSet()); + auto_ptr<EOFStage> eofStage(new EOFStage()); + + scoped_ptr<UpdateStage> updateStage( + new UpdateStage(params, ws.get(), db, eofStage.release())); + + runUpdate(updateStage.get()); + ctx.commit(); + } + + // Verify the contents of the resulting collection. + { + Client::ReadContext ctx(&_txn, ns()); + Collection* collection = ctx.ctx().db()->getCollection(&_txn, ns()); + + vector<BSONObj> objs; + getCollContents(collection, &objs); + + // Expect a single document, {_id: 0, x: 1, y: 2}. + ASSERT_EQUALS(1U, objs.size()); + ASSERT_EQUALS(objs[0], fromjson("{_id: 0, x: 1, y: 2}")); + } + } + }; + + /** + * Test receipt of an invalidation: case in which the document about to updated + * is deleted. + */ + class QueryStageUpdateSkipInvalidatedDoc : public QueryStageUpdateBase { + public: + void run() { + // Run the update. + { + Client::WriteContext ctx(&_txn, ns()); + + // Populate the collection. + for (int i = 0; i < 10; ++i) { + insert(BSON("_id" << i << "foo" << i)); + } + ASSERT_EQUALS(10U, count(BSONObj())); + + Client& c = cc(); + CurOp& curOp = *c.curop(); + OpDebug* opDebug = &curOp.debug(); + UpdateDriver driver( (UpdateDriver::Options()) ); + Database* db = ctx.ctx().db(); + Collection* coll = db->getCollection(&_txn, ns()); + + // Get the DiskLocs that would be returned by an in-order scan. + vector<DiskLoc> locs; + getLocs(coll, CollectionScanParams::FORWARD, &locs); + + UpdateRequest request(&_txn, nsString()); + UpdateLifecycleImpl updateLifecycle(false, nsString()); + request.setLifecycle(&updateLifecycle); + + // Update is a multi-update that sets 'bar' to 3 in every document + // where foo is less than 5. 
+ BSONObj query = fromjson("{foo: {$lt: 5}}"); + BSONObj updates = fromjson("{$set: {bar: 3}}"); + + request.setMulti(); + request.setQuery(query); + request.setUpdates(updates); + + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + + // Configure the scan. + CollectionScanParams collScanParams; + collScanParams.collection = coll; + collScanParams.direction = CollectionScanParams::FORWARD; + collScanParams.tailable = false; + + // Configure the update. + UpdateStageParams updateParams(&request, &driver, opDebug); + scoped_ptr<CanonicalQuery> cq(canonicalize(query)); + updateParams.canonicalQuery = cq.get(); + + scoped_ptr<WorkingSet> ws(new WorkingSet()); + auto_ptr<CollectionScan> cs( + new CollectionScan(&_txn, collScanParams, ws.get(), cq->root())); + + scoped_ptr<UpdateStage> updateStage( + new UpdateStage(updateParams, ws.get(), db, cs.release())); + + const UpdateStats* stats = + static_cast<const UpdateStats*>(updateStage->getSpecificStats()); + + const size_t targetDocIndex = 3; + + while (stats->nModified < targetDocIndex) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + } + + // Remove locs[targetDocIndex]; + updateStage->saveState(); + updateStage->invalidate(locs[targetDocIndex], INVALIDATION_DELETION); + BSONObj targetDoc = coll->docFor(locs[targetDocIndex]); + ASSERT(!targetDoc.isEmpty()); + remove(targetDoc); + updateStage->restoreState(&_txn); + + // Do the remaining updates. + while (!updateStage->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + ASSERT(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state); + } + + ctx.commit(); + + // 4 of the 5 matching documents should have been modified (one was deleted). + ASSERT_EQUALS(4U, stats->nModified); + ASSERT_EQUALS(4U, stats->nMatched); + } + + // Check the contents of the collection. + { + Client::ReadContext ctx(&_txn, ns()); + Collection* collection = ctx.ctx().db()->getCollection(&_txn, ns()); + + vector<BSONObj> objs; + getCollContents(collection, &objs); + + // Verify that the collection now has 9 docs (one was deleted). + ASSERT_EQUALS(9U, objs.size()); + + // Make sure that the collection has certain documents. + assertHasDoc(objs, fromjson("{_id: 0, foo: 0, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 1, foo: 1, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 2, foo: 2, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 4, foo: 4, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 5, foo: 5}")); + assertHasDoc(objs, fromjson("{_id: 6, foo: 6}")); + } + } + }; + + class All : public Suite { + public: + All() : Suite("query_stage_update") {} + + void setupTests() { + // Stage-specific tests below. + add<QueryStageUpdateUpsertEmptyColl>(); + add<QueryStageUpdateSkipInvalidatedDoc>(); + } + } all; + +} // namespace QueryStageUpdate |
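Taken together, the new pieces give callers a single pattern: build the UpdateRequest and UpdateDriver, ask get_executor for a plan whose root is an UpdateStage, run the plan to completion, and read everything of interest back out of the root stage's UpdateStats. The condensed sketch below shows that flow using only the declarations added in this patch; locking, UpdateLifecycle wiring, and error handling are elided, and the helper name runUpdateViaExecutor() is invented for illustration.

    // Sketch only: uses the getExecutorUpdate()/UpdateStats interfaces added by
    // this patch. Locking, lifecycle setup, and error handling are elided.
    UpdateResult runUpdateViaExecutor(OperationContext* txn,
                                      Database* db,
                                      const UpdateRequest& request,
                                      UpdateDriver* driver,
                                      OpDebug* opDebug) {
        // Build an executor whose root stage is an UpdateStage (EOF- or
        // idhack-backed when no CanonicalQuery is needed).
        PlanExecutor* rawExec;
        uassertStatusOK(getExecutorUpdate(txn, db,
                                          request.getNamespaceString().ns(),
                                          &request, driver, opDebug, &rawExec));
        std::auto_ptr<PlanExecutor> exec(rawExec);

        // UpdateStage always returns NEED_TIME, so there are no results to
        // collect; just drive the plan to EOF.
        uassertStatusOK(exec->executePlan());

        // All of the bookkeeping lives in the root stage's UpdateStats.
        invariant(exec->getRootStage()->stageType() == STAGE_UPDATE);
        UpdateStage* updateStage = static_cast<UpdateStage*>(exec->getRootStage());
        const UpdateStats* stats =
            static_cast<const UpdateStats*>(updateStage->getSpecificStats());

        return UpdateResult(stats->nMatched > 0 /* updated existing object(s) */,
                            !driver->isDocReplacement() /* $mod or obj replacement */,
                            stats->nModified /* docs actually modified */,
                            stats->nMatched /* docs matched, including no-ops */,
                            stats->objInserted /* upserted doc, if any */);
    }

Note that the real caller additionally forces opDebug->nMatched and opDebug->nModified to 1 when the update turned into an upsert insert, as the comment in ops/update.cpp explains; the sketch skips that historical adjustment and reports the stage's stats directly.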