summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2013-10-11 10:56:42 -0400
committerScott Hernandez <scotthernandez@gmail.com>2013-10-11 16:55:35 -0400
commit967dbe1b27ac6459c15d02c30dd6a0790203d54e (patch)
tree1661744e95293c821a12b4ae9afbbe0f75586605 /src
parent8952535c3dc337957f68e11b9b48c8709771a0e7 (diff)
downloadmongo-967dbe1b27ac6459c15d02c30dd6a0790203d54e.tar.gz
SERVER-2363 $push support for insertion at any position
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/ops/modifier_push.cpp174
-rw-r--r--src/mongo/db/ops/modifier_push.h2
-rw-r--r--src/mongo/db/ops/modifier_push_test.cpp171
3 files changed, 324 insertions, 23 deletions
diff --git a/src/mongo/db/ops/modifier_push.cpp b/src/mongo/db/ops/modifier_push.cpp
index 2b207257f56..c7e099c98f1 100644
--- a/src/mongo/db/ops/modifier_push.cpp
+++ b/src/mongo/db/ops/modifier_push.cpp
@@ -29,6 +29,7 @@
#include "mongo/db/ops/modifier_push.h"
#include <algorithm>
+#include <limits>
#include "mongo/base/error_codes.h"
#include "mongo/bson/mutable/algorithm.h"
@@ -46,6 +47,11 @@ namespace mongo {
namespace {
+ const char kEach[] = "$each";
+ const char kSlice[] = "$slice";
+ const char kSort[] = "$sort";
+ const char kPosition[] = "$position";
+
bool isPatternElement(const BSONElement& pattern) {
if (!pattern.isNumber()) {
return false;
@@ -65,7 +71,7 @@ namespace mongo {
return false;
}
BSONObj obj = modExpr.embeddedObject();
- if (obj["$each"].type() == EOO) {
+ if (obj[kEach].type() == EOO) {
return false;
}
return true;
@@ -75,7 +81,8 @@ namespace mongo {
const BSONElement& modExpr,
BSONElement* eachElem,
BSONElement* sliceElem,
- BSONElement* sortElem) {
+ BSONElement* sortElem,
+ BSONElement* positionElem) {
Status status = Status::OK();
@@ -89,7 +96,7 @@ namespace mongo {
}
// The $each clause must be an array.
- *eachElem = modExpr.embeddedObject()["$each"];
+ *eachElem = modExpr.embeddedObject()[kEach];
if (eachElem->type() != Array) {
return Status(ErrorCodes::BadValue, "$each must be an array");
}
@@ -99,7 +106,7 @@ namespace mongo {
BSONObjIterator itMod(modExpr.embeddedObject());
while (itMod.more()) {
BSONElement modElem = itMod.next();
- if (mongoutils::str::equals(modElem.fieldName(), "$each")) {
+ if (mongoutils::str::equals(modElem.fieldName(), kEach)) {
if (seenEach) {
return Status(ErrorCodes::BadValue, "duplicated $each clause");
}
@@ -119,28 +126,38 @@ namespace mongo {
}
}
- // Slice and sort are optional and may be present in any order.
+ // Slice, sort, position are optional and may be present in any order.
bool seenSlice = false;
bool seenSort = false;
+ bool seenPosition = false;
BSONObjIterator itPush(modExpr.embeddedObject());
while (itPush.more()) {
BSONElement elem = itPush.next();
- if (mongoutils::str::equals(elem.fieldName(), "$slice")) {
+ if (mongoutils::str::equals(elem.fieldName(), kSlice)) {
if (seenSlice) {
return Status(ErrorCodes::BadValue, "duplicate $slice clause");
}
*sliceElem = elem;
seenSlice = true;
}
- else if (mongoutils::str::equals(elem.fieldName(), "$sort")) {
+ else if (mongoutils::str::equals(elem.fieldName(), kSort)) {
if (seenSort) {
return Status(ErrorCodes::BadValue, "duplicate $sort clause");
}
*sortElem = elem;
seenSort = true;
}
- else if (!mongoutils::str::equals(elem.fieldName(), "$each")) {
- return Status(ErrorCodes::BadValue, "unrecognized clause is $push");
+ else if (mongoutils::str::equals(elem.fieldName(), kPosition)) {
+ if (seenPosition) {
+ return Status(ErrorCodes::BadValue, "duplicate $position clause");
+ }
+ *positionElem = elem;
+ seenPosition = true;
+ }
+ else if (!mongoutils::str::equals(elem.fieldName(), kEach)) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Unrecognized clause in $push: "
+ << elem.fieldNameStringData());
}
}
@@ -156,7 +173,7 @@ namespace mongo {
, idxFound(0)
, elemFound(doc.end())
, boundDollar("")
- , arraySize(0) {
+ , arrayPreModSize(0) {
}
// Document that is going to be changed.
@@ -171,7 +188,7 @@ namespace mongo {
// Value to bind to a $-positional field, if one is provided.
std::string boundDollar;
- size_t arraySize;
+ size_t arrayPreModSize;
};
@@ -183,6 +200,7 @@ namespace mongo {
, _slicePresent(false)
, _slice(0)
, _sortPresent(false)
+ , _startPosition(std::numeric_limits<std::size_t>::max())
, _sort()
, _pushMode(pushMode)
, _val() {
@@ -222,6 +240,7 @@ namespace mongo {
// Are the target push values safe to store?
BSONElement sliceElem;
BSONElement sortElem;
+ BSONElement positionElem;
switch (modExpr.type()) {
case Array:
@@ -237,7 +256,8 @@ namespace mongo {
modExpr,
&_eachElem,
&sliceElem,
- &sortElem);
+ &sortElem,
+ &positionElem);
if (!status.isOK()) {
return status;
}
@@ -262,7 +282,8 @@ namespace mongo {
modExpr,
&_eachElem,
&sliceElem,
- &sortElem);
+ &sortElem,
+ &positionElem);
if (!status.isOK()) {
return status;
}
@@ -301,10 +322,12 @@ namespace mongo {
<< typeName(sliceElem.type()));
}
+ // TODO: Cleanup and unify numbers wrt getting int32/64 bson values (from doubles)
+
// If the value of slice is not fraction, even if it's a double, we allow it. The
// reason here is that the shell will use doubles by default unless told otherwise.
- double fractional = sliceElem.numberDouble();
- if (fractional - static_cast<int64_t>(fractional) != 0) {
+ const double doubleVal = sliceElem.numberDouble();
+ if (doubleVal - static_cast<int64_t>(doubleVal) != 0) {
return Status(ErrorCodes::BadValue,
"The $slice value in $push cannot be fractional");
}
@@ -313,6 +336,47 @@ namespace mongo {
_slicePresent = true;
}
+ // Is position present and correct?
+ if (positionElem.type() != EOO) {
+ if (_pushMode == PUSH_ALL) {
+ return Status(ErrorCodes::BadValue, "cannot use $position in $pushAll");
+ }
+
+ if (!positionElem.isNumber()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "The value for $position must "
+ "a be a positive numeric value not "
+ << typeName(positionElem.type()));
+ }
+
+ // TODO: Cleanup and unify numbers wrt getting int32/64 bson values (from doubles)
+
+ // If the value of position is not fraction, even if it's a double, we allow it. The
+ // reason here is that the shell will use doubles by default unless told otherwise.
+ const double doubleVal = positionElem.numberDouble();
+ if (doubleVal - static_cast<int64_t>(doubleVal) != 0) {
+ return Status(ErrorCodes::BadValue,
+ "The $position value in $push cannot be fractional");
+ }
+
+ if (static_cast<double>(numeric_limits<int64_t>::max()) < doubleVal) {
+ return Status(ErrorCodes::BadValue,
+ "The $position value in $push is too large a number.");
+ }
+
+ if (static_cast<double>(numeric_limits<int64_t>::min()) > doubleVal) {
+ return Status(ErrorCodes::BadValue,
+ "The $position value in $push is too small a number.");
+ }
+
+ const int64_t tempVal = positionElem.numberLong();
+ if (tempVal < 0)
+ return Status(ErrorCodes::BadValue,
+ "The $position value in $push must be positive.");
+
+ _startPosition = size_t(tempVal);
+ }
+
// Is sort present and correct?
if (sortElem.type() != EOO) {
if (_pushMode == PUSH_ALL) {
@@ -440,6 +504,43 @@ namespace mongo {
return Status::OK();
}
+ namespace {
+ Status pushFirstElement(mb::Element& arrayElem,
+ const size_t arraySize,
+ const size_t pos,
+ mb::Element& elem) {
+
+
+ // Empty array or pushing to the front
+ if (arraySize == 0 || pos == 0) {
+ return arrayElem.pushFront(elem);
+ }
+ else {
+
+ // Push position is at the end, or beyond
+ if (pos >= arraySize) {
+ return arrayElem.pushBack(elem);
+ }
+
+ const size_t appendPos = pos - 1;
+ mutablebson::Element fromElem = getNthChild(arrayElem, appendPos);
+
+ // This should not be possible since the checks above should
+ // cover us but error just in case
+ if (!fromElem.ok()){
+ return Status(ErrorCodes::InvalidLength,
+ str::stream() << "The specified position (" << appendPos << "/"
+ << pos
+ << ") is invalid based on the length ( "
+ << arraySize
+ << ") of the array");
+ }
+
+ return fromElem.addSiblingRight(elem);
+ }
+ }
+ } //unamed namespace
+
Status ModifierPush::apply() const {
Status status = Status::OK();
@@ -495,19 +596,41 @@ namespace mongo {
}
// This is the count of the array before we change it, or 0 if missing from the doc.
- _preparedState->arraySize = countChildren(_preparedState->elemFound);
+ _preparedState->arrayPreModSize = countChildren(_preparedState->elemFound);
- // 2. Concatenate the two arrays together, either by going over the $each array or by
- // appending the (old style $push) element. Note that if we're the latter case, we
- // won't need to proceed to the $sort and $slice phases of the apply.
+ // 2. Add new elements to the array either by going over the $each array or by
+ // appending the (old style $push) element.
if (_eachMode || _pushMode == PUSH_ALL) {
BSONObjIterator itEach(_eachElem.embeddedObject());
+
+ // When adding more than one element we keep track of the previous one
+ // so we can add right siblings to it.
+ mutablebson::Element prevElem = _preparedState->doc.end();
+
+ // The first element is special below
+ bool first = true;
+
while (itEach.more()) {
BSONElement eachItem = itEach.next();
- status = _preparedState->elemFound.appendElement(eachItem);
+ mutablebson::Element elem =
+ _preparedState->doc.makeElementWithNewFieldName(StringData(), eachItem);
+
+ if (first) {
+ status = pushFirstElement(_preparedState->elemFound,
+ _preparedState->arrayPreModSize,
+ _startPosition,
+ elem); }
+ else {
+ status = prevElem.addSiblingRight(elem);
+ }
+
if (!status.isOK()) {
return status;
}
+
+ // For the next iteration the previous element will be the left sibling
+ prevElem = elem;
+ first = false;
}
}
else {
@@ -516,7 +639,10 @@ namespace mongo {
if (!elem.ok()) {
return Status(ErrorCodes::InternalError, "can't wrap element being $push-ed");
}
- return _preparedState->elemFound.pushBack(elem);
+ return pushFirstElement(_preparedState->elemFound,
+ _preparedState->arrayPreModSize,
+ _startPosition,
+ elem);
}
// 3. Sort the resulting array, if $sort was requested.
@@ -565,14 +691,16 @@ namespace mongo {
// The start position to use for positional (ordinal) updates to the array
// (We will increment as we append elements to the oplog entry so can't be const)
- size_t position = _preparedState->arraySize;
+ size_t position = _preparedState->arrayPreModSize;
// NOTE: Idempotence Requirement
// In the case that the document does't have an array or it is empty we need to make sure
// that the first time the field gets filled with items that it is a full set of the array.
// If we sorted, sliced, or added the first items to the array, make a full array copy.
- const bool doFullCopy = _slicePresent || _sortPresent || (position == 0);
+ const bool doFullCopy = _slicePresent || _sortPresent
+ || (position == 0) // first element in new/empty array
+ || (_startPosition < _preparedState->arrayPreModSize); // add in middle
if (doFullCopy) {
return logBuilder->addToSetsWithNewFieldName(_fieldRef.dottedField(),
diff --git a/src/mongo/db/ops/modifier_push.h b/src/mongo/db/ops/modifier_push.h
index a9b8f3f0f8d..85b4698e876 100644
--- a/src/mongo/db/ops/modifier_push.h
+++ b/src/mongo/db/ops/modifier_push.h
@@ -113,6 +113,8 @@ namespace mongo {
bool _slicePresent;
int64_t _slice;
bool _sortPresent;
+ size_t _startPosition;
+
PatternElementCmp _sort;
// Whether this mod is supposed to be parsed as a $pushAll.
diff --git a/src/mongo/db/ops/modifier_push_test.cpp b/src/mongo/db/ops/modifier_push_test.cpp
index 755c9353cce..69d27b8397f 100644
--- a/src/mongo/db/ops/modifier_push_test.cpp
+++ b/src/mongo/db/ops/modifier_push_test.cpp
@@ -1305,4 +1305,175 @@ namespace {
}
}
+ // Push to position tests
+
+ TEST(ToPosition, BadInputs) {
+ const char* const bad[] = {
+ "{$push: {a: { $each: [1], $position:-1}}}",
+ "{$push: {a: { $each: [1], $position:'s'}}}",
+ "{$push: {a: { $each: [1], $position:{}}}}",
+ "{$push: {a: { $each: [1], $position:[0]}}}",
+ "{$push: {a: { $each: [1], $position:1.1211212}}}",
+ "{$push: {a: { $each: [1], $position:3.000000000001}}}",
+ "{$push: {a: { $each: [1], $position:1.2}}}",
+ "{$push: {a: { $each: [1], $position:-1.2}}}",
+ "{$push: {a: { $each: [1], $position:9223372036854775810}}}",
+ "{$push: {a: { $each: [1], $position:-9223372036854775810}}}",
+ NULL,
+ };
+
+ int i = 0;
+ while(bad[i] != NULL)
+ {
+ ModifierPush pushMod(ModifierPush::PUSH_NORMAL);
+ BSONObj modObj = fromjson(bad[i]);
+ ASSERT_NOT_OK(pushMod.init(modObj.firstElement().embeddedObject().firstElement(),
+ ModifierInterface::Options::normal()));
+ i++;
+ }
+ }
+
+ TEST(ToPosition, GoodInputs) {
+ {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position: NumberLong(1)}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+ }
+ {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:NumberInt(100)}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+ }
+ {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1.0}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+ }
+ {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1000000}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+ }
+ {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:0}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+ }
+ }
+
+ TEST(ToPosition, EmptyArrayFront) {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:0}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+
+ ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a");
+ ASSERT_FALSE(execInfo.noOp);
+
+ ASSERT_OK(pushMod.apply());
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_EQUALS(fromjson("{a: [1]}"), doc);
+
+ Document logDoc;
+ LogBuilder logBuilder(logDoc.root());
+ ASSERT_OK(pushMod.log(&logBuilder));
+ ASSERT_EQUALS(countChildren(logDoc.root()), 1u);
+ ASSERT_EQUALS(fromjson("{$set: {'a':[1]}}"), logDoc);
+ }
+
+ TEST(ToPosition, EmptyArrayBackBigPosition) {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1000}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+
+ ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a");
+ ASSERT_FALSE(execInfo.noOp);
+
+ ASSERT_OK(pushMod.apply());
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_EQUALS(fromjson("{a: [1]}"), doc);
+
+ Document logDoc;
+ LogBuilder logBuilder(logDoc.root());
+ ASSERT_OK(pushMod.log(&logBuilder));
+ ASSERT_EQUALS(countChildren(logDoc.root()), 1u);
+ ASSERT_EQUALS(fromjson("{$set: {'a':[1]}}"), logDoc);
+ }
+
+ TEST(ToPosition, EmptyArrayBack) {
+ Document doc(fromjson("{a: []}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:1}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+
+ ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a");
+ ASSERT_FALSE(execInfo.noOp);
+
+ ASSERT_OK(pushMod.apply());
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_EQUALS(fromjson("{a: [1]}"), doc);
+
+ Document logDoc;
+ LogBuilder logBuilder(logDoc.root());
+ ASSERT_OK(pushMod.log(&logBuilder));
+ ASSERT_EQUALS(countChildren(logDoc.root()), 1u);
+ ASSERT_EQUALS(fromjson("{$set: {'a':[1]}}"), logDoc);
+ }
+
+ TEST(ToPosition, Front) {
+ Document doc(fromjson("{a: [0]}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:0}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+
+ ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a");
+ ASSERT_FALSE(execInfo.noOp);
+
+ ASSERT_OK(pushMod.apply());
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_EQUALS(fromjson("{a: [1, 0]}"), doc);
+
+ Document logDoc;
+ LogBuilder logBuilder(logDoc.root());
+ ASSERT_OK(pushMod.log(&logBuilder));
+ ASSERT_EQUALS(countChildren(logDoc.root()), 1u);
+ ASSERT_EQUALS(fromjson("{$set: {'a':[1, 0]}}"), logDoc);
+ }
+
+ TEST(ToPosition, Back) {
+ Document doc(fromjson("{a: [0]}"));
+ Mod pushMod(fromjson("{$push: {a: { $each: [1], $position:100}}}"));
+
+ ModifierInterface::ExecInfo execInfo;
+ ASSERT_OK(pushMod.prepare(doc.root(), "", &execInfo));
+
+ ASSERT_EQUALS(execInfo.fieldRef[0]->dottedField(), "a");
+ ASSERT_FALSE(execInfo.noOp);
+
+ ASSERT_OK(pushMod.apply());
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_EQUALS(fromjson("{a: [0,1]}"), doc);
+
+ Document logDoc;
+ LogBuilder logBuilder(logDoc.root());
+ ASSERT_OK(pushMod.log(&logBuilder));
+ ASSERT_EQUALS(countChildren(logDoc.root()), 1u);
+ ASSERT_EQUALS(fromjson("{$set: {'a.1':1}}"), logDoc);
+ }
+
} // unnamed namespace