summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_unwind.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2015-10-02 13:51:04 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2015-10-07 16:11:45 -0400
commit9c46f951450271310fa818a6dbb30411f088007d (patch)
tree7dcbc84b039674c9861b26062b0e2b93c594823e /src/mongo/db/pipeline/document_source_unwind.cpp
parenta1b78e4835486f19262a6673c210092dea8ca7e1 (diff)
downloadmongo-9c46f951450271310fa818a6dbb30411f088007d.tar.gz
SERVER-20595 The includeArrayIndex option to $unwind should specify a path
Diffstat (limited to 'src/mongo/db/pipeline/document_source_unwind.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.cpp81
1 files changed, 49 insertions, 32 deletions
diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp
index c428ff7cfaa..e8ec3abed0b 100644
--- a/src/mongo/db/pipeline/document_source_unwind.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind.cpp
@@ -43,7 +43,9 @@ using std::vector;
/** Helper class to unwind array from a single document. */
class DocumentSourceUnwind::Unwinder {
public:
- Unwinder(const FieldPath& unwindPath, bool preserveNullAndEmptyArrays, bool includeArrayIndex);
+ Unwinder(const FieldPath& unwindPath,
+ bool preserveNullAndEmptyArrays,
+ const boost::optional<FieldPath>& indexPath);
/** Reset the unwinder to unwind a new document. */
void resetDocument(const Document& document);
@@ -67,9 +69,9 @@ private:
// through the $unwind stage unmodified if '_preserveNullAndEmptyArrays' is true.
const bool _preserveNullAndEmptyArrays;
- // If set, the $unwind stage will replace the unwound field with a sub-document with structure
- // {index: <array index>, value: <array value>} instead of just the array value.
- const bool _includeArrayIndex;
+ // If set, the $unwind stage will include the array index in the specified path, overwriting any
+ // existing value, setting to null when the value was a non-array or empty array.
+ const boost::optional<FieldPath> _indexPath;
Value _inputArray;
@@ -84,10 +86,10 @@ private:
DocumentSourceUnwind::Unwinder::Unwinder(const FieldPath& unwindPath,
bool preserveNullAndEmptyArrays,
- bool includeArrayIndex)
+ const boost::optional<FieldPath>& indexPath)
: _unwindPath(unwindPath),
_preserveNullAndEmptyArrays(preserveNullAndEmptyArrays),
- _includeArrayIndex(includeArrayIndex) {}
+ _indexPath(indexPath) {}
void DocumentSourceUnwind::Unwinder::resetDocument(const Document& document) {
// Reset document specific attributes.
@@ -105,6 +107,10 @@ boost::optional<Document> DocumentSourceUnwind::Unwinder::getNext() {
return boost::none;
}
+ // Track which index this value came from. If 'includeArrayIndex' was specified, we will use
+ // this index in the output document, or null if the value didn't come from an array.
+ boost::optional<long long> indexForOutput;
+
if (_inputArray.getType() == Array) {
const size_t length = _inputArray.getArrayLength();
invariant(_index == 0 || _index < length);
@@ -122,13 +128,8 @@ boost::optional<Document> DocumentSourceUnwind::Unwinder::getNext() {
// clone. Because the value at the end will be replaced, everything along the path
// leading to that will be replaced in order not to share that change with any other
// clones (or the original).
- if (_includeArrayIndex) {
- _output.setNestedField(_unwindPathFieldIndexes,
- Value(DOC("index" << static_cast<long long>(_index)
- << "value" << _inputArray[_index])));
- } else {
- _output.setNestedField(_unwindPathFieldIndexes, _inputArray[_index]);
- }
+ _output.setNestedField(_unwindPathFieldIndexes, _inputArray[_index]);
+ indexForOutput = _index;
_index++;
_haveNext = _index < length;
}
@@ -139,22 +140,27 @@ boost::optional<Document> DocumentSourceUnwind::Unwinder::getNext() {
return boost::none;
}
} else {
- // Any non-nullish, non-array type should pass through unaltered.
+ // Any non-nullish, non-array type should pass through.
_haveNext = false;
}
+ if (_indexPath) {
+ _output.getNestedField(*_indexPath) =
+ indexForOutput ? Value(*indexForOutput) : Value(BSONNULL);
+ }
+
return _haveNext ? _output.peek() : _output.freeze();
}
DocumentSourceUnwind::DocumentSourceUnwind(const intrusive_ptr<ExpressionContext>& pExpCtx,
const FieldPath& fieldPath,
bool preserveNullAndEmptyArrays,
- bool includeArrayIndex)
+ const boost::optional<FieldPath>& indexPath)
: DocumentSource(pExpCtx),
_unwindPath(fieldPath),
_preserveNullAndEmptyArrays(preserveNullAndEmptyArrays),
- _includeArrayIndex(includeArrayIndex),
- _unwinder(new Unwinder(fieldPath, preserveNullAndEmptyArrays, includeArrayIndex)) {}
+ _indexPath(indexPath),
+ _unwinder(new Unwinder(fieldPath, preserveNullAndEmptyArrays, indexPath)) {}
REGISTER_DOCUMENT_SOURCE(unwind, DocumentSourceUnwind::createFromBson);
@@ -166,9 +172,12 @@ intrusive_ptr<DocumentSourceUnwind> DocumentSourceUnwind::create(
const intrusive_ptr<ExpressionContext>& expCtx,
const string& unwindPath,
bool preserveNullAndEmptyArrays,
- bool includeArrayIndex) {
- return new DocumentSourceUnwind(
- expCtx, FieldPath(unwindPath), preserveNullAndEmptyArrays, includeArrayIndex);
+ const boost::optional<string>& indexPath) {
+ return new DocumentSourceUnwind(expCtx,
+ FieldPath(unwindPath),
+ preserveNullAndEmptyArrays,
+ indexPath ? FieldPath(*indexPath)
+ : boost::optional<FieldPath>());
}
boost::optional<Document> DocumentSourceUnwind::getNext() {
@@ -191,11 +200,11 @@ boost::optional<Document> DocumentSourceUnwind::getNext() {
}
Value DocumentSourceUnwind::serialize(bool explain) const {
- return Value(DOC(getSourceName()
- << DOC("path" << _unwindPath.getPath(true) << "preserveNullAndEmptyArrays"
- << (_preserveNullAndEmptyArrays ? Value(true) : Value())
- << "includeArrayIndex"
- << (_includeArrayIndex ? Value(true) : Value()))));
+ return Value(DOC(getSourceName() << DOC(
+ "path" << _unwindPath.getPath(true) << "preserveNullAndEmptyArrays"
+ << (_preserveNullAndEmptyArrays ? Value(true) : Value())
+ << "includeArrayIndex"
+ << (_indexPath ? Value((*_indexPath).getPath(false)) : Value()))));
}
DocumentSource::GetDepsReturn DocumentSourceUnwind::getDependencies(DepsTracker* deps) const {
@@ -209,7 +218,7 @@ intrusive_ptr<DocumentSource> DocumentSourceUnwind::createFromBson(
// extra options.
string prefixedPathString;
bool preserveNullAndEmptyArrays = false;
- bool includeArrayIndex = false;
+ boost::optional<string> indexPath;
if (elem.type() == Object) {
for (auto&& subElem : elem.Obj()) {
if (subElem.fieldNameStringData() == "path") {
@@ -227,10 +236,15 @@ intrusive_ptr<DocumentSource> DocumentSourceUnwind::createFromBson(
preserveNullAndEmptyArrays = subElem.Bool();
} else if (subElem.fieldNameStringData() == "includeArrayIndex") {
uassert(28810,
- str::stream() << "expected a boolean for the includeArrayIndex option to "
- "$unwind stage, got " << typeName(subElem.type()),
- subElem.type() == Bool);
- includeArrayIndex = subElem.Bool();
+ str::stream() << "expected a non-empty string for the includeArrayIndex "
+ " option to $unwind stage, got "
+ << typeName(subElem.type()),
+ subElem.type() == String && !subElem.String().empty());
+ indexPath = subElem.String();
+ uassert(28822,
+ str::stream() << "includeArrayIndex option to $unwind stage should not be "
+ "prefixed with a '$': " << (*indexPath),
+ (*indexPath)[0] != '$');
} else {
uasserted(28811,
str::stream() << "unrecognized option to $unwind stage: "
@@ -248,8 +262,11 @@ intrusive_ptr<DocumentSource> DocumentSourceUnwind::createFromBson(
}
uassert(28812, "no path specified to $unwind stage", !prefixedPathString.empty());
+ uassert(28818,
+ str::stream() << "path option to $unwind stage should be prefixed with a '$': "
+ << prefixedPathString,
+ prefixedPathString[0] == '$');
string pathString(Expression::removeFieldPrefix(prefixedPathString));
- return DocumentSourceUnwind::create(
- pExpCtx, pathString, preserveNullAndEmptyArrays, includeArrayIndex);
+ return DocumentSourceUnwind::create(pExpCtx, pathString, preserveNullAndEmptyArrays, indexPath);
}
}