summaryrefslogtreecommitdiff
path: root/src/mongo/bson/util/bsoncolumnbuilder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/bson/util/bsoncolumnbuilder.cpp')
-rw-r--r--src/mongo/bson/util/bsoncolumnbuilder.cpp660
1 files changed, 586 insertions, 74 deletions
diff --git a/src/mongo/bson/util/bsoncolumnbuilder.cpp b/src/mongo/bson/util/bsoncolumnbuilder.cpp
index c7dee5223cb..58e1f2ef9e8 100644
--- a/src/mongo/bson/util/bsoncolumnbuilder.cpp
+++ b/src/mongo/bson/util/bsoncolumnbuilder.cpp
@@ -28,6 +28,8 @@
*/
#include "mongo/bson/util/bsoncolumnbuilder.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bsoncolumn_util.h"
#include "mongo/bson/util/simple8b_type_util.h"
@@ -41,6 +43,7 @@ namespace {
static constexpr uint8_t kMaxCount = 16;
static constexpr uint8_t kCountMask = 0x0F;
static constexpr uint8_t kControlMask = 0xF0;
+static constexpr std::ptrdiff_t kNoSimple8bControl = -1;
static constexpr std::array<uint8_t, Simple8bTypeUtil::kMemoryAsInteger + 1>
kControlByteForScaleIndex = {0x90, 0xA0, 0xB0, 0xC0, 0xD0, 0x80};
@@ -57,38 +60,362 @@ std::pair<int64_t, uint8_t> scaleAndEncodeDouble(double value, uint8_t minScaleI
return {*encoded, minScaleIndex - 1};
}
+// ObjectId delta is possible only when 'elem' and 'prev' have identical instance-unique bytes.
+bool objectIdDeltaPossible(BSONElement elem, BSONElement prev) {
+ return !memcmp(prev.OID().getInstanceUnique().bytes,
+ elem.OID().getInstanceUnique().bytes,
+ OID::kInstanceUniqueSize);
+}
+
+// Recursive helper for traverseLockStep() used when only 'reference' has to be walked. Nested
+// objects are descended into; other elements go to 'elemFunc' with a default BSONElement.
+template <typename ElementFunc>
+void _traverse(const BSONObj& reference, const ElementFunc& elemFunc) {
+ for (const auto& elem : reference) {
+ if (elem.type() == Object) {
+ _traverse(elem.Obj(), elemFunc);
+ } else {
+ elemFunc(elem, BSONElement());
+ }
+ }
+}
+
+// Recursive helper for traverseLockStep(): returns {position reached in 'obj', still compatible}.
+template <typename ElementFunc>
+std::pair<BSONObj::iterator, bool> _traverseLockStep(const BSONObj& reference,
+ const BSONObj& obj,
+ const ElementFunc& elemFunc) {
+ auto it = obj.begin();
+ auto end = obj.end();
+ for (const auto& elem : reference) {
+ if (elem.type() == Object) {
+ BSONObj refObj = elem.Obj();
+ bool hasIt = it != end;
+ // An empty 'refObj' requires a matching empty object at 'it'. Start by rejecting the
+ // case where 'obj' has already run out of elements.
+ if (!hasIt && refObj.isEmpty()) {
+ return {it, false};
+ }
+
+ bool elemMatch = hasIt && elem.fieldNameStringData() == it->fieldNameStringData();
+ if (elemMatch) {
+ // A sub-object in 'reference' must also be a sub-object in 'obj'.
+ if (it->type() != Object) {
+ return {it, false};
+ }
+
+ // Exactly one side being an empty object is not allowed.
+ if (refObj.isEmpty() != it->Obj().isEmpty()) {
+ return {it, false};
+ }
+
+ // Everything matches, recurse deeper. 'it' is advanced past this sub-object.
+ auto [_, compatible] = _traverseLockStep(refObj, (it++)->Obj(), elemFunc);
+ if (!compatible) {
+ return {it, false};
+ }
+ } else {
+ // Assume the field at 'it' shows up later in 'reference' and treat this sub-object as
+ // missing from 'obj', leaving 'it' where it is. A real mismatch is detected at the end
+ // because 'it' will not have reached 'end'.
+ // Nothing below this point can fail, so traverse 'refObj' without the checks.
+ _traverse(refObj, elemFunc);
+ }
+ } else {
+ // Non-object: call 'elemFunc' with 'elem' and its match in 'obj' (default element if absent)
+ elemFunc(elem,
+ it != end && elem.fieldNameStringData() == it->fieldNameStringData()
+ ? *(it++)
+ : BSONElement());
+ }
+ }
+ // Extra elements in 'obj' are not allowed. These need to be merged into 'reference' to be
+ // able to compress.
+ return {it, it == end};
+}
+
+// Traverses and validates BSONObj's in reference and obj in lock-step. Returns true if the object
+// hierarchies are compatible for sub-object compression. To be compatible, fields in 'obj' must be
+// in the same order as in 'reference' and sub-objects in 'reference' must be sub-objects in 'obj'.
+// The only allowed difference is that fields of 'reference' may be missing from 'obj'. 'elemFunc'
+// is called for every non-Object element of 'reference' together with its counterpart in 'obj' (a
+// default BSONElement if missing). Signature: void(const BSONElement&, const BSONElement&).
+template <typename ElementFunc>
+bool traverseLockStep(const BSONObj& reference, const BSONObj& obj, ElementFunc elemFunc) {
+ auto [it, hierachyMatch] = _traverseLockStep(reference, obj, elemFunc);
+ // Extra elements in 'obj' are not allowed. These need to be merged into 'reference' to be
+ // able to compress.
+ return hierachyMatch && it == obj.end();
+}
+
+// Recursive helper for mergeObj(). Appends the merged fields of 'reference' and 'obj' to 'builder'
+// and returns true if the merge was successful, false if the objects are incompatible.
+bool _mergeObj(BSONObjBuilder* builder, const BSONObj& reference, const BSONObj& obj) {
+ auto refIt = reference.begin();
+ auto refEnd = reference.end();
+ auto it = obj.begin();
+ auto end = obj.end();
+
+ // Iterate until we reach the end of either object.
+ while (refIt != refEnd && it != end) {
+ StringData name = refIt->fieldNameStringData();
+ if (name == it->fieldNameStringData()) {
+ bool refIsObj = refIt->type() == Object;
+ bool itIsObj = it->type() == Object;
+
+ if (refIsObj && itIsObj) {
+ BSONObj refObj = refIt->Obj();
+ BSONObj itObj = it->Obj();
+ // Exactly one side being an empty object cannot be merged
+ if (refObj.isEmpty() != itObj.isEmpty())
+ return false;
+
+ // Recurse into the matching sub-objects
+ BSONObjBuilder subBuilder = builder->subobjStart(name);
+ bool res = _mergeObj(&subBuilder, refObj, itObj);
+ if (!res) {
+ return false;
+ }
+ } else if (refIsObj || itIsObj) {
+ // Both or neither elements must be Object to be mergeable
+ return false;
+ } else {
+ // Names match and neither is an Object: append the reference element and advance
+ // both iterators.
+ builder->append(*refIt);
+ }
+
+ ++refIt;
+ ++it;
+ continue;
+ }
+
+ // Name mismatch: first check whether the reference element exists later in 'obj'.
+ auto n = std::next(it);
+ auto namePos = std::find_if(
+ n, end, [&name](const auto& elem) { return elem.fieldNameStringData() == name; });
+ if (namePos == end) {
+ // Reference element does not exist in 'obj' so add it and continue merging with just
+ // the reference iterator incremented.
+ builder->append(*(refIt++));
+ } else {
+ // Reference element does exist later in 'obj'. Add element in 'it' if it is the first
+ // time we see it, fail otherwise (incompatible ordering).
+ if (builder->hasField(it->fieldNameStringData())) {
+ return false;
+ }
+ builder->append(*(it++));
+ }
+ }
+
+ // Add the remaining reference elements once 'obj' is exhausted.
+ for (; refIt != refEnd; ++refIt) {
+ // An empty object present on one side only is a mismatch we cannot allow
+ if (refIt->type() == Object && refIt->Obj().isEmpty()) {
+ return false;
+ }
+ if (builder->hasField(refIt->fieldNameStringData())) {
+ return false;
+ }
+ builder->append(*refIt);
+ }
+
+ // Add the remaining 'obj' elements once 'reference' is exhausted.
+ for (; it != end; ++it) {
+ // An empty object present on one side only is a mismatch we cannot allow
+ if (it->type() == Object && it->Obj().isEmpty()) {
+ return false;
+ }
+
+ if (builder->hasField(it->fieldNameStringData())) {
+ return false;
+ }
+ builder->append(*it);
+ }
+
+ return true;
+}
+
+// Tries to merge the elements of 'obj' into 'reference'. For the merge to succeed, elements that
+// already exist in 'reference' must appear in 'obj' in the same order. Returns the merged object on
+// success and a default-constructed BSONObj on failure. This is quite an expensive operation as we
+// are merging unsorted objects. Time complexity is O(N^2).
+BSONObj mergeObj(const BSONObj& reference, const BSONObj& obj) {
+ BSONObjBuilder builder;
+ if (!_mergeObj(&builder, reference, obj)) {
+ builder.abandon();
+ return BSONObj();
+ }
+
+ return builder.obj();
+}
+
} // namespace
BSONColumnBuilder::BSONColumnBuilder(StringData fieldName)
: BSONColumnBuilder(fieldName, BufBuilder()) {}
BSONColumnBuilder::BSONColumnBuilder(StringData fieldName, BufBuilder&& builder)
- : _simple8bBuilder64(_createBufferWriter()),
- _simple8bBuilder128(_createBufferWriter()),
- _scaleIndex(Simple8bTypeUtil::kMemoryAsInteger),
- _bufBuilder(std::move(builder)),
- _fieldName(fieldName) {
+ : _state(&_bufBuilder, nullptr), _bufBuilder(std::move(builder)), _fieldName(fieldName) {
// Reserve kElementCountBytes at the start of the buffer; finalize() writes the count there
static_assert(sizeof(_elementCount) == kElementCountBytes,
"Element count for BSONColumn should be 4 bytes");
_bufBuilder.reset();
_bufBuilder.skip(kElementCountBytes);
- // Store EOO type with empty field name as previous.
- _storePrevious(BSONElement());
-}
-
-BSONElement BSONColumnBuilder::_previous() const {
- return {_prev.get(), 1, _prevSize, BSONElement::CachedSizeTag{}};
}
BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) {  // Adds one element to the column
auto type = elem.type();
uassert(ErrorCodes::InvalidBSONType,
- "MinKey or MaxKey is not supported by BSON Column (subtype 7)",
+ "MinKey or MaxKey is not valid for storage",
type != MinKey && type != MaxKey);
- auto previous = _previous();
+ if (type != Object || elem.Obj().isEmpty()) {
+ // Non-objects and empty objects end any sub-object compression in progress; flush it first
+ if (_mode != Mode::kRegular) {
+ _flushSubObjMode();
+ }
+ _state.append(elem);
+ ++_elementCount;
+ return *this;
+ }
+
+
+ if (_mode == Mode::kRegular) {  // first non-empty object: begin determining a reference
+ _startDetermineSubObjReference(elem.Obj());
+ ++_elementCount;
+ return *this;
+ }
+
+ if (_mode == Mode::kSubObjDeterminingReference) {
+ auto obj = elem.Obj();
+
+ // We are in DeterminingReference mode: check if the current object is compatible with the
+ // reference, and merge in any new fields that are discovered if it is not.
+ uint32_t numElements = 0;
+ if (!traverseLockStep(
+ _referenceSubObj,
+ obj,
+ [this, &numElements](const BSONElement& ref, const BSONElement& elem) {
+ ++numElements;
+ uassert(ErrorCodes::InvalidBSONType,
+ "MinKey or MaxKey is not valid for storage",
+ elem.type() != MinKey && elem.type() != MaxKey);
+ })) {
+ BSONObj merged = mergeObj(_referenceSubObj, obj);
+ if (merged.isEmptyPrototype()) {
+ // Merge failed: flush current sub-object compression and start over from 'obj'.
+ _flushSubObjMode();
+
+ _referenceSubObj = obj.getOwned();
+ _bufferedObjElements.push_back(_referenceSubObj);
+ _mode = Mode::kSubObjDeterminingReference;
+ ++_elementCount;
+ return *this;
+ }
+ _referenceSubObj = merged;
+ }
+
+ // Keep buffering while the buffered object count is at most twice the number of sub-elements
+ // visited above. Past that we expect good compression, so the current reference is used.
+ if (numElements * 2 >= _bufferedObjElements.size()) {
+ _bufferedObjElements.push_back(obj.getOwned());
+ ++_elementCount;
+ return *this;
+ }
+
+ _finishDetermineSubObjReference();
+ }
+
+ // Reference already determined for sub-object compression, try to add this new object.
+ _appendSubElements(elem.Obj());
++_elementCount;
+ return *this;
+}
+
+
+BSONColumnBuilder& BSONColumnBuilder::skip() {  // Counts one element and records a skip per _mode
+ ++_elementCount;
+ if (_mode == Mode::kRegular) {
+ _state.skip();
+ } else if (_mode == Mode::kSubObjDeterminingReference) {
+ _bufferedObjElements.push_back(BSONObj());  // buffered as a default (empty) object
+ } else {
+ for (auto&& state : _subobjStates) {  // every sub-element encoding state gets the skip
+ state.skip();
+ }
+ }
+
+ return *this;
+}
+
+BSONBinData BSONColumnBuilder::finalize() {  // Flushes pending state and returns the column data
+ if (_mode == Mode::kRegular) {
+ _state.flush();
+ } else {
+ _flushSubObjMode();
+ }
+
+ // Terminate the column with an EOO byte
+ _bufBuilder.appendChar(EOO);
+
+ // Write the element count (little endian, 32-bit) at the very start of the buffer
+ DataView(_bufBuilder.buf()).write<LittleEndian<uint32_t>>(_elementCount);
+
+ return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column};
+}
+
+BufBuilder BSONColumnBuilder::detach() {  // Moves the underlying buffer out to the caller
+ return std::move(_bufBuilder);
+}
+
+BSONColumnBuilder::EncodingState::EncodingState(
+ BufBuilder* bufBuilder, std::function<void(const char*, size_t)> controlBlockWriter)
+ : _simple8bBuilder64(_createBufferWriter()),
+ _simple8bBuilder128(_createBufferWriter()),
+ _controlByteOffset(kNoSimple8bControl),
+ _scaleIndex(Simple8bTypeUtil::kMemoryAsInteger),
+ _bufBuilder(bufBuilder),
+ _controlBlockWriter(controlBlockWriter) {
+ // Start out with a default-constructed (EOO, empty field name) element as 'previous'.
+ _storePrevious(BSONElement());
+}
+
+BSONColumnBuilder::EncodingState::EncodingState(EncodingState&& other)
+ : _prev(std::move(other._prev)),
+ _prevSize(std::move(other._prevSize)),
+ _prevCapacity(std::move(other._prevCapacity)),
+ _prevDelta(std::move(other._prevDelta)),
+ _simple8bBuilder64(_createBufferWriter()),  // new builder, not moved from 'other'
+ _simple8bBuilder128(_createBufferWriter()),  // NOTE(review): other's pending values - confirm
+ _storeWith128(std::move(other._storeWith128)),
+ _controlByteOffset(std::move(other._controlByteOffset)),
+ _prevEncoded64(std::move(other._prevEncoded64)),
+ _prevEncoded128(std::move(other._prevEncoded128)),
+ _lastValueInPrevBlock(std::move(other._lastValueInPrevBlock)),
+ _scaleIndex(std::move(other._scaleIndex)),
+ _bufBuilder(std::move(other._bufBuilder)),
+ _controlBlockWriter(std::move(other._controlBlockWriter)) {}
+
+BSONColumnBuilder::EncodingState& BSONColumnBuilder::EncodingState::operator=(EncodingState&& rhs) {
+ _prev = std::move(rhs._prev);  // note: _simple8bBuilder64/128 are not assigned in this operator
+ _prevSize = std::move(rhs._prevSize);
+ _prevCapacity = std::move(rhs._prevCapacity);
+ _prevDelta = std::move(rhs._prevDelta);
+ _storeWith128 = std::move(rhs._storeWith128);
+ _controlByteOffset = std::move(rhs._controlByteOffset);
+ _prevEncoded64 = std::move(rhs._prevEncoded64);
+ _prevEncoded128 = std::move(rhs._prevEncoded128);
+ _lastValueInPrevBlock = std::move(rhs._lastValueInPrevBlock);
+ _scaleIndex = std::move(rhs._scaleIndex);
+ _bufBuilder = std::move(rhs._bufBuilder);
+ _controlBlockWriter = std::move(rhs._controlBlockWriter);
+ return *this;
+}
+
+void BSONColumnBuilder::EncodingState::append(BSONElement elem) {
+ auto type = elem.type();
+ auto previous = _previous();
// If we detect a type change (or this is first value). Flush all pending values in Simple-8b
// and write uncompressed literal. Reset all default values.
@@ -96,9 +423,8 @@ BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) {
_storePrevious(elem);
_simple8bBuilder128.flush();
_simple8bBuilder64.flush();
- _storeWith128 = uses128bit(type);
_writeLiteralFromPrevious();
- return *this;
+ return;
}
// Store delta in Simple-8b if types match
@@ -161,7 +487,7 @@ BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) {
value = calcDelta(elem._numberLong(), previous._numberLong());
break;
case jstOID: {
- encodingPossible = _objectIdDeltaPossible(elem, previous);
+ encodingPossible = objectIdDeltaPossible(elem, previous);
if (!encodingPossible)
break;
@@ -212,11 +538,32 @@ BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) {
_simple8bBuilder64.flush();
_writeLiteralFromPrevious();
}
+}
- return *this;
+void BSONColumnBuilder::EncodingState::skip() {  // Adds a skip to the active Simple-8b builder
+ auto before = _bufBuilder->len();
+ if (_storeWith128) {
+ _simple8bBuilder128.skip();
+ } else {
+ _simple8bBuilder64.skip();
+ }
+ // Buffer length changed => this skip wrote Simple-8b blocks; rescale the last double value
+ if (before != _bufBuilder->len() && _previous().type() == NumberDouble) {
+ std::tie(_prevEncoded64, _scaleIndex) = scaleAndEncodeDouble(_lastValueInPrevBlock, 0);
+ }
}
-boost::optional<Simple8bBuilder<uint64_t>> BSONColumnBuilder::_tryRescalePending(
+void BSONColumnBuilder::EncodingState::flush() {  // Flushes both Simple-8b builders
+ _simple8bBuilder128.flush();
+ _simple8bBuilder64.flush();
+
+ if (_controlByteOffset != kNoSimple8bControl && _controlBlockWriter) {  // open control block
+ _controlBlockWriter(_bufBuilder->buf() + _controlByteOffset,
+ _bufBuilder->len() - _controlByteOffset);
+ }
+}
+
+boost::optional<Simple8bBuilder<uint64_t>> BSONColumnBuilder::EncodingState::_tryRescalePending(
int64_t encoded, uint8_t newScaleIndex) {
// Encode last value in the previous block with old and new scale index. We know that scaling
// with the old index is possible.
@@ -275,7 +622,7 @@ boost::optional<Simple8bBuilder<uint64_t>> BSONColumnBuilder::_tryRescalePending
return builder;
}
-bool BSONColumnBuilder::_appendDouble(double value, double previous) {
+bool BSONColumnBuilder::EncodingState::_appendDouble(double value, double previous) {
// Scale with lowest possible scale index
auto [encoded, scaleIndex] = scaleAndEncodeDouble(value, _scaleIndex);
@@ -295,7 +642,7 @@ bool BSONColumnBuilder::_appendDouble(double value, double previous) {
// Re-scale not possible, flush and start new block with the higher scale factor
_simple8bBuilder64.flush();
- _controlByteOffset = 0;
+ _controlByteOffset = kNoSimple8bControl;
// Make sure value and previous are using the same scale factor.
uint8_t prevScaleIndex;
@@ -311,12 +658,12 @@ bool BSONColumnBuilder::_appendDouble(double value, double previous) {
// Append delta and check if we wrote a Simple8b block. If we did we may be able to reduce the
// scale factor when starting a new block
- auto before = _bufBuilder.len();
+ auto before = _bufBuilder->len();
if (!_simple8bBuilder64.append(
Simple8bTypeUtil::encodeInt64(calcDelta(encoded, _prevEncoded64))))
return false;
- if (_bufBuilder.len() != before) {
+ if (_bufBuilder->len() != before) {
// Reset the scale factor to 0 and append all pending values to a new Simple8bBuilder. In
// the worse case we will end up with an identical scale factor.
auto prevScale = _scaleIndex;
@@ -346,41 +693,12 @@ bool BSONColumnBuilder::_appendDouble(double value, double previous) {
return true;
}
-BSONColumnBuilder& BSONColumnBuilder::skip() {
- ++_elementCount;
-
- auto before = _bufBuilder.len();
- if (_storeWith128) {
- _simple8bBuilder128.skip();
- } else {
- _simple8bBuilder64.skip();
- }
- // Rescale previous known value if this skip caused Simple-8b blocks to be written
- if (before != _bufBuilder.len() && _previous().type() == NumberDouble) {
- std::tie(_prevEncoded64, _scaleIndex) = scaleAndEncodeDouble(_lastValueInPrevBlock, 0);
- }
- return *this;
-}
-
-BSONBinData BSONColumnBuilder::finalize() {
- _simple8bBuilder128.flush();
- _simple8bBuilder64.flush();
-
- // Write EOO at the end
- _bufBuilder.appendChar(EOO);
-
- // Write element count at the beginning
- DataView(_bufBuilder.buf()).write<LittleEndian<uint32_t>>(_elementCount);
-
- return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column};
-}
-
-BufBuilder BSONColumnBuilder::detach() {
- return std::move(_bufBuilder);
+BSONElement BSONColumnBuilder::EncodingState::_previous() const {  // element view over _prev
+ return {_prev.get(), 1, _prevSize, BSONElement::CachedSizeTag{}};
}
-void BSONColumnBuilder::_storePrevious(BSONElement elem) {
+void BSONColumnBuilder::EncodingState::_storePrevious(BSONElement elem) {
auto valuesize = elem.valuesize();
// Add space for type byte and field name null terminator
@@ -401,19 +719,33 @@ void BSONColumnBuilder::_storePrevious(BSONElement elem) {
_prevSize = size;
}
-void BSONColumnBuilder::_writeLiteralFromPrevious() {
+void BSONColumnBuilder::EncodingState::_writeLiteralFromPrevious() {
// Write the stored previous element as a literal without field name and reset the control byte
// offset to force a new control byte to be written when appending the next value.
- auto prevElem = _previous();
- _bufBuilder.appendBuf(_prev.get(), _prevSize);
+ if (_controlByteOffset != kNoSimple8bControl && _controlBlockWriter) {  // open block first
+ _controlBlockWriter(_bufBuilder->buf() + _controlByteOffset,
+ _bufBuilder->len() - _controlByteOffset);
+ }
+ _bufBuilder->appendBuf(_prev.get(), _prevSize);
+ if (_controlBlockWriter) {  // then report the literal that was just appended
+ _controlBlockWriter(_bufBuilder->buf() + _bufBuilder->len() - _prevSize, _prevSize);
+ }
+
// Reset encoding state
- _controlByteOffset = 0;
+ _controlByteOffset = kNoSimple8bControl;
_scaleIndex = Simple8bTypeUtil::kMemoryAsInteger;
_prevDelta = 0;
+ _initializeFromPrevious();
+}
+
+void BSONColumnBuilder::EncodingState::_initializeFromPrevious() {
// Initialize previous encoded when needed
- switch (prevElem.type()) {
+ auto prevElem = _previous();
+ auto type = prevElem.type();
+ _storeWith128 = uses128bit(type);
+ switch (type) {
case NumberDouble:
_lastValueInPrevBlock = prevElem._numberDouble();
std::tie(_prevEncoded64, _scaleIndex) = scaleAndEncodeDouble(_lastValueInPrevBlock, 0);
@@ -438,27 +770,31 @@ void BSONColumnBuilder::_writeLiteralFromPrevious() {
}
}
-void BSONColumnBuilder::_incrementSimple8bCount() {
+ptrdiff_t BSONColumnBuilder::EncodingState::_incrementSimple8bCount() {
char* byte;
uint8_t count;
uint8_t control = kControlByteForScaleIndex[_scaleIndex];
- if (_controlByteOffset == 0) {
+ if (_controlByteOffset == kNoSimple8bControl) {
// Allocate new control byte if we don't already have one. Record its offset so we can find
// it even if the underlying buffer reallocates.
- byte = _bufBuilder.skip(1);
- _controlByteOffset = std::distance(_bufBuilder.buf(), byte);
+ byte = _bufBuilder->skip(1);
+ _controlByteOffset = std::distance(_bufBuilder->buf(), byte);
count = 0;
} else {
// Read current count from previous control byte
- byte = _bufBuilder.buf() + _controlByteOffset;
+ byte = _bufBuilder->buf() + _controlByteOffset;
// If previous byte was written with a different control byte then we can't re-use and need
// to start a new one
if ((*byte & kControlMask) != control) {
- _controlByteOffset = 0;
+ if (_controlBlockWriter) {
+ _controlBlockWriter(_bufBuilder->buf() + _controlByteOffset,
+ _bufBuilder->len() - _controlByteOffset);
+ }
+ _controlByteOffset = kNoSimple8bControl;
_incrementSimple8bCount();
- return;
+ return kNoSimple8bControl;
}
count = (*byte & kCountMask) + 1;
}
@@ -466,17 +802,27 @@ void BSONColumnBuilder::_incrementSimple8bCount() {
// Write back new count and clear offset if we have reached max count
*byte = control | (count & kCountMask);
if (count + 1 == kMaxCount) {
- _controlByteOffset = 0;
+ auto prevControlByteOffset = _controlByteOffset;
+ _controlByteOffset = kNoSimple8bControl;
+ return prevControlByteOffset;
}
+
+ return kNoSimple8bControl;
}
-Simple8bWriteFn BSONColumnBuilder::_createBufferWriter() {
+Simple8bWriteFn BSONColumnBuilder::EncodingState::_createBufferWriter() {
return [this](uint64_t block) {
// Write/update block count
- _incrementSimple8bCount();
+ ptrdiff_t fullControlOffset = _incrementSimple8bCount();
// Write Simple-8b block in little endian byte order
- _bufBuilder.appendNum(block);
+ _bufBuilder->appendNum(block);
+
+ // Write control block if this Simple-8b block made it full.
+ if (_controlBlockWriter && fullControlOffset != kNoSimple8bControl) {
+ _controlBlockWriter(_bufBuilder->buf() + fullControlOffset,
+ _bufBuilder->len() - fullControlOffset);
+ }
auto previous = _previous();
if (previous.type() == NumberDouble) {
@@ -487,10 +833,176 @@ Simple8bWriteFn BSONColumnBuilder::_createBufferWriter() {
};
}
-bool BSONColumnBuilder::_objectIdDeltaPossible(BSONElement elem, BSONElement prev) {
- return !memcmp(prev.OID().getInstanceUnique().bytes,
- elem.OID().getInstanceUnique().bytes,
- OID::kInstanceUniqueSize);
+void BSONColumnBuilder::_appendSubElements(const BSONObj& obj) {
+ // Check that the added object is compatible with the selected reference object, collecting a
+ // flat vector of all its elements (default elements for missing ones) while doing so.
+ _flattenedAppendedObj.clear();
+ if (!traverseLockStep(
+ _referenceSubObj, obj, [this](const BSONElement& ref, const BSONElement& elem) {
+ uassert(ErrorCodes::InvalidBSONType,
+ "MinKey or MaxKey is not valid for storage",
+ elem.type() != MinKey && elem.type() != MaxKey);
+ _flattenedAppendedObj.push_back(elem);
+ })) {
+ _flushSubObjMode();  // incompatible: end the current run and start over from 'obj'
+ _startDetermineSubObjReference(obj);
+ return;
+ }
+
+ // We should have received one callback for every sub-element in the reference object. This
+ // should match the number of encoding states set up previously.
+ invariant(_flattenedAppendedObj.size() == _subobjStates.size());
+ auto statesIt = _subobjStates.begin();
+ auto subElemIt = _flattenedAppendedObj.begin();
+ auto subElemEnd = _flattenedAppendedObj.end();
+
+ // Append each element to its corresponding encoding state; EOO elements become skips.
+ for (; subElemIt != subElemEnd; ++subElemIt, ++statesIt) {
+ const auto& subelem = *subElemIt;
+ auto& state = *statesIt;
+ if (!subelem.eoo())
+ state.append(subelem);
+ else
+ state.skip();
+ }
+}
+
+void BSONColumnBuilder::_startDetermineSubObjReference(const BSONObj& obj) {
+ // Start sub-object compression: flush and reset the regular encoding state, then enter
+ // DeterminingReference mode using this first Object as the initial reference.
+ _state.flush();
+ _state = {&_bufBuilder, nullptr};
+
+ _traverse(obj, [](const BSONElement& elem, const BSONElement&) {  // validate all leaf elements
+ uassert(ErrorCodes::InvalidBSONType,
+ "MinKey or MaxKey is not valid for storage",
+ elem.type() != MinKey && elem.type() != MaxKey);
+ });
+
+ _referenceSubObj = obj.getOwned();
+ _bufferedObjElements.push_back(_referenceSubObj);
+ _mode = Mode::kSubObjDeterminingReference;
+}
+
+void BSONColumnBuilder::_finishDetermineSubObjReference() {
+ // Reference is final: write the interleaved-start control byte followed by the reference object.
+ _bufBuilder.appendChar(bsoncolumn::kInterleavedStartControlByte);
+ _bufBuilder.appendBuf(_referenceSubObj.objdata(), _referenceSubObj.objsize());
+
+ // Initialize all encoding states by traversing the reference object and the first buffered
+ // object in lock-step. Whether a sub-element exists in the first buffered object decides if
+ // its encoding state starts with an append (zero delta against the reference) or a skip.
+ bool res =
+ traverseLockStep(_referenceSubObj,
+ _bufferedObjElements.front(),
+ [this](const BSONElement& ref, const BSONElement& elem) {
+ _subobjBuffers.emplace_back();
+ auto* buffer = &_subobjBuffers.back().first;
+ auto* controlBlocks = &_subobjBuffers.back().second;
+
+ // Record every control block written by the EncodingState as (offset in
+ // 'buffer', size) so they can be added to the main buffer in the right order.
+ auto controlBlockWriter = [buffer, controlBlocks](
+ const char* controlBlock, size_t size) {
+ controlBlocks->emplace_back(controlBlock - buffer->buf(), size);
+ };
+
+ // Set a valid 'previous' into the encoding state to avoid a full
+ // literal being written when we append the first element. We want this
+ // to be a zero delta as the reference object already contains this
+ // literal.
+ _subobjStates.emplace_back(buffer, controlBlockWriter);
+ _subobjStates.back()._storePrevious(ref);
+ _subobjStates.back()._initializeFromPrevious();
+ if (!elem.eoo()) {
+ _subobjStates.back().append(elem);
+ } else {
+ _subobjStates.back().skip();
+ }
+ });
+ invariant(res);
+ _mode = Mode::kSubObjAppending;
+
+ // Append the remaining buffered objects (the first one was consumed above).
+ auto it = _bufferedObjElements.begin() + 1;
+ auto end = _bufferedObjElements.end();
+ for (; it != end; ++it) {
+ _appendSubElements(*it);  // NOTE(review): may push to _bufferedObjElements on mismatch; confirm
+ }
+ _bufferedObjElements.clear();
+}
+
+void BSONColumnBuilder::_flushSubObjMode() {
+ if (_mode == Mode::kSubObjDeterminingReference) {
+ _finishDetermineSubObjReference();
+ }
+
+ // Flush all EncodingStates. This makes them write out all their pending elements, which are
+ // captured by the controlBlockWriter.
+ for (auto&& state : _subobjStates) {
+ state.flush();
+ }
+
+ // We now need to write all control blocks to the binary stream in the right order. This is done
+ // from the decoder's perspective, where a DecodingState that exhausts its elements will read the
+ // next control byte. We use a min-heap to see which encoding states have written the fewest
+ // elements so far. In case of a tie we use the smallest encoder/decoder index.
+ std::vector<std::pair<uint32_t /* num elements written */, uint32_t /* encoder index */>> heap;
+ for (uint32_t i = 0; i < _subobjBuffers.size(); ++i) {
+ heap.emplace_back(0, i);
+ }
+
+ // Initialize as min-heap (std::greater on the {count, index} pairs)
+ using MinHeap = std::greater<std::pair<uint32_t, uint32_t>>;
+ std::make_heap(heap.begin(), heap.end(), MinHeap());
+
+ // Append all control blocks
+ while (!heap.empty()) {
+ // Move the encoding state with the fewest elements written to the back of the heap vector
+ std::pop_heap(heap.begin(), heap.end(), MinHeap());
+ // And take out control blocks in FIFO order from this encoding state
+ auto& slot = _subobjBuffers[heap.back().second];
+ const char* controlBlock = slot.first.buf() + slot.second.front().first;
+ size_t size = slot.second.front().second;
+
+ // Write it to the main buffer
+ _bufBuilder.appendBuf(controlBlock, size);
+ slot.second.pop_front();
+ if (slot.second.empty()) {
+ // No more control blocks for this encoding state so remove it from the heap
+ heap.pop_back();
+ continue;
+ }
+
+ // Calculate how many elements were in this control block (a literal counts as one)
+ uint32_t elems = [&]() -> uint32_t {
+ if (bsoncolumn::isLiteralControlByte(*controlBlock)) {
+ return 1;
+ }
+
+ Simple8b<uint128_t> reader(
+ controlBlock + 1,
+ sizeof(uint64_t) * bsoncolumn::numSimple8bBlocksForControlByte(*controlBlock));
+
+ uint32_t num = 0;
+ auto it = reader.begin();
+ auto end = reader.end();
+ while (it != end) {
+ num += it.blockSize();
+ it.advanceBlock();
+ }
+ return num;
+ }();
+
+ // Add the element count and put this encoding state back into the heap.
+ heap.back().first += elems;
+ std::push_heap(heap.begin(), heap.end(), MinHeap());
+ }
+ // All control blocks written, write EOO to end the interleaving and clean up.
+ _bufBuilder.appendChar(EOO);
+ _subobjStates.clear();
+ _subobjBuffers.clear();
+ _mode = Mode::kRegular;
}
} // namespace mongo