From 967dbe1b27ac6459c15d02c30dd6a0790203d54e Mon Sep 17 00:00:00 2001 From: Scott Hernandez Date: Fri, 11 Oct 2013 10:56:42 -0400 Subject: SERVER-2363 $push support for insertion at any position --- src/mongo/db/ops/modifier_push.cpp | 174 +++++++++++++++++++++++++++----- src/mongo/db/ops/modifier_push.h | 2 + src/mongo/db/ops/modifier_push_test.cpp | 171 +++++++++++++++++++++++++++++++ 3 files changed, 324 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/mongo/db/ops/modifier_push.cpp b/src/mongo/db/ops/modifier_push.cpp index 2b207257f56..c7e099c98f1 100644 --- a/src/mongo/db/ops/modifier_push.cpp +++ b/src/mongo/db/ops/modifier_push.cpp @@ -29,6 +29,7 @@ #include "mongo/db/ops/modifier_push.h" #include +#include #include "mongo/base/error_codes.h" #include "mongo/bson/mutable/algorithm.h" @@ -46,6 +47,11 @@ namespace mongo { namespace { + const char kEach[] = "$each"; + const char kSlice[] = "$slice"; + const char kSort[] = "$sort"; + const char kPosition[] = "$position"; + bool isPatternElement(const BSONElement& pattern) { if (!pattern.isNumber()) { return false; @@ -65,7 +71,7 @@ namespace mongo { return false; } BSONObj obj = modExpr.embeddedObject(); - if (obj["$each"].type() == EOO) { + if (obj[kEach].type() == EOO) { return false; } return true; @@ -75,7 +81,8 @@ namespace mongo { const BSONElement& modExpr, BSONElement* eachElem, BSONElement* sliceElem, - BSONElement* sortElem) { + BSONElement* sortElem, + BSONElement* positionElem) { Status status = Status::OK(); @@ -89,7 +96,7 @@ namespace mongo { } // The $each clause must be an array. - *eachElem = modExpr.embeddedObject()["$each"]; + *eachElem = modExpr.embeddedObject()[kEach]; if (eachElem->type() != Array) { return Status(ErrorCodes::BadValue, "$each must be an array"); } @@ -99,7 +106,7 @@ namespace mongo { BSONObjIterator itMod(modExpr.embeddedObject()); while (itMod.more()) { BSONElement modElem = itMod.next(); - if (mongoutils::str::equals(modElem.fieldName(), "$each")) { + if (mongoutils::str::equals(modElem.fieldName(), kEach)) { if (seenEach) { return Status(ErrorCodes::BadValue, "duplicated $each clause"); } @@ -119,28 +126,38 @@ namespace mongo { } } - // Slice and sort are optional and may be present in any order. + // Slice, sort, position are optional and may be present in any order. bool seenSlice = false; bool seenSort = false; + bool seenPosition = false; BSONObjIterator itPush(modExpr.embeddedObject()); while (itPush.more()) { BSONElement elem = itPush.next(); - if (mongoutils::str::equals(elem.fieldName(), "$slice")) { + if (mongoutils::str::equals(elem.fieldName(), kSlice)) { if (seenSlice) { return Status(ErrorCodes::BadValue, "duplicate $slice clause"); } *sliceElem = elem; seenSlice = true; } - else if (mongoutils::str::equals(elem.fieldName(), "$sort")) { + else if (mongoutils::str::equals(elem.fieldName(), kSort)) { if (seenSort) { return Status(ErrorCodes::BadValue, "duplicate $sort clause"); } *sortElem = elem; seenSort = true; } - else if (!mongoutils::str::equals(elem.fieldName(), "$each")) { - return Status(ErrorCodes::BadValue, "unrecognized clause is $push"); + else if (mongoutils::str::equals(elem.fieldName(), kPosition)) { + if (seenPosition) { + return Status(ErrorCodes::BadValue, "duplicate $position clause"); + } + *positionElem = elem; + seenPosition = true; + } + else if (!mongoutils::str::equals(elem.fieldName(), kEach)) { + return Status(ErrorCodes::BadValue, + str::stream() << "Unrecognized clause in $push: " + << elem.fieldNameStringData()); } } @@ -156,7 +173,7 @@ namespace mongo { , idxFound(0) , elemFound(doc.end()) , boundDollar("") - , arraySize(0) { + , arrayPreModSize(0) { } // Document that is going to be changed. @@ -171,7 +188,7 @@ namespace mongo { // Value to bind to a $-positional field, if one is provided. std::string boundDollar; - size_t arraySize; + size_t arrayPreModSize; }; @@ -183,6 +200,7 @@ namespace mongo { , _slicePresent(false) , _slice(0) , _sortPresent(false) + , _startPosition(std::numeric_limits::max()) , _sort() , _pushMode(pushMode) , _val() { @@ -222,6 +240,7 @@ namespace mongo { // Are the target push values safe to store? BSONElement sliceElem; BSONElement sortElem; + BSONElement positionElem; switch (modExpr.type()) { case Array: @@ -237,7 +256,8 @@ namespace mongo { modExpr, &_eachElem, &sliceElem, - &sortElem); + &sortElem, + &positionElem); if (!status.isOK()) { return status; } @@ -262,7 +282,8 @@ namespace mongo { modExpr, &_eachElem, &sliceElem, - &sortElem); + &sortElem, + &positionElem); if (!status.isOK()) { return status; } @@ -301,10 +322,12 @@ namespace mongo { << typeName(sliceElem.type())); } + // TODO: Cleanup and unify numbers wrt getting int32/64 bson values (from doubles) + // If the value of slice is not fraction, even if it's a double, we allow it. The // reason here is that the shell will use doubles by default unless told otherwise. - double fractional = sliceElem.numberDouble(); - if (fractional - static_cast(fractional) != 0) { + const double doubleVal = sliceElem.numberDouble(); + if (doubleVal - static_cast(doubleVal) != 0) { return Status(ErrorCodes::BadValue, "The $slice value in $push cannot be fractional"); } @@ -313,6 +336,47 @@ namespace mongo { _slicePresent = true; } + // Is position present and correct? + if (positionElem.type() != EOO) { + if (_pushMode == PUSH_ALL) { + return Status(ErrorCodes::BadValue, "cannot use $position in $pushAll"); + } + + if (!positionElem.isNumber()) { + return Status(ErrorCodes::BadValue, + str::stream() << "The value for $position must " + "a be a positive numeric value not " + << typeName(positionElem.type())); + } + + // TODO: Cleanup and unify numbers wrt getting int32/64 bson values (from doubles) + + // If the value of position is not fraction, even if it's a double, we allow it. The + // reason here is that the shell will use doubles by default unless told otherwise. + const double doubleVal = positionElem.numberDouble(); + if (doubleVal - static_cast(doubleVal) != 0) { + return Status(ErrorCodes::BadValue, + "The $position value in $push cannot be fractional"); + } + + if (static_cast(numeric_limits::max()) < doubleVal) { + return Status(ErrorCodes::BadValue, + "The $position value in $push is too large a number."); + } + + if (static_cast(numeric_limits::min()) > doubleVal) { + return Status(ErrorCodes::BadValue, + "The $position value in $push is too small a number."); + } + + const int64_t tempVal = positionElem.numberLong(); + if (tempVal < 0) + return Status(ErrorCodes::BadValue, + "The $position value in $push must be positive."); + + _startPosition = size_t(tempVal); + } + // Is sort present and correct? if (sortElem.type() != EOO) { if (_pushMode == PUSH_ALL) { @@ -440,6 +504,43 @@ namespace mongo { return Status::OK(); } + namespace { + Status pushFirstElement(mb::Element& arrayElem, + const size_t arraySize, + const size_t pos, + mb::Element& elem) { + + + // Empty array or pushing to the front + if (arraySize == 0 || pos == 0) { + return arrayElem.pushFront(elem); + } + else { + + // Push position is at the end, or beyond + if (pos >= arraySize) { + return arrayElem.pushBack(elem); + } + + const size_t appendPos = pos - 1; + mutablebson::Element fromElem = getNthChild(arrayElem, appendPos); + + // This should not be possible since the checks above should + // cover us but error just in case + if (!fromElem.ok()){ + return Status(ErrorCodes::InvalidLength, + str::stream() << "The specified position (" << appendPos << "/" + << pos + << ") is invalid based on the length ( " + << arraySize + << ") of the array"); + } + + return fromElem.addSiblingRight(elem); + } + } + } //unamed namespace + Status ModifierPush::apply() const { Status status = Status::OK(); @@ -495,19 +596,41 @@ namespace mongo { } // This is the count of the array before we change it, or 0 if missing from the doc. - _preparedState->arraySize = countChildren(_preparedState->elemFound); + _preparedState->arrayPreModSize = countChildren(_preparedState->elemFound); - // 2. Concatenate the two arrays together, either by going over the $each array or by - // appending the (old style $push) element. Note that if we're the latter case, we - // won't need to proceed to the $sort and $slice phases of the apply. + // 2. Add new elements to the array either by going over the $each array or by + // appending the (old style $push) element. if (_eachMode || _pushMode == PUSH_ALL) { BSONObjIterator itEach(_eachElem.embeddedObject()); + + // When adding more than one element we keep track of the previous one + // so we can add right siblings to it. + mutablebson::Element prevElem = _preparedState->doc.end(); + + // The first element is special below + bool first = true; + while (itEach.more()) { BSONElement eachItem = itEach.next(); - status = _preparedState->elemFound.appendElement(eachItem); + mutablebson::Element elem = + _preparedState->doc.makeElementWithNewFieldName(StringData(), eachItem); + + if (first) { + status = pushFirstElement(_preparedState->elemFound, + _preparedState->arrayPreModSize, + _startPosition, + elem); } + else { + status = prevElem.addSiblingRight(elem); + } + if (!status.isOK()) { return status; } + + // For the next iteration the previous element will be the left sibling + prevElem = elem; + first = false; } } else { @@ -516,7 +639,10 @@ namespace mongo { if (!elem.ok()) { return Status(ErrorCodes::InternalError, "can't wrap element being $push-ed"); } - return _preparedState->elemFound.pushBack(elem); + return pushFirstElement(_preparedState->elemFound, + _preparedState->arrayPreModSize, + _startPosition, + elem); } // 3. Sort the resulting array, if $sort was requested. @@ -565,14 +691,16 @@ namespace mongo { // The start position to use for positional (ordinal) updates to the array // (We will increment as we append elements to the oplog entry so can't be const) - size_t position = _preparedState->arraySize; + size_t position = _preparedState->arrayPreModSize; // NOTE: Idempotence Requirement // In the case that the document does't have an array or it is empty we need to make sure // that the first time the field gets filled with items that it is a full set of the array. // If we sorted, sliced, or added the first items to the array, make a full array copy. - const bool doFullCopy = _slicePresent || _sortPresent || (position == 0); + const bool doFullCopy = _slicePresent || _sortPresent + || (position == 0) // first element in new/empty array + || (_startPosition < _preparedState->arrayPreModSize); // add in middle if (doFullCopy) { return logBuilder->addToSetsWithNewFieldName(_fieldRef.dottedField(), diff --git a/src/mongo/db/ops/modifier_push.h b/src/mongo/db/ops/modifier_push.h index a9b8f3f0f8d..85b4698e876 100644 --- a/src/mongo/db/ops/modifier_push.h +++ b/src/mongo/db/ops/modifier_push.h @@ -113,6 +113,8 @@ namespace mongo { bool _slicePresent; int64_t _slice; bool _sortPresent; + size_t _startPosition; + PatternElementCmp _sort; // Whether this mod is supposed to be parsed as a $pushAll. diff --git a/src/mongo/db/ops/modifier_push_test.cpp b/src/mongo/db/ops/modifier_push_test.cpp index 755c9353cce..69d27b8397f 100644 --- a/src/mongo/db/ops/modifier_push_test.cpp +++ b/src/mongo/db/ops/modifier_push_test.cpp @@ -1305,4 +1305,175 @@ namespace { } } + // Push to position tests + + TEST(ToPosition, BadInputs) { + const char* const bad[] = { + "{$push: {a: { $each: [1], $position:-1}}}", + "{$push: {a: { $each: [1], $position:'s'}}}", + "{$push: {a: { $each: [1], $position:{}}}}", + "{$push: {a: { $each: [1], $position:[0]}}}", + "{$push: {a: { $each: [1], $position:1.1211212}}}", + "{$push: {a: { $each: [1], $position:3.000000000001}}}", + "{$push: {a: { $each: [1], $position:1.2}}}", + "{$push: {a: { $each: [1], $position:-1.2}}}", + "{$push: {a: { $each: [1], $position:9223372036854775810}}}", + "{$push: {a: { $each: [1], $position:-9223372036854775810}}}", + NULL, + }; + + int i = 0; + while(bad[i] != NULL) + { + ModifierPush pushMod(ModifierPush::PUSH_NORMAL); + BSONObj modObj = fromjson(bad[i]); + ASSERT_NOT_OK(pushMod.init(modObj.firstElement().embeddedObject().firstElement(), + ModifierInterface::Options::normal())); + i++; + } + } + + TEST(ToPosition, GoodInputs) { + { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position: NumberLong(1)}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + } + { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:NumberInt(100)}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + } + { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1.0}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + } + { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1000000}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + } + { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:0}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + } + } + + TEST(ToPosition, EmptyArrayFront) { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:0}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + + ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a"); + ASSERT_FALSE(execInfo.noOp); + + ASSERT_OK(pushMod.apply()); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_EQUALS(fromjson("{a: [1]}"), doc); + + Document logDoc; + LogBuilder logBuilder(logDoc.root()); + ASSERT_OK(pushMod.log(&logBuilder)); + ASSERT_EQUALS(countChildren(logDoc.root()), 1u); + ASSERT_EQUALS(fromjson("{$set: {'a':[1]}}"), logDoc); + } + + TEST(ToPosition, EmptyArrayBackBigPosition) { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1000}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + + ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a"); + ASSERT_FALSE(execInfo.noOp); + + ASSERT_OK(pushMod.apply()); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_EQUALS(fromjson("{a: [1]}"), doc); + + Document logDoc; + LogBuilder logBuilder(logDoc.root()); + ASSERT_OK(pushMod.log(&logBuilder)); + ASSERT_EQUALS(countChildren(logDoc.root()), 1u); + ASSERT_EQUALS(fromjson("{$set: {'a':[1]}}"), logDoc); + } + + TEST(ToPosition, EmptyArrayBack) { + Document doc(fromjson("{a: []}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + + ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a"); + ASSERT_FALSE(execInfo.noOp); + + ASSERT_OK(pushMod.apply()); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_EQUALS(fromjson("{a: [1]}"), doc); + + Document logDoc; + LogBuilder logBuilder(logDoc.root()); + ASSERT_OK(pushMod.log(&logBuilder)); + ASSERT_EQUALS(countChildren(logDoc.root()), 1u); + ASSERT_EQUALS(fromjson("{$set: {'a':[1]}}"), logDoc); + } + + TEST(ToPosition, Front) { + Document doc(fromjson("{a: [0]}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:0}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + + ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a"); + ASSERT_FALSE(execInfo.noOp); + + ASSERT_OK(pushMod.apply()); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_EQUALS(fromjson("{a: [1, 0]}"), doc); + + Document logDoc; + LogBuilder logBuilder(logDoc.root()); + ASSERT_OK(pushMod.log(&logBuilder)); + ASSERT_EQUALS(countChildren(logDoc.root()), 1u); + ASSERT_EQUALS(fromjson("{$set: {'a':[1, 0]}}"), logDoc); + } + + TEST(ToPosition, Back) { + Document doc(fromjson("{a: [0]}")); + Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:100}}}")); + + ModifierInterface::ExecInfo execInfo; + ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo)); + + ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a"); + ASSERT_FALSE(execInfo.noOp); + + ASSERT_OK(pushMod.apply()); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_EQUALS(fromjson("{a: [0,1]}"), doc); + + Document logDoc; + LogBuilder logBuilder(logDoc.root()); + ASSERT_OK(pushMod.log(&logBuilder)); + ASSERT_EQUALS(countChildren(logDoc.root()), 1u); + ASSERT_EQUALS(fromjson("{$set: {'a.1':1}}"), logDoc); + } + } // unnamed namespace -- cgit v1.2.1