/** * Copyright (C) 2013 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/db/ops/modifier_push.h" #include #include #include #include "mongo/base/error_codes.h" #include "mongo/bson/mutable/algorithm.h" #include "mongo/db/ops/field_checker.h" #include "mongo/db/ops/log_builder.h" #include "mongo/db/ops/path_support.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { using std::abs; using std::numeric_limits; namespace mb = mutablebson; namespace str = mongoutils::str; namespace { const char kEach[] = "$each"; const char kSlice[] = "$slice"; const char kSort[] = "$sort"; const char kPosition[] = "$position"; bool isPatternElement(const BSONElement& pattern) { if (!pattern.isNumber()) { return false; } // Patterns can be only 1 or -1. double val = pattern.Number(); if (val != 1 && val != -1) { return false; } return true; } bool inEachMode(const BSONElement& modExpr) { if (modExpr.type() != Object) { return false; } BSONObj obj = modExpr.embeddedObject(); if (obj[kEach].type() == EOO) { return false; } return true; } Status parseEachMode(ModifierPush::ModifierPushMode pushMode, const BSONElement& modExpr, BSONElement* eachElem, BSONElement* sliceElem, BSONElement* sortElem, BSONElement* positionElem) { Status status = Status::OK(); // If in $pushAll mode, all we need is the array. if (pushMode == ModifierPush::PUSH_ALL) { if (modExpr.type() != Array) { return Status(ErrorCodes::BadValue, "$pushAll requires an array"); } *eachElem = modExpr; return Status::OK(); } // The $each clause must be an array. *eachElem = modExpr.embeddedObject()[kEach]; if (eachElem->type() != Array) { return Status(ErrorCodes::BadValue, str::stream() << "The argument to $each in $push must be" " an array but it was of type: " << typeName(eachElem->type())); } // There must be only one $each clause. bool seenEach = false; BSONObjIterator itMod(modExpr.embeddedObject()); while (itMod.more()) { BSONElement modElem = itMod.next(); if (mongoutils::str::equals(modElem.fieldName(), kEach)) { if (seenEach) { return Status(ErrorCodes::BadValue, "Only one $each clause is supported."); } seenEach = true; } } // Slice, sort, position are optional and may be present in any order. bool seenSlice = false; bool seenSort = false; bool seenPosition = false; BSONObjIterator itPush(modExpr.embeddedObject()); while (itPush.more()) { BSONElement elem = itPush.next(); if (mongoutils::str::equals(elem.fieldName(), kSlice)) { if (seenSlice) { return Status(ErrorCodes::BadValue, "Only one $slice clause is supported."); } *sliceElem = elem; seenSlice = true; } else if (mongoutils::str::equals(elem.fieldName(), kSort)) { if (seenSort) { return Status(ErrorCodes::BadValue, "Only one $sort clause is supported."); } *sortElem = elem; seenSort = true; } else if (mongoutils::str::equals(elem.fieldName(), kPosition)) { if (seenPosition) { return Status(ErrorCodes::BadValue, "Only one $position clause is supported."); } *positionElem = elem; seenPosition = true; } else if (!mongoutils::str::equals(elem.fieldName(), kEach)) { return Status(ErrorCodes::BadValue, str::stream() << "Unrecognized clause in $push: " << elem.fieldNameStringData()); } } return Status::OK(); } } // unnamed namespace struct ModifierPush::PreparedState { PreparedState(mutablebson::Document* targetDoc) : doc(*targetDoc), idxFound(0), elemFound(doc.end()), arrayPreModSize(0) {} // Document that is going to be changed. mutablebson::Document& doc; // Index in _fieldRef for which an Element exist in the document. size_t idxFound; // Element corresponding to _fieldRef[0.._idxFound]. mutablebson::Element elemFound; size_t arrayPreModSize; }; ModifierPush::ModifierPush(ModifierPush::ModifierPushMode pushMode) : _fieldRef(), _posDollar(0), _eachMode(false), _eachElem(), _slicePresent(false), _slice(0), _sortPresent(false), _startPosition(std::numeric_limits::max()), _sort(), _pushMode(pushMode), _val() {} ModifierPush::~ModifierPush() {} Status ModifierPush::init(const BSONElement& modExpr, const Options& opts, bool* positional) { // // field name analysis // // Break down the field name into its 'dotted' components (aka parts) and check that // the field is fit for updates. _fieldRef.parse(modExpr.fieldName()); Status status = fieldchecker::isUpdatable(_fieldRef); if (!status.isOK()) { return status; } // If a $-positional operator was used, get the index in which it occurred // and ensure only one occurrence. size_t foundCount; bool foundDollar = fieldchecker::isPositional(_fieldRef, &_posDollar, &foundCount); if (positional) *positional = foundDollar; if (foundDollar && foundCount > 1) { return Status(ErrorCodes::BadValue, str::stream() << "Too many positional (i.e. '$') elements found in path '" << _fieldRef.dottedField() << "'"); } // // value analysis // // Are the target push values safe to store? BSONElement sliceElem; BSONElement sortElem; BSONElement positionElem; switch (modExpr.type()) { case Array: if (_pushMode == PUSH_ALL) { _eachMode = true; Status status = parseEachMode( PUSH_ALL, modExpr, &_eachElem, &sliceElem, &sortElem, &positionElem); if (!status.isOK()) { return status; } } else { _val = modExpr; } break; case Object: if (_pushMode == PUSH_ALL) { return Status(ErrorCodes::BadValue, str::stream() << "$pushAll requires an array of values " "but was given an embedded document."); } // If any known clause ($each, $slice, or $sort) is present, we'd assume // we're using the $each variation of push and would parse accodingly. _eachMode = inEachMode(modExpr); if (_eachMode) { Status status = parseEachMode( PUSH_NORMAL, modExpr, &_eachElem, &sliceElem, &sortElem, &positionElem); if (!status.isOK()) { return status; } } else { _val = modExpr; } break; default: if (_pushMode == PUSH_ALL) { return Status(ErrorCodes::BadValue, str::stream() << "$pushAll requires an array of values " "but was given type: " << typeName(modExpr.type())); } _val = modExpr; break; } // Is slice present and correct? if (sliceElem.type() != EOO) { if (_pushMode == PUSH_ALL) { return Status(ErrorCodes::BadValue, "cannot use $slice in $pushAll"); } if (!sliceElem.isNumber()) { return Status(ErrorCodes::BadValue, str::stream() << "The value for $slice must " "be a numeric value but was given type: " << typeName(sliceElem.type())); } // TODO: Cleanup and unify numbers wrt getting int32/64 bson values (from doubles) // If the value of slice is not fraction, even if it's a double, we allow it. The // reason here is that the shell will use doubles by default unless told otherwise. const double doubleVal = sliceElem.numberDouble(); if (doubleVal - static_cast(doubleVal) != 0) { return Status(ErrorCodes::BadValue, "The $slice value in $push cannot be fractional"); } _slice = sliceElem.numberLong(); _slicePresent = true; } // Is position present and correct? if (positionElem.type() != EOO) { if (_pushMode == PUSH_ALL) { return Status(ErrorCodes::BadValue, "cannot use $position in $pushAll"); } // Check that $position can be represented by a 32-bit integer. switch (positionElem.type()) { case NumberInt: break; case NumberLong: if (positionElem.numberInt() != positionElem.numberLong()) { return Status( ErrorCodes::BadValue, "The $position value in $push must be representable as a 32-bit integer."); } break; case NumberDouble: { const auto doubleVal = positionElem.numberDouble(); if (doubleVal != 0.0) { if (!std::isnormal(doubleVal) || (doubleVal != positionElem.numberInt())) { return Status(ErrorCodes::BadValue, "The $position value in $push must be representable as a " "32-bit integer."); } } break; } default: return Status(ErrorCodes::BadValue, str::stream() << "The value for $position must " "be a non-negative numeric value, not of type: " << typeName(positionElem.type())); } if (positionElem.numberInt() < 0) { return { Status(ErrorCodes::BadValue, "The $position value in $push must be non-negative.")}; } _startPosition = size_t(positionElem.numberInt()); } // Is sort present and correct? if (sortElem.type() != EOO) { if (_pushMode == PUSH_ALL) { return Status(ErrorCodes::BadValue, "cannot use $sort in $pushAll"); } if (sortElem.type() != Object && !sortElem.isNumber()) { return Status(ErrorCodes::BadValue, "The $sort is invalid: use 1/-1 to sort the whole element, " "or {field:1/-1} to sort embedded fields"); } if (sortElem.isABSONObj()) { BSONObj sortObj = sortElem.embeddedObject(); if (sortObj.isEmpty()) { return Status(ErrorCodes::BadValue, "The $sort pattern is empty when it should be a set of fields."); } // Check if the sort pattern is sound. BSONObjIterator sortIter(sortObj); while (sortIter.more()) { BSONElement sortPatternElem = sortIter.next(); // We require either : 1 or -1 for asc and desc. if (!isPatternElement(sortPatternElem)) { return Status(ErrorCodes::BadValue, "The $sort element value must be either 1 or -1"); } // All fields parts must be valid. FieldRef sortField(sortPatternElem.fieldName()); if (sortField.numParts() == 0) { return Status(ErrorCodes::BadValue, "The $sort field cannot be empty"); } for (size_t i = 0; i < sortField.numParts(); i++) { if (sortField.getPart(i).size() == 0) { return Status(ErrorCodes::BadValue, str::stream() << "The $sort field is a dotted field " "but has an empty part: " << sortField.dottedField()); } } } _sort = PatternElementCmp(sortElem.embeddedObject(), opts.collator); } else { // Ensure the sortElem number is valid. if (!isPatternElement(sortElem)) { return Status(ErrorCodes::BadValue, "The $sort element value must be either 1 or -1"); } _sort = PatternElementCmp(BSON("" << sortElem.number()), opts.collator); } _sortPresent = true; } return Status::OK(); } void ModifierPush::setCollator(const CollatorInterface* collator) { invariant(!_sort.collator); if (_sortPresent) { _sort.collator = collator; } } Status ModifierPush::prepare(mutablebson::Element root, StringData matchedField, ExecInfo* execInfo) { _preparedState.reset(new PreparedState(&root.getDocument())); // If we have a $-positional field, it is time to bind it to an actual field part. if (_posDollar) { if (matchedField.empty()) { return Status(ErrorCodes::BadValue, str::stream() << "The positional operator did not find the match " "needed from the query. Unexpanded update: " << _fieldRef.dottedField()); } _fieldRef.setPart(_posDollar, matchedField); } // Locate the field name in 'root'. Note that we may not have all the parts in the path // in the doc -- which is fine. Our goal now is merely to reason about whether this mod // apply is a noOp or whether is can be in place. The remainin path, if missing, will // be created during the apply. Status status = pathsupport::findLongestPrefix( _fieldRef, root, &_preparedState->idxFound, &_preparedState->elemFound); // FindLongestPrefix may say the path does not exist at all, which is fine here, or // that the path was not viable or otherwise wrong, in which case, the mod cannot // proceed. if (status.code() == ErrorCodes::NonExistentPath) { _preparedState->elemFound = root.getDocument().end(); } else if (status.isOK()) { const bool destExists = (_preparedState->idxFound == (_fieldRef.numParts() - 1)); // If the path exists, we require the target field to be already an // array. if (destExists && _preparedState->elemFound.getType() != Array) { mb::Element idElem = mb::findFirstChildNamed(root, "_id"); return Status(ErrorCodes::BadValue, str::stream() << "The field '" << _fieldRef.dottedField() << "'" << " must be an array but is of type " << typeName(_preparedState->elemFound.getType()) << " in document {" << idElem.toString() << "}"); } } else { return status; } // We register interest in the field name. The driver needs this info to sort out if // there is any conflict among mods. execInfo->fieldRef[0] = &_fieldRef; return Status::OK(); } namespace { Status pushFirstElement(mb::Element& arrayElem, const size_t arraySize, const size_t pos, mb::Element& elem) { // Empty array or pushing to the front if (arraySize == 0 || pos == 0) { return arrayElem.pushFront(elem); } else { // Push position is at the end, or beyond if (pos >= arraySize) { return arrayElem.pushBack(elem); } const size_t appendPos = pos - 1; mutablebson::Element fromElem = getNthChild(arrayElem, appendPos); // This should not be possible since the checks above should // cover us but error just in case if (!fromElem.ok()) { return Status(ErrorCodes::InvalidLength, str::stream() << "The specified position (" << appendPos << "/" << pos << ") is invalid based on the length ( " << arraySize << ") of the array"); } return fromElem.addSiblingRight(elem); } } } // unamed namespace Status ModifierPush::apply() const { Status status = Status::OK(); // // Applying a $push with an $clause has the following steps // 1. Create the doc array we'll push into, if it is not there // 2. Add the items in the $each array (or the simple $push) to the doc array // 3. Sort the resulting array according to $sort clause, if present // 4. Trim the resulting array according the $slice clause, if present // // TODO There are _lots_ of optimization opportunities that we'll consider once the // test coverage is adequate. // // 1. If the array field is not there, create it as an array and attach it to the // document. if (!_preparedState->elemFound.ok() || _preparedState->idxFound < (_fieldRef.numParts() - 1)) { // Creates the array element mutablebson::Document& doc = _preparedState->doc; StringData lastPart = _fieldRef.getPart(_fieldRef.numParts() - 1); mutablebson::Element baseArray = doc.makeElementArray(lastPart); if (!baseArray.ok()) { return Status(ErrorCodes::InternalError, "can't create new base array"); } // Now, we can be in two cases here, as far as attaching the element being set // goes: (a) none of the parts in the element's path exist, or (b) some parts of // the path exist but not all. if (!_preparedState->elemFound.ok()) { _preparedState->elemFound = doc.root(); _preparedState->idxFound = 0; } else { _preparedState->idxFound++; } // createPathAt() will complete the path and attach 'elemToSet' at the end of it. status = pathsupport::createPathAt( _fieldRef, _preparedState->idxFound, _preparedState->elemFound, baseArray); if (!status.isOK()) { return status; } // Point to the base array just created. The subsequent code expects it to exist // already. _preparedState->elemFound = baseArray; } // This is the count of the array before we change it, or 0 if missing from the doc. _preparedState->arrayPreModSize = countChildren(_preparedState->elemFound); // 2. Add new elements to the array either by going over the $each array or by // appending the (old style $push) element. if (_eachMode || _pushMode == PUSH_ALL) { BSONObjIterator itEach(_eachElem.embeddedObject()); // When adding more than one element we keep track of the previous one // so we can add right siblings to it. mutablebson::Element prevElem = _preparedState->doc.end(); // The first element is special below bool first = true; while (itEach.more()) { BSONElement eachItem = itEach.next(); mutablebson::Element elem = _preparedState->doc.makeElementWithNewFieldName(StringData(), eachItem); if (first) { status = pushFirstElement(_preparedState->elemFound, _preparedState->arrayPreModSize, _startPosition, elem); } else { status = prevElem.addSiblingRight(elem); } if (!status.isOK()) { return status; } // For the next iteration the previous element will be the left sibling prevElem = elem; first = false; } } else { mutablebson::Element elem = _preparedState->doc.makeElementWithNewFieldName(StringData(), _val); if (!elem.ok()) { return Status(ErrorCodes::InternalError, "can't wrap element being $push-ed"); } return pushFirstElement( _preparedState->elemFound, _preparedState->arrayPreModSize, _startPosition, elem); } // 3. Sort the resulting array, if $sort was requested. if (_sortPresent) { sortChildren(_preparedState->elemFound, _sort); } // 4. Trim the resulting array according to $slice, if present. if (_slicePresent) { // Slice 0 means to remove all if (_slice == 0) { while (_preparedState->elemFound.ok() && _preparedState->elemFound.rightChild().ok()) { _preparedState->elemFound.rightChild().remove(); } } const int64_t numChildren = mutablebson::countChildren(_preparedState->elemFound); int64_t countRemoved = std::max(static_cast(0), numChildren - abs(_slice)); // If _slice is negative, remove from the bottom, otherwise from the top const bool removeFromEnd = (_slice > 0); // Either start at right or left depending if we are taking from top or bottom mutablebson::Element curr = removeFromEnd ? _preparedState->elemFound.rightChild() : _preparedState->elemFound.leftChild(); while (curr.ok() && countRemoved > 0) { mutablebson::Element toRemove = curr; // Either go right or left depending if we are taking from top or bottom curr = removeFromEnd ? curr.leftSibling() : curr.rightSibling(); status = toRemove.remove(); if (!status.isOK()) { return status; } countRemoved--; } } return status; } Status ModifierPush::log(LogBuilder* logBuilder) const { // The start position to use for positional (ordinal) updates to the array // (We will increment as we append elements to the oplog entry so can't be const) size_t position = _preparedState->arrayPreModSize; // NOTE: Idempotence Requirement // In the case that the document does't have an array or it is empty we need to make sure // that the first time the field gets filled with items that it is a full set of the array. // If we sorted, sliced, or added the first items to the array, make a full array copy. const bool doFullCopy = _slicePresent || _sortPresent || (position == 0) // first element in new/empty array || (_startPosition < _preparedState->arrayPreModSize); // add in middle if (doFullCopy) { return logBuilder->addToSetsWithNewFieldName(_fieldRef.dottedField(), _preparedState->elemFound); } else { // Set only the positional elements appended if (_eachMode || _pushMode == PUSH_ALL) { // For each input element log it as a posisional $set BSONObjIterator itEach(_eachElem.embeddedObject()); while (itEach.more()) { BSONElement eachItem = itEach.next(); // value for the logElement ("field.path.name.N": ) const std::string positionalName = mongoutils::str::stream() << _fieldRef.dottedField() << "." << position++; Status s = logBuilder->addToSetsWithNewFieldName(positionalName, eachItem); if (!s.isOK()) return s; } return Status::OK(); } else { // single value for the logElement ("field.path.name.N": ) const std::string positionalName = mongoutils::str::stream() << _fieldRef.dottedField() << "." << position++; return logBuilder->addToSetsWithNewFieldName(positionalName, _val); } } } } // namespace mongo