summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/exec/SConscript1
-rw-r--r--src/mongo/db/exec/plan_stats.h35
-rw-r--r--src/mongo/db/exec/update.cpp818
-rw-r--r--src/mongo/db/exec/update.h164
-rw-r--r--src/mongo/db/ops/update.cpp734
-rw-r--r--src/mongo/db/query/get_executor.cpp83
-rw-r--r--src/mongo/db/query/get_executor.h49
-rw-r--r--src/mongo/db/query/stage_types.h2
-rw-r--r--src/mongo/dbtests/query_stage_update.cpp364
9 files changed, 1556 insertions, 694 deletions
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index 01b57a9ff2e..0d5ab3e2b29 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -64,6 +64,7 @@ env.Library(
"stagedebug_cmd.cpp",
"subplan.cpp",
"text.cpp",
+ "update.cpp",
"working_set_common.cpp",
],
LIBDEPS = [
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index adbc0b6d986..113be7af2a1 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -559,6 +559,41 @@ namespace mongo {
BSONObj keyPattern;
};
+ struct UpdateStats : public SpecificStats {
+ UpdateStats()
+ : nMatched(0),
+ nModified(0),
+ fastmod(false),
+ fastmodinsert(false),
+ inserted(false) { }
+
+ virtual SpecificStats* clone() const {
+ return new UpdateStats(*this);
+ }
+
+ // The number of documents which match the query part of the update.
+ size_t nMatched;
+
+ // The number of documents modified by this update.
+ size_t nModified;
+
+ // A 'fastmod' update is an in-place update that does not have to modify
+ // any indices. It's "fast" because the only work needed is changing the bits
+ // inside the document.
+ bool fastmod;
+
+ // A 'fastmodinsert' is an insert resulting from an {upsert: true} update
+ // which is a doc-replacement style update. It's "fast" because we don't need
+ // to compute the document to insert based on the modifiers.
+ bool fastmodinsert;
+
+ // Is this an {upsert: true} update that did an insert?
+ bool inserted;
+
+ // The object that was inserted. This is an empty document if no insert was performed.
+ BSONObj objInserted;
+ };
+
struct TextStats : public SpecificStats {
TextStats() : keysExamined(0), fetches(0), parsedTextQuery() { }
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp
new file mode 100644
index 00000000000..83a89262f46
--- /dev/null
+++ b/src/mongo/db/exec/update.cpp
@@ -0,0 +1,818 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/update.h"
+
+#include "mongo/bson/mutable/algorithm.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/ops/update_lifecycle.h"
+#include "mongo/db/repl/repl_coordinator_global.h"
+#include "mongo/db/repl/oplog.h"
+
+namespace mongo {
+
+ namespace mb = mutablebson;
+
+ namespace {
+
+ const char idFieldName[] = "_id";
+ const FieldRef idFieldRef(idFieldName);
+
+ Status storageValid(const mb::Document&, const bool);
+ Status storageValid(const mb::ConstElement&, const bool);
+ Status storageValidChildren(const mb::ConstElement&, const bool);
+
+ /**
+ * mutable::document storageValid check -- like BSONObj::_okForStorage
+ */
+ Status storageValid(const mb::Document& doc, const bool deep = true) {
+ mb::ConstElement currElem = doc.root().leftChild();
+ while (currElem.ok()) {
+ if (currElem.getFieldName() == idFieldName) {
+ switch (currElem.getType()) {
+ case RegEx:
+ case Array:
+ case Undefined:
+ return Status(ErrorCodes::InvalidIdField,
+ str::stream() << "The '_id' value cannot be of type "
+ << typeName(currElem.getType()));
+ default:
+ break;
+ }
+ }
+ Status s = storageValid(currElem, deep);
+ if (!s.isOK())
+ return s;
+ currElem = currElem.rightSibling();
+ }
+
+ return Status::OK();
+ }
+
+ /**
+ * Validates an element that has a field name which starts with a dollar sign ($).
+ * In the case of a DBRef field ($id, $ref, [$db]) these fields may be valid in
+ * the correct order/context only.
+ */
+ Status validateDollarPrefixElement(const mb::ConstElement elem, const bool deep) {
+ mb::ConstElement curr = elem;
+ StringData currName = elem.getFieldName();
+
+ // Found a $db field
+ if (currName == "$db") {
+ if (curr.getType() != String) {
+ return Status(ErrorCodes::InvalidDBRef,
+ str::stream() << "The DBRef $db field must be a String, not a "
+ << typeName(curr.getType()));
+ }
+ curr = curr.leftSibling();
+
+ if (!curr.ok() || (curr.getFieldName() != "$id"))
+ return Status(ErrorCodes::InvalidDBRef,
+ "Found $db field without a $id before it, which is invalid.");
+
+ currName = curr.getFieldName();
+ }
+
+ // Found a $id field
+ if (currName == "$id") {
+ Status s = storageValidChildren(curr, deep);
+ if (!s.isOK())
+ return s;
+
+ curr = curr.leftSibling();
+ if (!curr.ok() || (curr.getFieldName() != "$ref")) {
+ return Status(ErrorCodes::InvalidDBRef,
+ "Found $id field without a $ref before it, which is invalid.");
+ }
+
+ currName = curr.getFieldName();
+ }
+
+ if (currName == "$ref") {
+ if (curr.getType() != String) {
+ return Status(ErrorCodes::InvalidDBRef,
+ str::stream() << "The DBRef $ref field must be a String, not a "
+ << typeName(curr.getType()));
+ }
+
+ if (!curr.rightSibling().ok() || curr.rightSibling().getFieldName() != "$id")
+ return Status(ErrorCodes::InvalidDBRef,
+ str::stream() << "The DBRef $ref field must be "
+ "following by a $id field");
+ }
+ else {
+ // not an okay, $ prefixed field name.
+ return Status(ErrorCodes::DollarPrefixedFieldName,
+ str::stream() << "The dollar ($) prefixed field '"
+ << elem.getFieldName() << "' in '"
+ << mb::getFullName(elem)
+ << "' is not valid for storage.");
+
+ }
+
+ return Status::OK();
+ }
+
+ Status storageValid(const mb::ConstElement& elem, const bool deep = true) {
+ if (!elem.ok())
+ return Status(ErrorCodes::BadValue, "Invalid elements cannot be stored.");
+
+ // Field names of elements inside arrays are not meaningful in mutable bson,
+ // so we do not want to validate them.
+ //
+        // TODO: Revisit how mutable handles array field names. We're going to need to make
+ // this better if we ever want to support ordered updates that can alter the same
+ // element repeatedly; see SERVER-12848.
+ const bool childOfArray = elem.parent().ok() ?
+ (elem.parent().getType() == mongo::Array) : false;
+
+ if (!childOfArray) {
+ StringData fieldName = elem.getFieldName();
+ // Cannot start with "$", unless dbref
+ if (fieldName[0] == '$') {
+ Status status = validateDollarPrefixElement(elem, deep);
+ if (!status.isOK())
+ return status;
+ }
+ else if (fieldName.find(".") != string::npos) {
+ // Field name cannot have a "." in it.
+ return Status(ErrorCodes::DottedFieldName,
+ str::stream() << "The dotted field '"
+ << elem.getFieldName() << "' in '"
+ << mb::getFullName(elem)
+ << "' is not valid for storage.");
+ }
+ }
+
+ // Check children if there are any.
+ Status s = storageValidChildren(elem, deep);
+ if (!s.isOK())
+ return s;
+
+ return Status::OK();
+ }
+
+ Status storageValidChildren(const mb::ConstElement& elem, const bool deep) {
+ if (!elem.hasChildren())
+ return Status::OK();
+
+ mb::ConstElement curr = elem.leftChild();
+ while (curr.ok()) {
+ Status s = storageValid(curr, deep);
+ if (!s.isOK())
+ return s;
+ curr = curr.rightSibling();
+ }
+
+ return Status::OK();
+ }
+
+ /**
+ * This will verify that all updated fields are
+ * 1.) Valid for storage (checking parent to make sure things like DBRefs are valid)
+ * 2.) Compare updated immutable fields do not change values
+ *
+ * If updateFields is empty then it was replacement and/or we need to check all fields
+ */
+ inline Status validate(const BSONObj& original,
+ const FieldRefSet& updatedFields,
+ const mb::Document& updated,
+ const std::vector<FieldRef*>* immutableAndSingleValueFields,
+ const ModifierInterface::Options& opts) {
+
+ LOG(3) << "update validate options -- "
+ << " updatedFields: " << updatedFields
+ << " immutableAndSingleValueFields.size:"
+ << (immutableAndSingleValueFields ? immutableAndSingleValueFields->size() : 0)
+ << " fromRepl: " << opts.fromReplication
+ << " validate:" << opts.enforceOkForStorage;
+
+ // 1.) Loop through each updated field and validate for storage
+ // and detect immutable field updates
+
+ // The set of possibly changed immutable fields -- we will need to check their vals
+ FieldRefSet changedImmutableFields;
+
+ // Check to see if there were no fields specified or if we are not validating
+            // The case of a range query, or query that didn't result in saved fields
+ if (updatedFields.empty() || !opts.enforceOkForStorage) {
+ if (opts.enforceOkForStorage) {
+ // No specific fields were updated so the whole doc must be checked
+ Status s = storageValid(updated, true);
+ if (!s.isOK())
+ return s;
+ }
+
+ // Check all immutable fields
+ if (immutableAndSingleValueFields)
+ changedImmutableFields.fillFrom(*immutableAndSingleValueFields);
+ }
+ else {
+
+ // TODO: Change impl so we don't need to create a new FieldRefSet
+ // -- move all conflict logic into static function on FieldRefSet?
+ FieldRefSet immutableFieldRef;
+ if (immutableAndSingleValueFields)
+ immutableFieldRef.fillFrom(*immutableAndSingleValueFields);
+
+ FieldRefSet::const_iterator where = updatedFields.begin();
+ const FieldRefSet::const_iterator end = updatedFields.end();
+ for( ; where != end; ++where) {
+ const FieldRef& current = **where;
+
+ // Find the updated field in the updated document.
+ mutablebson::ConstElement newElem = updated.root();
+ size_t currentPart = 0;
+ while (newElem.ok() && currentPart < current.numParts())
+ newElem = newElem[current.getPart(currentPart++)];
+
+ // newElem might be missing if $unset/$renamed-away
+ if (newElem.ok()) {
+ Status s = storageValid(newElem, true);
+ if (!s.isOK())
+ return s;
+ }
+ // Check if the updated field conflicts with immutable fields
+ immutableFieldRef.findConflicts(&current, &changedImmutableFields);
+ }
+ }
+
+ const bool checkIdField = (updatedFields.empty() && !original.isEmpty()) ||
+ updatedFields.findConflicts(&idFieldRef, NULL);
+
+ // Add _id to fields to check since it too is immutable
+ if (checkIdField)
+ changedImmutableFields.keepShortest(&idFieldRef);
+ else if (changedImmutableFields.empty()) {
+ // Return early if nothing changed which is immutable
+ return Status::OK();
+ }
+
+ LOG(4) << "Changed immutable fields: " << changedImmutableFields;
+ // 2.) Now compare values of the changed immutable fields (to make sure they haven't)
+
+ const mutablebson::ConstElement newIdElem = updated.root()[idFieldName];
+
+ FieldRefSet::const_iterator where = changedImmutableFields.begin();
+ const FieldRefSet::const_iterator end = changedImmutableFields.end();
+ for( ; where != end; ++where ) {
+ const FieldRef& current = **where;
+
+ // Find the updated field in the updated document.
+ mutablebson::ConstElement newElem = updated.root();
+ size_t currentPart = 0;
+ while (newElem.ok() && currentPart < current.numParts())
+ newElem = newElem[current.getPart(currentPart++)];
+
+ if (!newElem.ok()) {
+ if (original.isEmpty()) {
+ // If the _id is missing and not required, then skip this check
+ if (!(current.dottedField() == idFieldName))
+ return Status(ErrorCodes::NoSuchKey,
+ mongoutils::str::stream()
+ << "After applying the update, the new"
+ << " document was missing the '"
+ << current.dottedField()
+ << "' (required and immutable) field.");
+
+ }
+ else {
+ if (current.dottedField() != idFieldName)
+ return Status(ErrorCodes::ImmutableField,
+ mongoutils::str::stream()
+ << "After applying the update to the document with "
+ << newIdElem.toString()
+ << ", the '" << current.dottedField()
+ << "' (required and immutable) field was "
+ "found to have been removed --"
+ << original);
+ }
+ }
+ else {
+
+ // Find the potentially affected field in the original document.
+ const BSONElement oldElem = original.getFieldDotted(current.dottedField());
+ const BSONElement oldIdElem = original.getField(idFieldName);
+
+ // Ensure no arrays since neither _id nor shard keys can be in an array, or one.
+ mb::ConstElement currElem = newElem;
+ while (currElem.ok()) {
+ if (currElem.getType() == Array) {
+ return Status(ErrorCodes::NotSingleValueField,
+ mongoutils::str::stream()
+ << "After applying the update to the document {"
+ << (oldIdElem.ok() ? oldIdElem.toString() :
+ newIdElem.toString())
+ << " , ...}, the (immutable) field '"
+ << current.dottedField()
+ << "' was found to be an array or array descendant.");
+ }
+ currElem = currElem.parent();
+ }
+
+ // If we have both (old and new), compare them. If we just have new we are good
+ if (oldElem.ok() && newElem.compareWithBSONElement(oldElem, false) != 0) {
+ return Status(ErrorCodes::ImmutableField,
+ mongoutils::str::stream()
+ << "After applying the update to the document {"
+ << (oldIdElem.ok() ? oldIdElem.toString() :
+ newIdElem.toString())
+ << " , ...}, the (immutable) field '" << current.dottedField()
+ << "' was found to have been altered to "
+ << newElem.toString());
+ }
+ }
+ }
+
+ return Status::OK();
+ }
+
+ Status ensureIdAndFirst(mb::Document& doc) {
+ mb::Element idElem = mb::findFirstChildNamed(doc.root(), idFieldName);
+
+ // Move _id as first element if it exists
+ if (idElem.ok()) {
+ if (idElem.leftSibling().ok()) {
+ Status s = idElem.remove();
+ if (!s.isOK())
+ return s;
+ s = doc.root().pushFront(idElem);
+ if (!s.isOK())
+ return s;
+ }
+ }
+ else {
+ // Create _id if the document does not currently have one.
+ idElem = doc.makeElementNewOID(idFieldName);
+ if (!idElem.ok())
+ return Status(ErrorCodes::BadValue,
+ "Could not create new _id ObjectId element.",
+ 17268);
+ Status s = doc.root().pushFront(idElem);
+ if (!s.isOK())
+ return s;
+ }
+
+ return Status::OK();
+
+ }
+ } // namespace
+
+ // static
+ const char* UpdateStage::kStageType = "UPDATE";
+
+ UpdateStage::UpdateStage(const UpdateStageParams& params,
+ WorkingSet* ws,
+ Database* db,
+ PlanStage* child)
+ : _params(params),
+ _ws(ws),
+ _db(db),
+ _child(child),
+ _commonStats(kStageType),
+ _updatedLocs(params.request->isMulti() ? new DiskLocSet() : NULL),
+ _doc(params.driver->getDocument()) {
+ // We are an update until we fall into the insert case.
+ params.driver->setContext(ModifierInterface::ExecInfo::UPDATE_CONTEXT);
+
+ _collection = db->getCollection(params.request->getOpCtx(),
+ params.request->getNamespaceString().ns());
+ }
+
+ void UpdateStage::transformAndUpdate(BSONObj& oldObj, DiskLoc& loc) {
+ const UpdateRequest* request = _params.request;
+ UpdateDriver* driver = _params.driver;
+ CanonicalQuery* cq = _params.canonicalQuery;
+ UpdateLifecycle* lifecycle = request->getLifecycle();
+
+ // Ask the driver to apply the mods. It may be that the driver can apply those "in
+ // place", that is, some values of the old document just get adjusted without any
+ // change to the binary layout on the bson layer. It may be that a whole new
+        // document is needed to accommodate the new bson layout of the resulting document.
+ _doc.reset(oldObj, mutablebson::Document::kInPlaceEnabled);
+ BSONObj logObj;
+
+ FieldRefSet updatedFields;
+
+ Status status = Status::OK();
+ if (!driver->needMatchDetails()) {
+ // If we don't need match details, avoid doing the rematch
+ status = driver->update(StringData(), &_doc, &logObj, &updatedFields);
+ }
+ else {
+ // If there was a matched field, obtain it.
+ MatchDetails matchDetails;
+ matchDetails.requestElemMatchKey();
+
+ dassert(cq);
+ verify(cq->root()->matchesBSON(oldObj, &matchDetails));
+
+ string matchedField;
+ if (matchDetails.hasElemMatchKey())
+ matchedField = matchDetails.elemMatchKey();
+
+ // TODO: Right now, each mod checks in 'prepare' that if it needs positional
+ // data, that a non-empty StringData() was provided. In principle, we could do
+ // that check here in an else clause to the above conditional and remove the
+ // checks from the mods.
+
+ status = driver->update(matchedField, &_doc, &logObj, &updatedFields);
+ }
+
+ if (!status.isOK()) {
+ uasserted(16837, status.reason());
+ }
+
+ // Ensure _id exists and is first
+ uassertStatusOK(ensureIdAndFirst(_doc));
+
+ // If the driver applied the mods in place, we can ask the mutable for what
+ // changed. We call those changes "damages". :) We use the damages to inform the
+ // journal what was changed, and then apply them to the original document
+ // ourselves. If, however, the driver applied the mods out of place, we ask it to
+ // generate a new, modified document for us. In that case, the file manager will
+ // take care of the journaling details for us.
+ //
+ // This code flow is admittedly odd. But, right now, journaling is baked in the file
+        // manager. And if we aren't using the file manager, we have to do journaling
+ // ourselves.
+ bool docWasModified = false;
+ BSONObj newObj;
+ const char* source = NULL;
+ bool inPlace = _doc.getInPlaceUpdates(&_damages, &source);
+
+ // If something changed in the document, verify that no immutable fields were changed
+ // and data is valid for storage.
+ if ((!inPlace || !_damages.empty()) ) {
+ if (!(request->isFromReplication() || request->isFromMigration())) {
+ const std::vector<FieldRef*>* immutableFields = NULL;
+ if (lifecycle)
+ immutableFields = lifecycle->getImmutableFields();
+
+ uassertStatusOK(validate(oldObj,
+ updatedFields,
+ _doc,
+ immutableFields,
+ driver->modOptions()) );
+ }
+ }
+
+ WriteUnitOfWork wunit(request->getOpCtx()->recoveryUnit());
+
+ // Save state before making changes
+ saveState();
+
+ if (inPlace && !driver->modsAffectIndices()) {
+ // If a set of modifiers were all no-ops, we are still 'in place', but there is
+ // no work to do, in which case we want to consider the object unchanged.
+ if (!_damages.empty() ) {
+ _collection->updateDocumentWithDamages(request->getOpCtx(), loc, source, _damages);
+ docWasModified = true;
+ _specificStats.fastmod = true;
+ }
+
+ newObj = oldObj;
+ }
+ else {
+ // The updates were not in place. Apply them through the file manager.
+
+ // XXX: With experimental document-level locking, we do not hold the sufficient
+ // locks, so this would cause corruption.
+ fassert(18516, !useExperimentalDocLocking);
+
+ newObj = _doc.getObject();
+ uassert(17419,
+ str::stream() << "Resulting document after update is larger than "
+ << BSONObjMaxUserSize,
+ newObj.objsize() <= BSONObjMaxUserSize);
+ StatusWith<DiskLoc> res = _collection->updateDocument(request->getOpCtx(),
+ loc,
+ newObj,
+ true,
+ _params.opDebug);
+ uassertStatusOK(res.getStatus());
+ DiskLoc newLoc = res.getValue();
+ docWasModified = true;
+
+ // If the document moved, we might see it again in a collection scan (maybe it's
+ // a document after our current document).
+ //
+ // If the document is indexed and the mod changes an indexed value, we might see it
+ // again. For an example, see the comment above near declaration of updatedLocs.
+ if (_updatedLocs && (newLoc != loc || driver->modsAffectIndices())) {
+ _updatedLocs->insert(newLoc);
+ }
+ }
+
+ // Restore state after modification
+ restoreState(request->getOpCtx());
+
+ // Call logOp if requested.
+ if (request->shouldCallLogOp() && !logObj.isEmpty()) {
+ BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request->isMulti());
+ repl::logOp(request->getOpCtx(),
+ "u",
+ request->getNamespaceString().ns().c_str(),
+ logObj,
+ &idQuery,
+ NULL,
+ request->isFromMigration());
+ }
+
+ wunit.commit();
+
+ // Only record doc modifications if they wrote (exclude no-ops)
+ if (docWasModified) {
+ _specificStats.nModified++;
+ }
+
+ // Opportunity for journaling to write during the update.
+ request->getOpCtx()->recoveryUnit()->commitIfNeeded();
+ }
+
+ void UpdateStage::doInsert() {
+ _specificStats.inserted = true;
+
+ const UpdateRequest* request = _params.request;
+ UpdateDriver* driver = _params.driver;
+ CanonicalQuery* cq = _params.canonicalQuery;
+ UpdateLifecycle* lifecycle = request->getLifecycle();
+
+ // Since this is an insert (no docs found and upsert:true), we will be logging it
+ // as an insert in the oplog. We don't need the driver's help to build the
+ // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT.
+ // Some mods may only work in that context (e.g. $setOnInsert).
+ driver->setLogOp(false);
+ driver->setContext(ModifierInterface::ExecInfo::INSERT_CONTEXT);
+
+ // Reset the document we will be writing to
+ _doc.reset();
+
+ // This remains the empty object in the case of an object replacement, but in the case
+ // of an upsert where we are creating a base object from the query and applying mods,
+ // we capture the query as the original so that we can detect immutable field mutations.
+ BSONObj original = BSONObj();
+
+ // Calling populateDocumentWithQueryFields will populate the '_doc' with fields from the
+ // query which creates the base of the update for the inserted doc (because upsert
+ // was true).
+ if (cq) {
+ uassertStatusOK(driver->populateDocumentWithQueryFields(cq, _doc));
+ if (!driver->isDocReplacement()) {
+ _specificStats.fastmodinsert = true;
+ // We need all the fields from the query to compare against for validation below.
+ original = _doc.getObject();
+ }
+ else {
+ original = request->getQuery();
+ }
+ }
+ else {
+ fassert(17354, CanonicalQuery::isSimpleIdQuery(request->getQuery()));
+ BSONElement idElt = request->getQuery()[idFieldName];
+ original = idElt.wrap();
+ fassert(17352, _doc.root().appendElement(idElt));
+ }
+
+ // Apply the update modifications and then log the update as an insert manually.
+ Status status = driver->update(StringData(), &_doc);
+ if (!status.isOK()) {
+ uasserted(16836, status.reason());
+ }
+
+ // Ensure _id exists and is first
+ uassertStatusOK(ensureIdAndFirst(_doc));
+
+ // Validate that the object replacement or modifiers resulted in a document
+ // that contains all the immutable keys and can be stored if it isn't coming
+ // from a migration or via replication.
+ if (!(request->isFromReplication() || request->isFromMigration())){
+ const std::vector<FieldRef*>* immutableFields = NULL;
+ if (lifecycle)
+ immutableFields = lifecycle->getImmutableFields();
+
+ FieldRefSet noFields;
+ // This will only validate the modified fields if not a replacement.
+ uassertStatusOK(validate(original,
+ noFields,
+ _doc,
+ immutableFields,
+ driver->modOptions()) );
+ }
+
+ // Insert the doc
+ BSONObj newObj = _doc.getObject();
+ uassert(17420,
+ str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize,
+ newObj.objsize() <= BSONObjMaxUserSize);
+
+ WriteUnitOfWork wunit(request->getOpCtx()->recoveryUnit());
+ // Only create the collection if the doc will be inserted.
+ if (!_collection) {
+ _collection = _db->getCollection(request->getOpCtx(),
+ request->getNamespaceString().ns());
+ if (!_collection) {
+ _collection = _db->createCollection(request->getOpCtx(),
+ request->getNamespaceString().ns());
+ }
+ }
+
+
+ StatusWith<DiskLoc> newLoc = _collection->insertDocument(request->getOpCtx(),
+ newObj,
+ !request->isGod()/*enforceQuota*/);
+ uassertStatusOK(newLoc.getStatus());
+ if (request->shouldCallLogOp()) {
+ repl::logOp(request->getOpCtx(),
+ "i",
+ request->getNamespaceString().ns().c_str(),
+ newObj,
+ NULL,
+ NULL,
+ request->isFromMigration());
+ }
+
+ wunit.commit();
+
+ _specificStats.objInserted = newObj;
+ }
+
+ bool UpdateStage::doneUpdating() {
+ // We're done updating if either the child has no more results to give us, or we've
+ // already gotten a result back and we're not a multi-update.
+ return _child->isEOF() || (_specificStats.nMatched > 0 && !_params.request->isMulti());
+ }
+
+ bool UpdateStage::needInsert() {
+ // We need to insert if
+ // 1) we haven't inserted already,
+ // 2) the child stage returned zero matches, and
+ // 3) the user asked for an upsert.
+ return !_specificStats.inserted
+ && _specificStats.nMatched == 0
+ && _params.request->isUpsert();
+ }
+
+ bool UpdateStage::isEOF() {
+ return doneUpdating() && !needInsert();
+ }
+
+ PlanStage::StageState UpdateStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
+
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ if (isEOF()) { return PlanStage::IS_EOF; }
+
+ if (doneUpdating()) {
+ // Even if we're done updating, we may have some inserting left to do.
+ if (needInsert()) {
+ doInsert();
+ }
+
+ // At this point either we're done updating and there was no insert to do,
+ // or we're done updating and we're done inserting. Either way, we're EOF.
+ invariant(isEOF());
+ return PlanStage::IS_EOF;
+ }
+
+ // If we're here, then we still have to ask for results from the child and apply
+ // updates to them. We should only get here if the collection exists.
+ invariant(_collection);
+
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ StageState status = _child->work(&id);
+
+ if (PlanStage::ADVANCED == status) {
+ // Need to get these things from the result returned by the child.
+ DiskLoc loc;
+ BSONObj oldObj;
+
+ WorkingSetMember* member = _ws->get(id);
+
+ if (!member->hasLoc()) {
+ _ws->free(id);
+ const std::string errmsg = "update stage failed to read member w/ loc from child";
+ *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError,
+ errmsg));
+ return PlanStage::FAILURE;
+ }
+ loc = member->loc;
+
+ // Updates can't have projections. This means that covering analysis will always add
+ // a fetch. We should always get fetched data, and never just key data.
+ invariant(member->hasObj());
+ oldObj = member->obj;
+
+ // If we're here, then we have retrieved both a DiskLoc and the corresponding
+ // unowned object from the child stage. Since we have the object and the diskloc,
+ // we can free the WSM.
+ _ws->free(id);
+
+ // We fill this with the new locs of moved doc so we don't double-update.
+ if (_updatedLocs && _updatedLocs->count(loc) > 0) {
+ // Found a loc that we already updated.
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
+ }
+
+ ++_specificStats.nMatched;
+
+ // Do the update and return.
+ transformAndUpdate(oldObj, loc);
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
+ }
+ else if (PlanStage::IS_EOF == status) {
+ // The child is out of results, but we might not be done yet because we still might
+ // have to do an insert.
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
+ }
+ else if (PlanStage::FAILURE == status) {
+ *out = id;
+ // If a stage fails, it may create a status WSM to indicate why it failed, in which case
+ // 'id' is valid. If ID is invalid, we create our own error message.
+ if (WorkingSet::INVALID_ID == id) {
+ const std::string errmsg = "delete stage failed to read in results from child";
+ *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError,
+ errmsg));
+ return PlanStage::FAILURE;
+ }
+ return status;
+ }
+ else {
+ if (PlanStage::NEED_TIME == status) {
+ ++_commonStats.needTime;
+ }
+ return status;
+ }
+ }
+
+ void UpdateStage::saveState() {
+ ++_commonStats.yields;
+ _child->saveState();
+ }
+
+ void UpdateStage::restoreState(OperationContext* opCtx) {
+ ++_commonStats.unyields;
+ _child->restoreState(opCtx);
+ }
+
+ void UpdateStage::invalidate(const DiskLoc& dl, InvalidationType type) {
+ ++_commonStats.invalidates;
+ _child->invalidate(dl, type);
+ }
+
+ vector<PlanStage*> UpdateStage::getChildren() const {
+ vector<PlanStage*> children;
+ children.push_back(_child.get());
+ return children;
+ }
+
+ PlanStageStats* UpdateStage::getStats() {
+ _commonStats.isEOF = isEOF();
+ auto_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_UPDATE));
+ ret->specific.reset(new UpdateStats(_specificStats));
+ ret->children.push_back(_child->getStats());
+ return ret.release();
+ }
+
+ const CommonStats* UpdateStage::getCommonStats() {
+ return &_commonStats;
+ }
+
+ const SpecificStats* UpdateStage::getSpecificStats() {
+ return &_specificStats;
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h
new file mode 100644
index 00000000000..862074e4001
--- /dev/null
+++ b/src/mongo/db/exec/update.h
@@ -0,0 +1,164 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/ops/update_driver.h"
+#include "mongo/db/ops/update_request.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+ struct UpdateStageParams {
+
+ UpdateStageParams(const UpdateRequest* r,
+ UpdateDriver* d,
+ OpDebug* o)
+ : request(r),
+ driver(d),
+ opDebug(o),
+ canonicalQuery(NULL) { }
+
+ // Contains update parameters like whether it's a multi update or an upsert. Not owned.
+ // Must outlive the UpdateStage.
+ const UpdateRequest* request;
+
+ // Contains the logic for applying mods to documents. Not owned. Must outlive
+ // the UpdateStage.
+ UpdateDriver* driver;
+
+ // Needed to pass to Collection::updateDocument(...).
+ OpDebug* opDebug;
+
+ // Not owned here.
+ CanonicalQuery* canonicalQuery;
+
+ private:
+ // Default constructor not allowed.
+ UpdateStageParams();
+ };
+
+ /**
+ * Execution stage responsible for updates to documents and upserts. NEED_TIME is returned
+ * after performing an update or an insert.
+ *
+ * Callers of work() must be holding a write lock.
+ */
+ class UpdateStage : public PlanStage {
+ MONGO_DISALLOW_COPYING(UpdateStage);
+ public:
+ UpdateStage(const UpdateStageParams& params,
+ WorkingSet* ws,
+ Database* db,
+ PlanStage* child);
+
+ virtual bool isEOF();
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void saveState();
+ virtual void restoreState(OperationContext* opCtx);
+ virtual void invalidate(const DiskLoc& dl, InvalidationType type);
+
+ virtual std::vector<PlanStage*> getChildren() const;
+
+ virtual StageType stageType() const { return STAGE_UPDATE; }
+
+ virtual PlanStageStats* getStats();
+
+ virtual const CommonStats* getCommonStats();
+
+ virtual const SpecificStats* getSpecificStats();
+
+ static const char* kStageType;
+
+ private:
+ /**
+ * Computes the result of applying mods to the document 'oldObj' at DiskLoc 'loc' in
+ * memory, then commits these changes to the database.
+ */
+ void transformAndUpdate(BSONObj& oldObj, DiskLoc& loc);
+
+ /**
+ * Computes the document to insert and inserts it into the collection. Used if the
+ * user requested an upsert and no matching documents were found.
+ */
+ void doInsert();
+
+ /**
+ * Have we performed all necessary updates? Even if this is true, we might not be EOF,
+ * as we might still have to do an insert.
+ */
+ bool doneUpdating();
+
+ /**
+ * Examines the stats / update request and returns whether there is still an insert left
+ * to do. If so then this stage is not EOF yet.
+ */
+ bool needInsert();
+
+ UpdateStageParams _params;
+
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // Not owned by us.
+ Database* _db;
+ Collection* _collection;
+
+ // Owned by us.
+ scoped_ptr<PlanStage> _child;
+
+ // Stats
+ CommonStats _commonStats;
+ UpdateStats _specificStats;
+
+ // If the update was in-place, we may see it again. This only matters if we're doing
+ // a multi-update; if we're not doing a multi-update we stop after one update and we
+ // won't see any more docs.
+ //
+ // For example: If we're scanning an index {x:1} and performing {$inc:{x:5}}, we'll keep
+ // moving the document forward and it will continue to reappear in our index scan.
+ // Unless the index is multikey, the underlying query machinery won't de-dup.
+ //
+ // If the update wasn't in-place we may see it again. Our query may return the new
+ // document and we wouldn't want to update that.
+ //
+ // So, no matter what, we keep track of where the doc wound up.
+ typedef unordered_set<DiskLoc, DiskLoc::Hasher> DiskLocSet;
+ const boost::scoped_ptr<DiskLocSet> _updatedLocs;
+
+ // These get reused for each update.
+ mutablebson::Document& _doc;
+ mutablebson::DamageVector _damages;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index 921f7fd02e6..f1af1023d86 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -32,39 +32,26 @@
#include "mongo/db/ops/update.h"
-#include <cstring> // for memcpy
-
-#include "mongo/bson/mutable/algorithm.h"
-#include "mongo/bson/mutable/document.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/clientcursor.h"
+#include "mongo/db/exec/update.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/update_driver.h"
#include "mongo/db/ops/update_executor.h"
#include "mongo/db/ops/update_lifecycle.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/get_executor.h"
-#include "mongo/db/query/lite_parsed_query.h"
-#include "mongo/db/query/query_planner_common.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/update_index_data.h"
-#include "mongo/platform/unordered_set.h"
#include "mongo/util/log.h"
namespace mongo {
MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kQuery);
- namespace mb = mutablebson;
-
namespace {
- const char idFieldName[] = "_id";
- const FieldRef idFieldRef(idFieldName);
-
// TODO: Make this a function on NamespaceString, or make it cleaner.
inline void validateUpdate(const char* ns ,
const BSONObj& updateobj,
@@ -80,345 +67,6 @@ namespace mongo {
}
}
- Status storageValid(const mb::Document&, const bool);
- Status storageValid(const mb::ConstElement&, const bool);
- Status storageValidChildren(const mb::ConstElement&, const bool);
-
- /**
- * mutable::document storageValid check -- like BSONObj::_okForStorage
- */
- Status storageValid(const mb::Document& doc, const bool deep = true) {
- mb::ConstElement currElem = doc.root().leftChild();
- while (currElem.ok()) {
- if (currElem.getFieldName() == idFieldName) {
- switch (currElem.getType()) {
- case RegEx:
- case Array:
- case Undefined:
- return Status(ErrorCodes::InvalidIdField,
- str::stream() << "The '_id' value cannot be of type "
- << typeName(currElem.getType()));
- default:
- break;
- }
- }
- Status s = storageValid(currElem, deep);
- if (!s.isOK())
- return s;
- currElem = currElem.rightSibling();
- }
-
- return Status::OK();
- }
-
- /**
- * Validates an element that has a field name which starts with a dollar sign ($).
- * In the case of a DBRef field ($id, $ref, [$db]) these fields may be valid in
- * the correct order/context only.
- */
- Status validateDollarPrefixElement(const mb::ConstElement elem, const bool deep) {
- mb::ConstElement curr = elem;
- StringData currName = elem.getFieldName();
-
- // Found a $db field
- if (currName == "$db") {
- if (curr.getType() != String) {
- return Status(ErrorCodes::InvalidDBRef,
- str::stream() << "The DBRef $db field must be a String, not a "
- << typeName(curr.getType()));
- }
- curr = curr.leftSibling();
-
- if (!curr.ok() || (curr.getFieldName() != "$id"))
- return Status(ErrorCodes::InvalidDBRef,
- "Found $db field without a $id before it, which is invalid.");
-
- currName = curr.getFieldName();
- }
-
- // Found a $id field
- if (currName == "$id") {
- Status s = storageValidChildren(curr, deep);
- if (!s.isOK())
- return s;
-
- curr = curr.leftSibling();
- if (!curr.ok() || (curr.getFieldName() != "$ref")) {
- return Status(ErrorCodes::InvalidDBRef,
- "Found $id field without a $ref before it, which is invalid.");
- }
-
- currName = curr.getFieldName();
- }
-
- if (currName == "$ref") {
- if (curr.getType() != String) {
- return Status(ErrorCodes::InvalidDBRef,
- str::stream() << "The DBRef $ref field must be a String, not a "
- << typeName(curr.getType()));
- }
-
- if (!curr.rightSibling().ok() || curr.rightSibling().getFieldName() != "$id")
- return Status(ErrorCodes::InvalidDBRef,
- str::stream() << "The DBRef $ref field must be "
- "following by a $id field");
- }
- else {
- // not an okay, $ prefixed field name.
- return Status(ErrorCodes::DollarPrefixedFieldName,
- str::stream() << "The dollar ($) prefixed field '"
- << elem.getFieldName() << "' in '"
- << mb::getFullName(elem)
- << "' is not valid for storage.");
-
- }
-
- return Status::OK();
- }
-
- Status storageValid(const mb::ConstElement& elem, const bool deep = true) {
- if (!elem.ok())
- return Status(ErrorCodes::BadValue, "Invalid elements cannot be stored.");
-
- // Field names of elements inside arrays are not meaningful in mutable bson,
- // so we do not want to validate them.
- //
- // TODO: Revisit how mutable handles array field names. We going to need to make
- // this better if we ever want to support ordered updates that can alter the same
- // element repeatedly; see SERVER-12848.
- const bool childOfArray = elem.parent().ok() ?
- (elem.parent().getType() == mongo::Array) : false;
-
- if (!childOfArray) {
- StringData fieldName = elem.getFieldName();
- // Cannot start with "$", unless dbref
- if (fieldName[0] == '$') {
- Status status = validateDollarPrefixElement(elem, deep);
- if (!status.isOK())
- return status;
- }
- else if (fieldName.find(".") != string::npos) {
- // Field name cannot have a "." in it.
- return Status(ErrorCodes::DottedFieldName,
- str::stream() << "The dotted field '"
- << elem.getFieldName() << "' in '"
- << mb::getFullName(elem)
- << "' is not valid for storage.");
- }
- }
-
- // Check children if there are any.
- Status s = storageValidChildren(elem, deep);
- if (!s.isOK())
- return s;
-
- return Status::OK();
- }
-
- Status storageValidChildren(const mb::ConstElement& elem, const bool deep) {
- if (!elem.hasChildren())
- return Status::OK();
-
- mb::ConstElement curr = elem.leftChild();
- while (curr.ok()) {
- Status s = storageValid(curr, deep);
- if (!s.isOK())
- return s;
- curr = curr.rightSibling();
- }
-
- return Status::OK();
- }
-
- /**
- * This will verify that all updated fields are
- * 1.) Valid for storage (checking parent to make sure things like DBRefs are valid)
- * 2.) Compare updated immutable fields do not change values
- *
- * If updateFields is empty then it was replacement and/or we need to check all fields
- */
- inline Status validate(const BSONObj& original,
- const FieldRefSet& updatedFields,
- const mb::Document& updated,
- const std::vector<FieldRef*>* immutableAndSingleValueFields,
- const ModifierInterface::Options& opts) {
-
- LOG(3) << "update validate options -- "
- << " updatedFields: " << updatedFields
- << " immutableAndSingleValueFields.size:"
- << (immutableAndSingleValueFields ? immutableAndSingleValueFields->size() : 0)
- << " fromRepl: " << opts.fromReplication
- << " validate:" << opts.enforceOkForStorage;
-
- // 1.) Loop through each updated field and validate for storage
- // and detect immutable field updates
-
- // The set of possibly changed immutable fields -- we will need to check their vals
- FieldRefSet changedImmutableFields;
-
- // Check to see if there were no fields specified or if we are not validating
- // The case if a range query, or query that didn't result in saved fields
- if (updatedFields.empty() || !opts.enforceOkForStorage) {
- if (opts.enforceOkForStorage) {
- // No specific fields were updated so the whole doc must be checked
- Status s = storageValid(updated, true);
- if (!s.isOK())
- return s;
- }
-
- // Check all immutable fields
- if (immutableAndSingleValueFields)
- changedImmutableFields.fillFrom(*immutableAndSingleValueFields);
- }
- else {
-
- // TODO: Change impl so we don't need to create a new FieldRefSet
- // -- move all conflict logic into static function on FieldRefSet?
- FieldRefSet immutableFieldRef;
- if (immutableAndSingleValueFields)
- immutableFieldRef.fillFrom(*immutableAndSingleValueFields);
-
- FieldRefSet::const_iterator where = updatedFields.begin();
- const FieldRefSet::const_iterator end = updatedFields.end();
- for( ; where != end; ++where) {
- const FieldRef& current = **where;
-
- // Find the updated field in the updated document.
- mutablebson::ConstElement newElem = updated.root();
- size_t currentPart = 0;
- while (newElem.ok() && currentPart < current.numParts())
- newElem = newElem[current.getPart(currentPart++)];
-
- // newElem might be missing if $unset/$renamed-away
- if (newElem.ok()) {
- Status s = storageValid(newElem, true);
- if (!s.isOK())
- return s;
- }
- // Check if the updated field conflicts with immutable fields
- immutableFieldRef.findConflicts(&current, &changedImmutableFields);
- }
- }
-
- const bool checkIdField = (updatedFields.empty() && !original.isEmpty()) ||
- updatedFields.findConflicts(&idFieldRef, NULL);
-
- // Add _id to fields to check since it too is immutable
- if (checkIdField)
- changedImmutableFields.keepShortest(&idFieldRef);
- else if (changedImmutableFields.empty()) {
- // Return early if nothing changed which is immutable
- return Status::OK();
- }
-
- LOG(4) << "Changed immutable fields: " << changedImmutableFields;
- // 2.) Now compare values of the changed immutable fields (to make sure they haven't)
-
- const mutablebson::ConstElement newIdElem = updated.root()[idFieldName];
-
- FieldRefSet::const_iterator where = changedImmutableFields.begin();
- const FieldRefSet::const_iterator end = changedImmutableFields.end();
- for( ; where != end; ++where ) {
- const FieldRef& current = **where;
-
- // Find the updated field in the updated document.
- mutablebson::ConstElement newElem = updated.root();
- size_t currentPart = 0;
- while (newElem.ok() && currentPart < current.numParts())
- newElem = newElem[current.getPart(currentPart++)];
-
- if (!newElem.ok()) {
- if (original.isEmpty()) {
- // If the _id is missing and not required, then skip this check
- if (!(current.dottedField() == idFieldName))
- return Status(ErrorCodes::NoSuchKey,
- mongoutils::str::stream()
- << "After applying the update, the new"
- << " document was missing the '"
- << current.dottedField()
- << "' (required and immutable) field.");
-
- }
- else {
- if (current.dottedField() != idFieldName)
- return Status(ErrorCodes::ImmutableField,
- mongoutils::str::stream()
- << "After applying the update to the document with "
- << newIdElem.toString()
- << ", the '" << current.dottedField()
- << "' (required and immutable) field was "
- "found to have been removed --"
- << original);
- }
- }
- else {
-
- // Find the potentially affected field in the original document.
- const BSONElement oldElem = original.getFieldDotted(current.dottedField());
- const BSONElement oldIdElem = original.getField(idFieldName);
-
- // Ensure no arrays since neither _id nor shard keys can be in an array, or one.
- mb::ConstElement currElem = newElem;
- while (currElem.ok()) {
- if (currElem.getType() == Array) {
- return Status(ErrorCodes::NotSingleValueField,
- mongoutils::str::stream()
- << "After applying the update to the document {"
- << (oldIdElem.ok() ? oldIdElem.toString() :
- newIdElem.toString())
- << " , ...}, the (immutable) field '"
- << current.dottedField()
- << "' was found to be an array or array descendant.");
- }
- currElem = currElem.parent();
- }
-
- // If we have both (old and new), compare them. If we just have new we are good
- if (oldElem.ok() && newElem.compareWithBSONElement(oldElem, false) != 0) {
- return Status(ErrorCodes::ImmutableField,
- mongoutils::str::stream()
- << "After applying the update to the document {"
- << (oldIdElem.ok() ? oldIdElem.toString() :
- newIdElem.toString())
- << " , ...}, the (immutable) field '" << current.dottedField()
- << "' was found to have been altered to "
- << newElem.toString());
- }
- }
- }
-
- return Status::OK();
- }
-
- Status ensureIdAndFirst(mb::Document& doc) {
- mb::Element idElem = mb::findFirstChildNamed(doc.root(), idFieldName);
-
- // Move _id as first element if it exists
- if (idElem.ok()) {
- if (idElem.leftSibling().ok()) {
- Status s = idElem.remove();
- if (!s.isOK())
- return s;
- s = doc.root().pushFront(idElem);
- if (!s.isOK())
- return s;
- }
- }
- else {
- // Create _id if the document does not currently have one.
- idElem = doc.makeElementNewOID(idFieldName);
- if (!idElem.ok())
- return Status(ErrorCodes::BadValue,
- "Could not create new _id ObjectId element.",
- 17268);
- Status s = doc.root().pushFront(idElem);
- if (!s.isOK())
- return s;
- }
-
- return Status::OK();
-
- }
} // namespace
UpdateResult update(Database* db,
@@ -455,245 +103,53 @@ namespace mongo {
}
PlanExecutor* rawExec;
- Status status = cq ?
- getExecutor(request.getOpCtx(), collection, cqHolder.release(), &rawExec) :
- getExecutor(request.getOpCtx(), collection, nsString.ns(), request.getQuery(),
- &rawExec);
+ Status status = Status::OK();
+ if (cq) {
+ // This is the regular path for when we have a CanonicalQuery.
+ status = getExecutorUpdate(request.getOpCtx(), db, cqHolder.release(), &request, driver,
+ opDebug, &rawExec);
+ }
+ else {
+ // This is the idhack fast-path for getting a PlanExecutor without doing the work
+ // to create a CanonicalQuery.
+ status = getExecutorUpdate(request.getOpCtx(), db, nsString.ns(), &request, driver,
+ opDebug, &rawExec);
+ }
uassert(17243,
"could not get executor" + request.getQuery().toString() + "; " + causedBy(status),
status.isOK());
// Create the plan executor and setup all deps.
- auto_ptr<PlanExecutor> exec(rawExec);
+ scoped_ptr<PlanExecutor> exec(rawExec);
// Register executor with the collection cursor cache.
const ScopedExecutorRegistration safety(exec.get());
- // Get the canonical query which the underlying executor is using. This may be NULL in
- // the case of idhack updates.
- cq = exec->getCanonicalQuery();
-
- //
- // We'll start assuming we have one or more documents for this update. (Otherwise,
- // we'll fall-back to insert case (if upsert is true).)
- //
-
- // We are an update until we fall into the insert case below.
- driver->setContext(ModifierInterface::ExecInfo::UPDATE_CONTEXT);
-
- int numMatched = 0;
-
- // If the update was in-place, we may see it again. This only matters if we're doing
- // a multi-update; if we're not doing a multi-update we stop after one update and we
- // won't see any more docs.
- //
- // For example: If we're scanning an index {x:1} and performing {$inc:{x:5}}, we'll keep
- // moving the document forward and it will continue to reappear in our index scan.
- // Unless the index is multikey, the underlying query machinery won't de-dup.
- //
- // If the update wasn't in-place we may see it again. Our query may return the new
- // document and we wouldn't want to update that.
- //
- // So, no matter what, we keep track of where the doc wound up.
- typedef unordered_set<DiskLoc, DiskLoc::Hasher> DiskLocSet;
- const scoped_ptr<DiskLocSet> updatedLocs(request.isMulti() ? new DiskLocSet : NULL);
-
- // Reset these counters on each call. We might re-enter this function to retry this
- // update if we throw a page fault exception below, and we rely on these counters
- // reflecting only the actions taken locally. In particlar, we must have the no-op
- // counter reset so that we can meaningfully comapre it with numMatched above.
- opDebug->nModified = 0;
-
- // -1 for these fields means that we don't have a value. Once the update completes
- // we request these values from the plan executor.
- opDebug->nscanned = -1;
- opDebug->nscannedObjects = -1;
-
- // Get the cached document from the update driver.
- mutablebson::Document& doc = driver->getDocument();
- mutablebson::DamageVector damages;
-
- // Used during iteration of docs
- BSONObj oldObj;
-
- // Get first doc, and location
- PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
-
- uassert(ErrorCodes::NotMaster,
- mongoutils::str::stream() << "Not primary while updating " << nsString.ns(),
- !request.shouldCallLogOp()
- || repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsString.db()));
-
- while (true) {
- // Get next doc, and location
- DiskLoc loc;
- state = exec->getNext(&oldObj, &loc);
-
- if (state != PlanExecutor::ADVANCED) {
- if (state == PlanExecutor::IS_EOF) {
- // We have reached the logical end of the loop, so do yielding recovery
- break;
- }
- else {
- uassertStatusOK(Status(ErrorCodes::InternalError,
- str::stream() << " Update query failed -- "
- << PlanExecutor::statestr(state)));
- }
- }
-
- // We fill this with the new locs of moved doc so we don't double-update.
- if (updatedLocs && updatedLocs->count(loc) > 0) {
- continue;
- }
-
- numMatched++;
-
- // Ask the driver to apply the mods. It may be that the driver can apply those "in
- // place", that is, some values of the old document just get adjusted without any
- // change to the binary layout on the bson layer. It may be that a whole new
- // document is needed to accomodate the new bson layout of the resulting document.
- doc.reset(oldObj, mutablebson::Document::kInPlaceEnabled);
- BSONObj logObj;
-
-
- FieldRefSet updatedFields;
-
- Status status = Status::OK();
- if (!driver->needMatchDetails()) {
- // If we don't need match details, avoid doing the rematch
- status = driver->update(StringData(), &doc, &logObj, &updatedFields);
- }
- else {
- // If there was a matched field, obtain it.
- MatchDetails matchDetails;
- matchDetails.requestElemMatchKey();
-
- dassert(cq);
- verify(cq->root()->matchesBSON(oldObj, &matchDetails));
-
- string matchedField;
- if (matchDetails.hasElemMatchKey())
- matchedField = matchDetails.elemMatchKey();
-
- // TODO: Right now, each mod checks in 'prepare' that if it needs positional
- // data, that a non-empty StringData() was provided. In principle, we could do
- // that check here in an else clause to the above conditional and remove the
- // checks from the mods.
-
- status = driver->update(matchedField, &doc, &logObj, &updatedFields);
- }
+ // Run the plan (don't need to collect results because UpdateStage always returns
+ // NEED_TIME).
+ uassertStatusOK(exec->executePlan());
- if (!status.isOK()) {
- uasserted(16837, status.reason());
- }
-
- // Ensure _id exists and is first
- uassertStatusOK(ensureIdAndFirst(doc));
-
- // If the driver applied the mods in place, we can ask the mutable for what
- // changed. We call those changes "damages". :) We use the damages to inform the
- // journal what was changed, and then apply them to the original document
- // ourselves. If, however, the driver applied the mods out of place, we ask it to
- // generate a new, modified document for us. In that case, the file manager will
- // take care of the journaling details for us.
- //
- // This code flow is admittedly odd. But, right now, journaling is baked in the file
- // manager. And if we aren't using the file manager, we have to do jounaling
- // ourselves.
- bool docWasModified = false;
- BSONObj newObj;
- const char* source = NULL;
- bool inPlace = doc.getInPlaceUpdates(&damages, &source);
-
- // If something changed in the document, verify that no immutable fields were changed
- // and data is valid for storage.
- if ((!inPlace || !damages.empty()) ) {
- if (!(request.isFromReplication() || request.isFromMigration())) {
- const std::vector<FieldRef*>* immutableFields = NULL;
- if (lifecycle)
- immutableFields = lifecycle->getImmutableFields();
-
- uassertStatusOK(validate(oldObj,
- updatedFields,
- doc,
- immutableFields,
- driver->modOptions()) );
- }
- }
-
- WriteUnitOfWork wunit(request.getOpCtx()->recoveryUnit());
-
- // Save state before making changes
- exec->saveState();
-
- if (inPlace && !driver->modsAffectIndices()) {
- // If a set of modifiers were all no-ops, we are still 'in place', but there is
- // no work to do, in which case we want to consider the object unchanged.
- if (!damages.empty() ) {
- collection->updateDocumentWithDamages(request.getOpCtx(), loc, source, damages);
- docWasModified = true;
- opDebug->fastmod = true;
- }
-
- newObj = oldObj;
- }
- else {
- // The updates were not in place. Apply them through the file manager.
-
- // XXX: With experimental document-level locking, we do not hold the sufficient
- // locks, so this would cause corruption.
- fassert(18516, !useExperimentalDocLocking);
-
- newObj = doc.getObject();
- uassert(17419,
- str::stream() << "Resulting document after update is larger than "
- << BSONObjMaxUserSize,
- newObj.objsize() <= BSONObjMaxUserSize);
- StatusWith<DiskLoc> res = collection->updateDocument(request.getOpCtx(),
- loc,
- newObj,
- true,
- opDebug);
- uassertStatusOK(res.getStatus());
- DiskLoc newLoc = res.getValue();
- docWasModified = true;
-
- // If the document moved, we might see it again in a collection scan (maybe it's
- // a document after our current document).
- //
- // If the document is indexed and the mod changes an indexed value, we might see it
- // again. For an example, see the comment above near declaration of updatedLocs.
- if (updatedLocs && (newLoc != loc || driver->modsAffectIndices())) {
- updatedLocs->insert(newLoc);
- }
- }
-
- // Restore state after modification
- uassert(17278,
- "Update could not restore plan executor state after updating a document.",
- exec->restoreState(request.getOpCtx()));
-
- // Call logOp if requested.
- if (request.shouldCallLogOp() && !logObj.isEmpty()) {
- BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request.isMulti());
- repl::logOp(request.getOpCtx(), "u", nsString.ns().c_str(), logObj, &idQuery,
- NULL, request.isFromMigration());
- }
+ // Get stats from the root stage.
+ invariant(exec->getRootStage()->stageType() == STAGE_UPDATE);
+ UpdateStage* updateStage = static_cast<UpdateStage*>(exec->getRootStage());
+ const UpdateStats* updateStats =
+ static_cast<const UpdateStats*>(updateStage->getSpecificStats());
- wunit.commit();
+ // Use stats from the root stage to fill out opDebug.
+ opDebug->nMatched = updateStats->nMatched;
+ opDebug->nModified = updateStats->nModified;
+ opDebug->upsert = updateStats->inserted;
+ opDebug->fastmodinsert = updateStats->fastmodinsert;
+ opDebug->fastmod = updateStats->fastmod;
- // Only record doc modifications if they wrote (exclude no-ops)
- if (docWasModified)
- opDebug->nModified++;
-
- if (!request.isMulti()) {
- break;
- }
-
- // Opportunity for journaling to write during the update.
- request.getOpCtx()->recoveryUnit()->commitIfNeeded();
+ // Historically, 'opDebug' considers 'nMatched' and 'nModified' to be 1 (rather than 0) if
+ // there is an upsert that inserts a document. The UpdateStage does not participate in this
+ // madness in order to have saner stats reporting for explain. This means that we have to
+ // set these values "manually" in the case of an insert.
+ if (updateStats->inserted) {
+ opDebug->nMatched = 1;
+ opDebug->nModified = 1;
}
// Get summary information about the plan.
@@ -702,121 +158,11 @@ namespace mongo {
opDebug->nscanned = stats.totalKeysExamined;
opDebug->nscannedObjects = stats.totalDocsExamined;
- // TODO: Can this be simplified?
- if ((numMatched > 0) || (numMatched == 0 && !request.isUpsert()) ) {
- opDebug->nMatched = numMatched;
- return UpdateResult(numMatched > 0 /* updated existing object(s) */,
- !driver->isDocReplacement() /* $mod or obj replacement */,
- opDebug->nModified /* number of modified docs, no no-ops */,
- numMatched /* # of docs matched/updated, even no-ops */,
- BSONObj());
- }
-
- //
- // We haven't found any existing document so an insert is done
- // (upsert is true).
- //
- opDebug->upsert = true;
-
- // Since this is an insert (no docs found and upsert:true), we will be logging it
- // as an insert in the oplog. We don't need the driver's help to build the
- // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT.
- // Some mods may only work in that context (e.g. $setOnInsert).
- driver->setLogOp(false);
- driver->setContext(ModifierInterface::ExecInfo::INSERT_CONTEXT);
-
- // Reset the document we will be writing to
- doc.reset();
-
- // This remains the empty object in the case of an object replacement, but in the case
- // of an upsert where we are creating a base object from the query and applying mods,
- // we capture the query as the original so that we can detect immutable field mutations.
- BSONObj original = BSONObj();
-
- // Calling createFromQuery will populate the 'doc' with fields from the query which
- // creates the base of the update for the inserterd doc (because upsert was true)
- if (cq) {
- uassertStatusOK(driver->populateDocumentWithQueryFields(cq, doc));
- if (!driver->isDocReplacement()) {
- opDebug->fastmodinsert = true;
- // We need all the fields from the query to compare against for validation below.
- original = doc.getObject();
- }
- else {
- original = request.getQuery();
- }
- }
- else {
- fassert(17354, CanonicalQuery::isSimpleIdQuery(request.getQuery()));
- BSONElement idElt = request.getQuery()["_id"];
- original = idElt.wrap();
- fassert(17352, doc.root().appendElement(idElt));
- }
-
- // Apply the update modifications and then log the update as an insert manually.
- status = driver->update(StringData(), &doc);
- if (!status.isOK()) {
- uasserted(16836, status.reason());
- }
-
- // Ensure _id exists and is first
- uassertStatusOK(ensureIdAndFirst(doc));
-
- // Validate that the object replacement or modifiers resulted in a document
- // that contains all the immutable keys and can be stored if it isn't coming
- // from a migration or via replication.
- if (!(request.isFromReplication() || request.isFromMigration())){
- const std::vector<FieldRef*>* immutableFields = NULL;
- if (lifecycle)
- immutableFields = lifecycle->getImmutableFields();
-
- FieldRefSet noFields;
- // This will only validate the modified fields if not a replacement.
- uassertStatusOK(validate(original,
- noFields,
- doc,
- immutableFields,
- driver->modOptions()) );
- }
-
- // Insert the doc
- BSONObj newObj = doc.getObject();
- uassert(17420,
- str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize,
- newObj.objsize() <= BSONObjMaxUserSize);
-
- WriteUnitOfWork wunit(request.getOpCtx()->recoveryUnit());
- // Only create the collection if the doc will be inserted.
- if (!collection) {
- collection = db->getCollection(request.getOpCtx(), request.getNamespaceString().ns());
- if (!collection) {
- collection = db->createCollection(request.getOpCtx(), request.getNamespaceString().ns());
- }
- }
-
-
- StatusWith<DiskLoc> newLoc = collection->insertDocument(request.getOpCtx(),
- newObj,
- !request.isGod() /*enforceQuota*/);
- uassertStatusOK(newLoc.getStatus());
- if (request.shouldCallLogOp()) {
- repl::logOp(request.getOpCtx(),
- "i",
- nsString.ns().c_str(),
- newObj,
- NULL,
- NULL,
- request.isFromMigration());
- }
-
- wunit.commit();
-
- opDebug->nMatched = 1;
- return UpdateResult(false /* updated a non existing document */,
- !driver->isDocReplacement() /* $mod or obj replacement? */,
- 1 /* docs written*/,
- 1 /* count of updated documents */,
- newObj /* object that was upserted */ );
+ return UpdateResult(updateStats->nMatched > 0 /* Did we update at least one obj? */,
+ !driver->isDocReplacement() /* $mod or obj replacement */,
+ opDebug->nModified /* number of modified docs, no no-ops */,
+ opDebug->nMatched /* # of docs matched/updated, even no-ops */,
+ updateStats->objInserted);
}
BSONObj applyUpdateOperators(const BSONObj& from, const BSONObj& operators) {
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 7a347c203d8..b2a84a18eb0 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/exec/projection.h"
#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/exec/subplan.h"
+#include "mongo/db/exec/update.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/query_settings.h"
@@ -458,6 +459,10 @@ namespace mongo {
return Status::OK();
}
+ //
+ // Delete
+ //
+
Status getExecutorDelete(OperationContext* txn,
Collection* collection,
CanonicalQuery* rawCanonicalQuery,
@@ -529,6 +534,84 @@ namespace mongo {
}
//
+ // Update
+ //
+
+ Status getExecutorUpdate(OperationContext* txn,
+ Database* db,
+ CanonicalQuery* rawCanonicalQuery,
+ const UpdateRequest* request,
+ UpdateDriver* driver,
+ OpDebug* opDebug,
+ PlanExecutor** execOut) {
+ auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery);
+ auto_ptr<WorkingSet> ws(new WorkingSet());
+ Collection* collection = db->getCollection(request->getOpCtx(),
+ request->getNamespaceString().ns());
+
+ PlanStage* root;
+ QuerySolution* querySolution;
+ Status status = prepareExecution(txn, collection, ws.get(), canonicalQuery.get(), 0, &root,
+ &querySolution);
+ if (!status.isOK()) {
+ return status;
+ }
+ invariant(root);
+ UpdateStageParams updateStageParams(request, driver, opDebug);
+ updateStageParams.canonicalQuery = rawCanonicalQuery;
+ root = new UpdateStage(updateStageParams, ws.get(), db, root);
+ // We must have a tree of stages in order to have a valid plan executor, but the query
+ // solution may be null. Takes ownership of all args other than 'collection'.
+ *execOut = new PlanExecutor(ws.release(), root, querySolution, canonicalQuery.release(),
+ collection);
+ return Status::OK();
+ }
+
+ Status getExecutorUpdate(OperationContext* txn,
+ Database* db,
+ const std::string& ns,
+ const UpdateRequest* request,
+ UpdateDriver* driver,
+ OpDebug* opDebug,
+ PlanExecutor** execOut) {
+ auto_ptr<WorkingSet> ws(new WorkingSet());
+ Collection* collection = db->getCollection(request->getOpCtx(),
+ request->getNamespaceString().ns());
+ const BSONObj& unparsedQuery = request->getQuery();
+
+ UpdateStageParams updateStageParams(request, driver, opDebug);
+ if (!collection) {
+ LOG(2) << "Collection " << ns << " does not exist."
+ << " Using EOF stage: " << unparsedQuery.toString();
+ UpdateStage* updateStage = new UpdateStage(updateStageParams, ws.get(), db,
+ new EOFStage());
+ *execOut = new PlanExecutor(ws.release(), updateStage, ns);
+ return Status::OK();
+ }
+
+ if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) &&
+ collection->getIndexCatalog()->findIdIndex()) {
+ LOG(2) << "Using idhack: " << unparsedQuery.toString();
+
+ PlanStage* idHackStage = new IDHackStage(txn, collection, unparsedQuery["_id"].wrap(),
+ ws.get());
+ UpdateStage* root = new UpdateStage(updateStageParams, ws.get(), db, idHackStage);
+ *execOut = new PlanExecutor(ws.release(), root, collection);
+ return Status::OK();
+ }
+
+ const WhereCallbackReal whereCallback(txn, collection->ns().db());
+ CanonicalQuery* cq;
+ Status status = CanonicalQuery::canonicalize(collection->ns(), unparsedQuery, &cq,
+ whereCallback);
+ if (!status.isOK())
+ return status;
+
+ // Takes ownership of 'cq'.
+ return getExecutorUpdate(txn, db, cq, request, driver, opDebug, execOut);
+ }
+
+ //
// Count hack
//
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 3e8cb781bd0..cb49007260f 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -31,10 +31,13 @@
#include "mongo/db/query/query_planner_params.h"
#include "mongo/db/query/query_settings.h"
#include "mongo/db/query/query_solution.h"
+#include "mongo/db/ops/update_driver.h"
+#include "mongo/db/ops/update_request.h"
namespace mongo {
class Collection;
+ class Database;
/**
* Filter indexes retrieved from index catalog by
@@ -121,6 +124,10 @@ namespace mongo {
const BSONObj& hintObj,
PlanExecutor** execOut);
+ //
+ // Delete
+ //
+
/**
* Get a PlanExecutor for a delete operation. 'rawCanonicalQuery' describes the predicate for
* the documents to be deleted. A write lock is required to execute the returned plan.
@@ -156,4 +163,46 @@ namespace mongo {
bool shouldCallLogOp,
PlanExecutor** execOut);
+ //
+ // Update
+ //
+
+ /**
+ * Get a PlanExecutor for an update operation. 'rawCanonicalQuery' describes the predicate for
+     * the documents to be updated. A write lock is required to execute the returned plan.
+ *
+ * Takes ownership of 'rawCanonicalQuery'. Does not take ownership of other args.
+ *
+ * If the query is valid and an executor could be created, returns Status::OK() and populates
+     * *execOut with the PlanExecutor.
+ *
+ * If the query cannot be executed, returns a Status indicating why.
+ */
+ Status getExecutorUpdate(OperationContext* txn,
+ Database* db,
+ CanonicalQuery* rawCanonicalQuery,
+ const UpdateRequest* request,
+ UpdateDriver* driver,
+ OpDebug* opDebug,
+ PlanExecutor** execOut);
+
+ /**
+ * Overload of getExecutorUpdate() above, for when a canonicalQuery is not available. Used to
+ * support idhack-powered updates.
+ *
+ * If the query is valid and an executor could be created, returns Status::OK() and populates
+     * *execOut with the PlanExecutor.
+ *
+ * Does not take ownership of its arguments.
+ *
+ * If the query cannot be executed, returns a Status indicating why.
+ */
+ Status getExecutorUpdate(OperationContext* txn,
+ Database* db,
+ const std::string& ns,
+ const UpdateRequest* request,
+ UpdateDriver* driver,
+ OpDebug* opDebug,
+ PlanExecutor** execOut);
+
} // namespace mongo
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index 791145cce91..a2635ef8d22 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -88,6 +88,8 @@ namespace mongo {
STAGE_SUBPLAN,
STAGE_TEXT,
STAGE_UNKNOWN,
+
+ STAGE_UPDATE,
};
} // namespace mongo
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp
new file mode 100644
index 00000000000..a8975e40d05
--- /dev/null
+++ b/src/mongo/dbtests/query_stage_update.cpp
@@ -0,0 +1,364 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+/**
+ * This file tests db/exec/update.cpp (UpdateStage).
+ */
+
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/exec/collection_scan.h"
+#include "mongo/db/exec/eof.h"
+#include "mongo/db/exec/update.h"
+#include "mongo/db/exec/working_set.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/json.h"
+#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/ops/update_driver.h"
+#include "mongo/db/ops/update_lifecycle_impl.h"
+#include "mongo/db/ops/update_request.h"
+#include "mongo/db/query/canonical_query.h"
+#include "mongo/dbtests/dbtests.h"
+
+namespace QueryStageUpdate {
+
+ class QueryStageUpdateBase {
+ public:
+ QueryStageUpdateBase()
+ : _client(&_txn),
+ _ns("unittests.QueryStageUpdate"),
+ _nsString(StringData(ns())) {
+ Client::WriteContext ctx(&_txn, ns());
+ _client.dropCollection(ns());
+ ctx.commit();
+ }
+
+ virtual ~QueryStageUpdateBase() {
+ Client::WriteContext ctx(&_txn, ns());
+ _client.dropCollection(ns());
+ ctx.commit();
+ }
+
+ void insert(const BSONObj& doc) {
+ _client.insert(ns(), doc);
+ }
+
+ void remove(const BSONObj& obj) {
+ _client.remove(ns(), obj);
+ }
+
+ size_t count(const BSONObj& query) {
+ return _client.count(ns(), query, 0, 0, 0);
+ }
+
+ CanonicalQuery* canonicalize(const BSONObj& query) {
+ CanonicalQuery* cq;
+ Status status = CanonicalQuery::canonicalize(ns(), query, &cq);
+ ASSERT_OK(status);
+ return cq;
+ }
+
+ /**
+ * Runs the update operation by calling work until EOF. Asserts that
+ * the update stage always returns NEED_TIME.
+ */
+ void runUpdate(UpdateStage* updateStage) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = PlanStage::NEED_TIME;
+ while (PlanStage::IS_EOF != state) {
+ ASSERT_EQUALS(PlanStage::NEED_TIME, state);
+ state = updateStage->work(&id);
+ }
+ }
+
+ /**
+ * Returns a vector of all of the documents currently in 'collection'.
+ *
+ * Uses a forward collection scan stage to get the docs, and populates 'out' with
+ * the results.
+ */
+ void getCollContents(Collection* collection, vector<BSONObj>* out) {
+ WorkingSet ws;
+
+ CollectionScanParams params;
+ params.collection = collection;
+ params.direction = CollectionScanParams::FORWARD;
+ params.tailable = false;
+
+ scoped_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL));
+ while (!scan->isEOF()) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = scan->work(&id);
+ if (PlanStage::ADVANCED == state) {
+ WorkingSetMember* member = ws.get(id);
+ verify(member->hasObj());
+ out->push_back(member->obj);
+ }
+ }
+ }
+
+ void getLocs(Collection* collection,
+ CollectionScanParams::Direction direction,
+ vector<DiskLoc>* out) {
+ WorkingSet ws;
+
+ CollectionScanParams params;
+ params.collection = collection;
+ params.direction = direction;
+ params.tailable = false;
+
+ scoped_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL));
+ while (!scan->isEOF()) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = scan->work(&id);
+ if (PlanStage::ADVANCED == state) {
+ WorkingSetMember* member = ws.get(id);
+ verify(member->hasLoc());
+ out->push_back(member->loc);
+ }
+ }
+ }
+
+ /**
+ * Asserts that 'objs' contains 'expectedDoc'.
+ */
+ void assertHasDoc(const vector<BSONObj>& objs, const BSONObj& expectedDoc) {
+ bool foundDoc = false;
+ for (size_t i = 0; i < objs.size(); i++) {
+ if (0 == objs[i].woCompare(expectedDoc)) {
+ foundDoc = true;
+ break;
+ }
+ }
+ ASSERT(foundDoc);
+ }
+
+ const char* ns() { return _ns.c_str(); }
+
+ const NamespaceString& nsString() { return _nsString; }
+
+ protected:
+ OperationContextImpl _txn;
+
+ private:
+ DBDirectClient _client;
+
+ std::string _ns;
+ NamespaceString _nsString;
+ };
+
+ /**
+ * Test an upsert into an empty collection.
+ */
+ class QueryStageUpdateUpsertEmptyColl : public QueryStageUpdateBase {
+ public:
+ void run() {
+ // Run the update.
+ {
+ Client::WriteContext ctx(&_txn, ns());
+ Client& c = cc();
+ CurOp& curOp = *c.curop();
+ OpDebug* opDebug = &curOp.debug();
+ UpdateDriver driver( (UpdateDriver::Options()) );
+ Database* db = ctx.ctx().db();
+
+ // Collection should be empty.
+ ASSERT_EQUALS(0U, count(BSONObj()));
+
+ UpdateRequest request(&_txn, nsString());
+ UpdateLifecycleImpl updateLifecycle(false, nsString());
+ request.setLifecycle(&updateLifecycle);
+
+ // Update is the upsert {_id: 0, x: 1}, {$set: {y: 2}}.
+ BSONObj query = fromjson("{_id: 0, x: 1}");
+ BSONObj updates = fromjson("{$set: {y: 2}}");
+
+ request.setUpsert();
+ request.setQuery(query);
+ request.setUpdates(updates);
+
+ ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti()));
+
+ // Setup update params.
+ UpdateStageParams params(&request, &driver, opDebug);
+ scoped_ptr<CanonicalQuery> cq(canonicalize(query));
+ params.canonicalQuery = cq.get();
+
+ scoped_ptr<WorkingSet> ws(new WorkingSet());
+ auto_ptr<EOFStage> eofStage(new EOFStage());
+
+ scoped_ptr<UpdateStage> updateStage(
+ new UpdateStage(params, ws.get(), db, eofStage.release()));
+
+ runUpdate(updateStage.get());
+ ctx.commit();
+ }
+
+ // Verify the contents of the resulting collection.
+ {
+ Client::ReadContext ctx(&_txn, ns());
+ Collection* collection = ctx.ctx().db()->getCollection(&_txn, ns());
+
+ vector<BSONObj> objs;
+ getCollContents(collection, &objs);
+
+ // Expect a single document, {_id: 0, x: 1, y: 2}.
+ ASSERT_EQUALS(1U, objs.size());
+ ASSERT_EQUALS(objs[0], fromjson("{_id: 0, x: 1, y: 2}"));
+ }
+ }
+ };
+
+ /**
+     * Test receipt of an invalidation: case in which the document about to be updated
+ * is deleted.
+ */
+ class QueryStageUpdateSkipInvalidatedDoc : public QueryStageUpdateBase {
+ public:
+ void run() {
+ // Run the update.
+ {
+ Client::WriteContext ctx(&_txn, ns());
+
+ // Populate the collection.
+ for (int i = 0; i < 10; ++i) {
+ insert(BSON("_id" << i << "foo" << i));
+ }
+ ASSERT_EQUALS(10U, count(BSONObj()));
+
+ Client& c = cc();
+ CurOp& curOp = *c.curop();
+ OpDebug* opDebug = &curOp.debug();
+ UpdateDriver driver( (UpdateDriver::Options()) );
+ Database* db = ctx.ctx().db();
+ Collection* coll = db->getCollection(&_txn, ns());
+
+ // Get the DiskLocs that would be returned by an in-order scan.
+ vector<DiskLoc> locs;
+ getLocs(coll, CollectionScanParams::FORWARD, &locs);
+
+ UpdateRequest request(&_txn, nsString());
+ UpdateLifecycleImpl updateLifecycle(false, nsString());
+ request.setLifecycle(&updateLifecycle);
+
+ // Update is a multi-update that sets 'bar' to 3 in every document
+ // where foo is less than 5.
+ BSONObj query = fromjson("{foo: {$lt: 5}}");
+ BSONObj updates = fromjson("{$set: {bar: 3}}");
+
+ request.setMulti();
+ request.setQuery(query);
+ request.setUpdates(updates);
+
+ ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti()));
+
+ // Configure the scan.
+ CollectionScanParams collScanParams;
+ collScanParams.collection = coll;
+ collScanParams.direction = CollectionScanParams::FORWARD;
+ collScanParams.tailable = false;
+
+ // Configure the update.
+ UpdateStageParams updateParams(&request, &driver, opDebug);
+ scoped_ptr<CanonicalQuery> cq(canonicalize(query));
+ updateParams.canonicalQuery = cq.get();
+
+ scoped_ptr<WorkingSet> ws(new WorkingSet());
+ auto_ptr<CollectionScan> cs(
+ new CollectionScan(&_txn, collScanParams, ws.get(), cq->root()));
+
+ scoped_ptr<UpdateStage> updateStage(
+ new UpdateStage(updateParams, ws.get(), db, cs.release()));
+
+ const UpdateStats* stats =
+ static_cast<const UpdateStats*>(updateStage->getSpecificStats());
+
+ const size_t targetDocIndex = 3;
+
+ while (stats->nModified < targetDocIndex) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = updateStage->work(&id);
+ ASSERT_EQUALS(PlanStage::NEED_TIME, state);
+ }
+
+                // Remove locs[targetDocIndex].
+ updateStage->saveState();
+ updateStage->invalidate(locs[targetDocIndex], INVALIDATION_DELETION);
+ BSONObj targetDoc = coll->docFor(locs[targetDocIndex]);
+ ASSERT(!targetDoc.isEmpty());
+ remove(targetDoc);
+ updateStage->restoreState(&_txn);
+
+ // Do the remaining updates.
+ while (!updateStage->isEOF()) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = updateStage->work(&id);
+ ASSERT(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state);
+ }
+
+ ctx.commit();
+
+ // 4 of the 5 matching documents should have been modified (one was deleted).
+ ASSERT_EQUALS(4U, stats->nModified);
+ ASSERT_EQUALS(4U, stats->nMatched);
+ }
+
+ // Check the contents of the collection.
+ {
+ Client::ReadContext ctx(&_txn, ns());
+ Collection* collection = ctx.ctx().db()->getCollection(&_txn, ns());
+
+ vector<BSONObj> objs;
+ getCollContents(collection, &objs);
+
+ // Verify that the collection now has 9 docs (one was deleted).
+ ASSERT_EQUALS(9U, objs.size());
+
+ // Make sure that the collection has certain documents.
+ assertHasDoc(objs, fromjson("{_id: 0, foo: 0, bar: 3}"));
+ assertHasDoc(objs, fromjson("{_id: 1, foo: 1, bar: 3}"));
+ assertHasDoc(objs, fromjson("{_id: 2, foo: 2, bar: 3}"));
+ assertHasDoc(objs, fromjson("{_id: 4, foo: 4, bar: 3}"));
+ assertHasDoc(objs, fromjson("{_id: 5, foo: 5}"));
+ assertHasDoc(objs, fromjson("{_id: 6, foo: 6}"));
+ }
+ }
+ };
+
+ class All : public Suite {
+ public:
+ All() : Suite("query_stage_update") {}
+
+ void setupTests() {
+ // Stage-specific tests below.
+ add<QueryStageUpdateUpsertEmptyColl>();
+ add<QueryStageUpdateSkipInvalidatedDoc>();
+ }
+ } all;
+
+} // namespace QueryStageUpdate