diff options
Diffstat (limited to 'src/mongo/bson')
-rw-r--r-- | src/mongo/bson/util/bsoncolumn.cpp | 21 | ||||
-rw-r--r-- | src/mongo/bson/util/bsoncolumn.h | 4 | ||||
-rw-r--r-- | src/mongo/bson/util/bsoncolumn_test.cpp | 614 | ||||
-rw-r--r-- | src/mongo/bson/util/bsoncolumn_util.h | 9 | ||||
-rw-r--r-- | src/mongo/bson/util/bsoncolumnbuilder.cpp | 660 | ||||
-rw-r--r-- | src/mongo/bson/util/bsoncolumnbuilder.h | 122 |
6 files changed, 1243 insertions, 187 deletions
diff --git a/src/mongo/bson/util/bsoncolumn.cpp b/src/mongo/bson/util/bsoncolumn.cpp index 1cf7185f782..c902c1e9487 100644 --- a/src/mongo/bson/util/bsoncolumn.cpp +++ b/src/mongo/bson/util/bsoncolumn.cpp @@ -92,17 +92,10 @@ public: private: bool _traverse(StringData fieldName, const BSONObj& obj) { [[maybe_unused]] auto raii = _enterFunc(fieldName, obj); - for (const auto& elem : obj) { - bool result = true; - if (elem.type() == Object) { - result = _traverse(elem.fieldNameStringData(), elem.Obj()); - } else { - result = _elemFunc(elem); - } - if (!result) - return false; - } - return true; + return std::all_of(obj.begin(), obj.end(), [this, &fieldName](auto&& elem) { + return elem.type() == Object ? _traverse(elem.fieldNameStringData(), elem.Obj()) + : _elemFunc(elem); + }); } EnterSubObjFunc _enterFunc; @@ -149,7 +142,7 @@ const char* BSONColumn::ElementStorage::ContiguousBlock::done() { } char* BSONColumn::ElementStorage::allocate(int bytes) { - // If current block don't have enough capacity we need to allocate a new one + // If current block doesn't have enough capacity we need to allocate a new one. if (_capacity - _pos < bytes) { // Keep track of current block if it exists. if (_block) { @@ -371,7 +364,7 @@ void BSONColumn::Iterator::_incrementRegular() { // Load new control byte if (_isInterleavedStart(*_control)) { - // Remember this position to speed up "random access" for futher access. + // Remember this position to speed up "random access" for further access. _column->_maxDecodingStartPos.setIfLarger(_index, _control); _initializeInterleaving(); @@ -384,7 +377,7 @@ void BSONColumn::Iterator::_incrementRegular() { auto prevControl = _control; _control += result.size; if (result.full) { - // Remember this position to speed up "random access" for futher access. + // Remember this position to speed up "random access" for further access. _column->_maxDecodingStartPos.setIfLarger(_index, prevControl); } } diff --git a/src/mongo/bson/util/bsoncolumn.h b/src/mongo/bson/util/bsoncolumn.h index 86db713dd65..45316af7a6d 100644 --- a/src/mongo/bson/util/bsoncolumn.h +++ b/src/mongo/bson/util/bsoncolumn.h @@ -259,6 +259,10 @@ private: */ class ElementStorage { public: + /** + * "Writable" BSONElement. Provides access to a writable pointer for writing the value of + * the BSONElement. Users must write valid BSON data depending on the requested BSON type. + */ class Element { public: Element(char* buffer, int nameSize, int valueSize); diff --git a/src/mongo/bson/util/bsoncolumn_test.cpp b/src/mongo/bson/util/bsoncolumn_test.cpp index f895235b22c..12772fb478b 100644 --- a/src/mongo/bson/util/bsoncolumn_test.cpp +++ b/src/mongo/bson/util/bsoncolumn_test.cpp @@ -364,9 +364,10 @@ public: static void verifyBinary(BSONBinData columnBinary, const BufBuilder& expected) { ASSERT_EQ(columnBinary.type, BinDataType::Column); - ASSERT_EQ(columnBinary.length, expected.len()); + auto buf = expected.buf(); + ASSERT_EQ(columnBinary.length, expected.len()); ASSERT_EQ(memcmp(columnBinary.data, buf, columnBinary.length), 0); } @@ -2201,14 +2202,12 @@ TEST_F(BSONColumnTest, StringMultiType) { } TEST_F(BSONColumnTest, ObjectUncompressed) { - BSONColumnBuilder cb("test"_sd); + // BSONColumnBuilder does not produce this kind of binary where Objects are stored uncompressed. + // However they are valid according to the specification so verify that we can decompress. + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << 2)), createElementObj(BSON("x" << 1 << "y" << 3))}; - for (auto elem : elems) { - cb.append(elem); - } - BufBuilder expected; appendElementCount(expected, elems.size()); for (auto elem : elems) { @@ -2216,20 +2215,20 @@ TEST_F(BSONColumnTest, ObjectUncompressed) { } appendEOO(expected); - auto binData = cb.finalize(); - verifyBinary(binData, expected); - verifyDecompression(binData, elems); + BSONBinData data; + data.data = expected.buf(); + data.length = expected.len(); + data.type = BinDataType::Column; + verifyDecompression(data, elems); } TEST_F(BSONColumnTest, ObjectEqual) { - BSONColumnBuilder cb("test"_sd); + // BSONColumnBuilder does not produce this kind of binary where Objects are stored uncompressed. + // However they are valid according to the specification so verify that we can decompress. + auto elemObj = createElementObj(BSON("x" << 1 << "y" << 2)); std::vector<BSONElement> elems = {elemObj, elemObj}; - for (auto elem : elems) { - cb.append(elem); - } - BufBuilder expected; appendElementCount(expected, elems.size()); appendLiteral(expected, elemObj); @@ -2237,9 +2236,11 @@ TEST_F(BSONColumnTest, ObjectEqual) { appendSimple8bBlock64(expected, kDeltaForBinaryEqualValues); appendEOO(expected); - auto binData = cb.finalize(); - verifyBinary(binData, expected); - verifyDecompression(binData, elems); + BSONBinData data; + data.data = expected.buf(); + data.length = expected.len(); + data.type = BinDataType::Column; + verifyDecompression(data, elems); } TEST_F(BSONColumnTest, ArrayUncompressed) { @@ -2286,6 +2287,8 @@ TEST_F(BSONColumnTest, ArrayEqual) { } TEST_F(BSONColumnTest, Interleaved) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << 2)), createElementObj(BSON("x" << 1 << "y" << 3)), createElementObj(BSON("x" << 1 << "y" << 3)), @@ -2293,6 +2296,13 @@ TEST_F(BSONColumnTest, Interleaved) { createElementObj(BSON("y" << 4)), createElementObj(BSON("x" << 1 << "y" << 3))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2317,19 +2327,26 @@ TEST_F(BSONColumnTest, Interleaved) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InterleavedAfterNonInterleaved) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementInt32(1), createElementObj(BSON("x" << 1 << "y" << 2)), createElementObj(BSON("x" << 1 << "y" << 3)), createElementObj(BSON("x" << 2 << "y" << 4))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendLiteral(expected, elems.front()); @@ -2349,18 +2366,25 @@ TEST_F(BSONColumnTest, InterleavedAfterNonInterleaved) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InterleavedLevels) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("root" << BSON("x" << 1) << "y" << 2)), createElementObj(BSON("root" << BSON("x" << 2) << "y" << 5)), createElementObj(BSON("root" << BSON("x" << 2) << "y" << 5))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2381,20 +2405,27 @@ TEST_F(BSONColumnTest, InterleavedLevels) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InterleavedDoubleDifferentScale) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1.0 << "y" << 2.0)), createElementObj(BSON("x" << 1.1 << "y" << 3.0)), createElementObj(BSON("x" << 1.2 << "y" << 2.0)), createElementObj(BSON("x" << 1.0)), createElementObj(BSON("x" << 1.5 << "y" << 2.0))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2417,14 +2448,14 @@ TEST_F(BSONColumnTest, InterleavedDoubleDifferentScale) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InterleavedMix64And128Bit) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << "count0")), createElementObj(BSON("x" << 2 << "y" @@ -2435,6 +2466,13 @@ TEST_F(BSONColumnTest, InterleavedMix64And128Bit) { createElementObj(BSON("x" << 5 << "y" << "count3"))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2457,19 +2495,26 @@ TEST_F(BSONColumnTest, InterleavedMix64And128Bit) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InterleavedWithEmptySubObj) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = { createElementObj(BSON("x" << 1 << "y" << BSONObjBuilder().obj())), createElementObj(BSON("x" << 2 << "y" << BSONObjBuilder().obj())), createElementObj(BSON("x" << 3 << "y" << BSONObjBuilder().obj()))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2482,19 +2527,96 @@ TEST_F(BSONColumnTest, InterleavedWithEmptySubObj) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedRemoveEmptySubObj) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = { + createElementObj(BSON("x" << 1 << "y" << BSONObjBuilder().obj())), + createElementObj(BSON("x" << 2 << "y" << BSONObjBuilder().obj())), + createElementObj(BSON("x" << 3))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, elems.front().Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd])}, + 1); + appendEOO(expected); + + appendInterleavedStart(expected, elems[2].Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedAddEmptySubObj) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = { + createElementObj(BSON("x" << 3)), + createElementObj(BSON("x" << 1 << "y" << BSONObjBuilder().obj()))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, elems.front().Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + + appendInterleavedStart(expected, elems[1].Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InterleavedSchemaChange) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << 2)), createElementObj(BSON("x" << 1 << "y" << 3)), createElementObj(BSON("x" << 1 << "y" << 3.0)), createElementObj(BSON("x" << 1 << "y" << 4.0))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2517,20 +2639,147 @@ TEST_F(BSONColumnTest, InterleavedSchemaChange) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedObjectSchemaChange) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << BSON("z" << 2))), + createElementObj(BSON("x" << 1 << "y" << BSON("z" << 3))), + createElementObj(BSON("x" << 1 << "y" << 3))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, elems.front().Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd])}, + 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, + deltaInt32(elems[1].Obj()["y"_sd].Obj()["z"_sd], elems[0].Obj()["y"_sd].Obj()["z"_sd])}, + 1); + appendEOO(expected); + + appendInterleavedStart(expected, elems[2].Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedObjectNameChange) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << BSON("z" << 2))), + createElementObj(BSON("x" << 1 << "y2" << BSON("z" << 3)))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, + BSON("x" << 1 << "y" << BSON("z" << 2) << "y2" << BSON("z" << 3))); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd])}, + 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues, boost::none}, 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {boost::none, kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedObjectEmptyObjChange) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = { + createElementObj(BSON("x" << 1 << "y" << BSON("z" << 2))), + createElementObj(BSON("x" << 1 << "y" << BSON("z" << 3))), + createElementObj(BSON("x" << 1 << "y" << BSONObjBuilder().obj()))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, elems.front().Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd])}, + 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, + deltaInt32(elems[1].Obj()["y"_sd].Obj()["z"_sd], elems[0].Obj()["y"_sd].Obj()["z"_sd])}, + 1); + appendEOO(expected); + + appendInterleavedStart(expected, elems[2].Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, ReenterInterleaved) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1 << "y" << 2)), createElementObj(BSON("x" << 1 << "y" << 3)), createElementInt32(1), createElementObj(BSON("x" << 2 << "y" << 2 << "z" << 2)), createElementObj(BSON("x" << 5 << "y" << 3 << "z" << 3))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, elems.front().Obj()); @@ -2539,7 +2788,7 @@ TEST_F(BSONColumnTest, ReenterInterleaved) { expected, {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd])}, 1); - appendSimple8bControl(expected, 0b1001, 0b0000); + appendSimple8bControl(expected, 0b1000, 0b0000); appendSimple8bBlocks64( expected, {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["y"_sd], elems[0].Obj()["y"_sd])}, @@ -2552,12 +2801,12 @@ TEST_F(BSONColumnTest, ReenterInterleaved) { expected, {kDeltaForBinaryEqualValues, deltaInt32(elems[4].Obj()["x"_sd], elems[3].Obj()["x"_sd])}, 1); - appendSimple8bControl(expected, 0b1001, 0b0000); + appendSimple8bControl(expected, 0b1000, 0b0000); appendSimple8bBlocks64( expected, {kDeltaForBinaryEqualValues, deltaInt32(elems[4].Obj()["y"_sd], elems[3].Obj()["y"_sd])}, 1); - appendSimple8bControl(expected, 0b1001, 0b0000); + appendSimple8bControl(expected, 0b1000, 0b0000); appendSimple8bBlocks64( expected, {kDeltaForBinaryEqualValues, deltaInt32(elems[4].Obj()["z"_sd], elems[3].Obj()["z"_sd])}, @@ -2565,14 +2814,14 @@ TEST_F(BSONColumnTest, ReenterInterleaved) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } -TEST_F(BSONColumnTest, InterleavedAlternating) { +TEST_F(BSONColumnTest, InterleavedAlternatingMergeRight) { + BSONColumnBuilder cb("test"_sd); + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1)), createElementObj(BSON("y" << 2)), createElementObj(BSON("z" << 3)), @@ -2580,6 +2829,13 @@ TEST_F(BSONColumnTest, InterleavedAlternating) { createElementObj(BSON("y" << 3)), createElementObj(BSON("z" << 4))}; + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + BufBuilder expected; appendElementCount(expected, elems.size()); appendInterleavedStart(expected, @@ -2616,11 +2872,184 @@ TEST_F(BSONColumnTest, InterleavedAlternating) { appendEOO(expected); appendEOO(expected); - BSONBinData data; - data.data = expected.buf(); - data.length = expected.len(); - data.type = BinDataType::Column; - verifyDecompression(data, elems); + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedAlternatingMergeLeftThenRight) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSON("z" << 1)), + createElementObj(BSON("y" << 2 << "z" << 2)), + createElementObj(BSON("x" << 3 << "z" << 3))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, + BSON("y" << elems[1].Obj().firstElement().Int() << "x" + << elems[2].Obj().firstElement().Int() << "z" + << elems[0].Obj().firstElement().Int())); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {boost::none, kDeltaForBinaryEqualValues, boost::none}, 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {boost::none, boost::none, kDeltaForBinaryEqualValues}, 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, + {kDeltaForBinaryEqualValues, + deltaInt32(elems[1].Obj()["z"_sd], elems[0].Obj()["z"_sd]), + deltaInt32(elems[2].Obj()["z"_sd], elems[1].Obj()["z"_sd])}, + 1); + appendEOO(expected); + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedIncompatibleMerge) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1)), + createElementObj(BSON("x" << 2 << "y" << 2)), + createElementObj(BSON("y" << 3 << "x" << 3))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart( + expected, + BSON("x" << elems[0].Obj().firstElement().Int() << "y" << elems[1].Obj()["y"_sd].Int())); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64( + expected, + {kDeltaForBinaryEqualValues, deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd])}, + 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {boost::none, kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + appendInterleavedStart(expected, elems[2].Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, InterleavedIncompatibleAfterDeterminedReference) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1)), + createElementObj(BSON("x" << 2)), + createElementObj(BSON("x" << 3)), + createElementObj(BSON("x" << 4)), + createElementObj(BSON("x" << 5)), + createElementObj(BSON("x" << 6)), + createElementObj(BSON("x" << 0 << "y" << 0))}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, elems.front().Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, + {kDeltaForBinaryEqualValues, + deltaInt32(elems[1].Obj()["x"_sd], elems[0].Obj()["x"_sd]), + deltaInt32(elems[2].Obj()["x"_sd], elems[1].Obj()["x"_sd]), + deltaInt32(elems[3].Obj()["x"_sd], elems[2].Obj()["x"_sd]), + deltaInt32(elems[4].Obj()["x"_sd], elems[3].Obj()["x"_sd]), + deltaInt32(elems[5].Obj()["x"_sd], elems[4].Obj()["x"_sd])}, + 1); + appendEOO(expected); + appendInterleavedStart(expected, elems[6].Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlocks64(expected, {kDeltaForBinaryEqualValues}, 1); + appendEOO(expected); + + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, ObjectEmpty) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSONObjBuilder().obj()), + createElementObj(BSONObjBuilder().obj())}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendLiteral(expected, elems.front()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlock64(expected, kDeltaForBinaryEqualValues); + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); +} + +TEST_F(BSONColumnTest, ObjectEmptyAfterNonEmpty) { + BSONColumnBuilder cb("test"_sd); + + std::vector<BSONElement> elems = {createElementObj(BSON("x" << 1)), + createElementObj(BSONObjBuilder().obj())}; + + for (auto elem : elems) { + if (!elem.eoo()) + cb.append(elem); + else + cb.skip(); + } + + BufBuilder expected; + appendElementCount(expected, elems.size()); + appendInterleavedStart(expected, elems.front().Obj()); + appendSimple8bControl(expected, 0b1000, 0b0000); + appendSimple8bBlock64(expected, kDeltaForBinaryEqualValues); + appendEOO(expected); + appendLiteral(expected, elems[1]); + appendEOO(expected); + + auto binData = cb.finalize(); + verifyBinary(binData, expected); + verifyDecompression(binData, elems); } TEST_F(BSONColumnTest, InvalidControlByte) { @@ -2775,6 +3204,59 @@ TEST_F(BSONColumnTest, AppendMaxKey) { verifyBinary(cb.finalize(), expected); } +TEST_F(BSONColumnTest, AppendMinKeyInSubObj) { + BSONColumnBuilder cb("test"); + + BSONObjBuilder obj; + { + BSONObjBuilder builder = obj.subobjStart("root"); + builder.append(createElementMinKey()); + } + + ASSERT_THROWS_CODE( + cb.append(createElementObj(obj.obj())), DBException, ErrorCodes::InvalidBSONType); + + BufBuilder expected; + appendElementCount(expected, 0); + appendEOO(expected); + + verifyBinary(cb.finalize(), expected); +} + +TEST_F(BSONColumnTest, AppendMinKeyInSubObjAfterInterleaveStart) { + BSONColumnBuilder cb("test"); + + BSONObjBuilder obj; + { + BSONObjBuilder builder = obj.subobjStart("root"); + builder.append(createElementMinKey()); + } + + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + ASSERT_THROWS_CODE( + cb.append(createElementObj(obj.obj())), DBException, ErrorCodes::InvalidBSONType); +} + +TEST_F(BSONColumnTest, AppendMinKeyInSubObjAfterInterleaveStartInAppendMode) { + BSONColumnBuilder cb("test"); + + BSONObjBuilder obj; + { + BSONObjBuilder builder = obj.subobjStart("root"); + builder.append(createElementMinKey()); + } + + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + cb.append(createElementObj(BSON("root" << BSON("0" << 1)))); + ASSERT_THROWS_CODE( + cb.append(createElementObj(obj.obj())), DBException, ErrorCodes::InvalidBSONType); +} + } // namespace } // namespace mongo diff --git a/src/mongo/bson/util/bsoncolumn_util.h b/src/mongo/bson/util/bsoncolumn_util.h index a70a0ef0840..9ec93bf98e1 100644 --- a/src/mongo/bson/util/bsoncolumn_util.h +++ b/src/mongo/bson/util/bsoncolumn_util.h @@ -35,6 +35,15 @@ namespace mongo::bsoncolumn { // Number of bytes for element count at the beginning of BSON Column binary static constexpr uint8_t kElementCountBytes = 4; +static constexpr char kInterleavedStartControlByte = (char)0xF0; + +inline bool isLiteralControlByte(char control) { + return (control & 0xE0) == 0; +} + +inline uint8_t numSimple8bBlocksForControlByte(char control) { + return (control & 0x0F) + 1; +} bool usesDeltaOfDelta(BSONType type); bool uses128bit(BSONType type); diff --git a/src/mongo/bson/util/bsoncolumnbuilder.cpp b/src/mongo/bson/util/bsoncolumnbuilder.cpp index c7dee5223cb..58e1f2ef9e8 100644 --- a/src/mongo/bson/util/bsoncolumnbuilder.cpp +++ b/src/mongo/bson/util/bsoncolumnbuilder.cpp @@ -28,6 +28,8 @@ */ #include "mongo/bson/util/bsoncolumnbuilder.h" + +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bsoncolumn_util.h" #include "mongo/bson/util/simple8b_type_util.h" @@ -41,6 +43,7 @@ namespace { static constexpr uint8_t kMaxCount = 16; static constexpr uint8_t kCountMask = 0x0F; static constexpr uint8_t kControlMask = 0xF0; +static constexpr std::ptrdiff_t kNoSimple8bControl = -1; static constexpr std::array<uint8_t, Simple8bTypeUtil::kMemoryAsInteger + 1> kControlByteForScaleIndex = {0x90, 0xA0, 0xB0, 0xC0, 0xD0, 0x80}; @@ -57,38 +60,362 @@ std::pair<int64_t, uint8_t> scaleAndEncodeDouble(double value, uint8_t minScaleI return {*encoded, minScaleIndex - 1}; } +// Checks if it is possible to do delta of ObjectIds +bool objectIdDeltaPossible(BSONElement elem, BSONElement prev) { + return !memcmp(prev.OID().getInstanceUnique().bytes, + elem.OID().getInstanceUnique().bytes, + OID::kInstanceUniqueSize); +} + +// Internal recursion function for traverseLockStep() when we just need to traverse reference +// object. +template <typename ElementFunc> +void _traverse(const BSONObj& reference, const ElementFunc& elemFunc) { + for (const auto& elem : reference) { + if (elem.type() == Object) { + _traverse(elem.Obj(), elemFunc); + } else { + elemFunc(elem, BSONElement()); + } + } +} + +// Internal recursion function for traverseLockStep(). See documentation for traverseLockStep. +template <typename ElementFunc> +std::pair<BSONObj::iterator, bool> _traverseLockStep(const BSONObj& reference, + const BSONObj& obj, + const ElementFunc& elemFunc) { + auto it = obj.begin(); + auto end = obj.end(); + for (const auto& elem : reference) { + if (elem.type() == Object) { + BSONObj refObj = elem.Obj(); + bool hasIt = it != end; + // If refObj is empty, there must also exist an empty object on 'it' for this to be + // valid. First we check that we have something on 'it' + if (!hasIt && refObj.isEmpty()) { + return {it, false}; + } + + bool elemMatch = hasIt && elem.fieldNameStringData() == it->fieldNameStringData(); + if (elemMatch) { + // If 'reference' element is Object then 'obj' must also be Object. + if (it->type() != Object) { + return {it, false}; + } + + // Differences in empty objects are not allowed. + if (refObj.isEmpty() != it->Obj().isEmpty()) { + return {it, false}; + } + + // Everything match, recurse deeper. + auto [_, compatible] = _traverseLockStep(refObj, (it++)->Obj(), elemFunc); + if (!compatible) { + return {it, false}; + } + } else { + // Assume field name at 'it' is coming later in 'reference'. Traverse as if it is + // missing from 'obj'. We don't increment the iterator in this case. If it is a + // mismatch we will detect that at end when 'it' is not at 'end'. + // Nothing can fail below this so traverse without all the checks. + _traverse(refObj, elemFunc); + } + } else { + // Non-object, call provided function with the two elements + elemFunc(elem, + it != end && elem.fieldNameStringData() == it->fieldNameStringData() + ? *(it++) + : BSONElement()); + } + } + // Extra elements in 'obj' are not allowed. These needs to be merged in to 'reference' to be + // able to compress. + return {it, it == end}; +} + +// Traverses and validates BSONObj's in reference and obj in lock-step. Returns true if the object +// hierarchies are compatible for sub-object compression. To be compatible fields in 'obj' must be +// in the same order as in 'reference' and sub-objects in 'reference' must be sub-objects in 'obj'. +// The only difference between the two objects that is allowed is missing fields in 'obj' compared +// to 'reference'. 'ElementFunc' is called for every matching pair of BSONElement. Function +// signature should be void(const BSONElement&, const BSONElement&). +template <typename ElementFunc> +bool traverseLockStep(const BSONObj& reference, const BSONObj& obj, ElementFunc elemFunc) { + auto [it, hierachyMatch] = _traverseLockStep(reference, obj, elemFunc); + // Extra elements in 'obj' are not allowed. These needs to be merged in to 'reference' to be + // able to compress. + return hierachyMatch && it == obj.end(); +} + +// Internal recursion function for mergeObj(). See documentation for mergeObj. Returns true if merge +// was successful. +bool _mergeObj(BSONObjBuilder* builder, const BSONObj& reference, const BSONObj& obj) { + auto refIt = reference.begin(); + auto refEnd = reference.end(); + auto it = obj.begin(); + auto end = obj.end(); + + // Iterate until we reach end of any of the two objects. + while (refIt != refEnd && it != end) { + StringData name = refIt->fieldNameStringData(); + if (name == it->fieldNameStringData()) { + bool refIsObj = refIt->type() == Object; + bool itIsObj = it->type() == Object; + + if (refIsObj && itIsObj) { + BSONObj refObj = refIt->Obj(); + BSONObj itObj = it->Obj(); + // There may not be a mismatch in empty objects + if (refObj.isEmpty() != itObj.isEmpty()) + return false; + + // Recurse deeper + BSONObjBuilder subBuilder = builder->subobjStart(name); + bool res = _mergeObj(&subBuilder, refObj, itObj); + if (!res) { + return false; + } + } else if (refIsObj || itIsObj) { + // Both or neither elements must be Object to be mergable + return false; + } else { + // If name match and neither is Object we can append from reference and increment + // both objects. + builder->append(*refIt); + } + + ++refIt; + ++it; + continue; + } + + // Name mismatch, first search in 'obj' if reference element exist later. + auto n = std::next(it); + auto namePos = std::find_if( + n, end, [&name](const auto& elem) { return elem.fieldNameStringData() == name; }); + if (namePos == end) { + // Reference element does not exist in 'obj' so add it and continue merging with just + // this iterator incremented. + builder->append(*(refIt++)); + } else { + // Reference element do exist later in 'obj'. Add element in 'it' if it is the first + // time we see it, fail otherwise (incompatible ordering). + if (builder->hasField(it->fieldNameStringData())) { + return false; + } + builder->append(*(it++)); + } + } + + // Add remaining reference elements when we reached end in 'obj'. + for (; refIt != refEnd; ++refIt) { + // We cannot allow empty object mismatch + if (refIt->type() == Object && refIt->Obj().isEmpty()) { + return false; + } + if (builder->hasField(refIt->fieldNameStringData())) { + return false; + } + builder->append(*refIt); + } + + // Add remaining 'obj' elements when we reached end in 'reference'. + for (; it != end; ++it) { + // We cannot allow empty object mismatch + if (it->type() == Object && it->Obj().isEmpty()) { + return false; + } + + if (builder->hasField(it->fieldNameStringData())) { + return false; + } + builder->append(*it); + } + + return true; +} + +// Tries to merge in elements from 'obj' into 'reference'. For successful merge the elements that +// already exist in 'reference' must be in 'obj' in the same order. The merged object is returned in +// case of a successful merge, empty BSONObj is returned for failure. This is quite an expensive +// operation as we are merging unsorted objects. Time complexity is O(N^2). +BSONObj mergeObj(const BSONObj& reference, const BSONObj& obj) { + BSONObjBuilder builder; + if (!_mergeObj(&builder, reference, obj)) { + builder.abandon(); + return BSONObj(); + } + + return builder.obj(); +} + } // namespace BSONColumnBuilder::BSONColumnBuilder(StringData fieldName) : BSONColumnBuilder(fieldName, BufBuilder()) {} BSONColumnBuilder::BSONColumnBuilder(StringData fieldName, BufBuilder&& builder) - : _simple8bBuilder64(_createBufferWriter()), - _simple8bBuilder128(_createBufferWriter()), - _scaleIndex(Simple8bTypeUtil::kMemoryAsInteger), - _bufBuilder(std::move(builder)), - _fieldName(fieldName) { + : _state(&_bufBuilder, nullptr), _bufBuilder(std::move(builder)), _fieldName(fieldName) { // Leave space for element count at the beginning static_assert(sizeof(_elementCount) == kElementCountBytes, "Element count for BSONColumn should be 4 bytes"); _bufBuilder.reset(); _bufBuilder.skip(kElementCountBytes); - // Store EOO type with empty field name as previous. - _storePrevious(BSONElement()); -} - -BSONElement BSONColumnBuilder::_previous() const { - return {_prev.get(), 1, _prevSize, BSONElement::CachedSizeTag{}}; } BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) { auto type = elem.type(); uassert(ErrorCodes::InvalidBSONType, - "MinKey or MaxKey is not supported by BSON Column (subtype 7)", + "MinKey or MaxKey is not valid for storage", type != MinKey && type != MaxKey); - auto previous = _previous(); + if (type != Object || elem.Obj().isEmpty()) { + // Flush previous sub-object compression when non-object is appended + if (_mode != Mode::kRegular) { + _flushSubObjMode(); + } + _state.append(elem); + ++_elementCount; + return *this; + } + + + if (_mode == Mode::kRegular) { + _startDetermineSubObjReference(elem.Obj()); + ++_elementCount; + return *this; + } + + if (_mode == Mode::kSubObjDeterminingReference) { + auto obj = elem.Obj(); + + // We are in DeterminingReference mode, check if this current object is compatible and merge + // in any new fields that are discovered. + uint32_t numElements = 0; + if (!traverseLockStep( + _referenceSubObj, + obj, + [this, &numElements](const BSONElement& ref, const BSONElement& elem) { + ++numElements; + uassert(ErrorCodes::InvalidBSONType, + "MinKey or MaxKey is not valid for storage", + elem.type() != MinKey && elem.type() != MaxKey); + })) { + BSONObj merged = mergeObj(_referenceSubObj, obj); + if (merged.isEmptyPrototype()) { + // If merge failed, flush current sub-object compression and start over. + _flushSubObjMode(); + + _referenceSubObj = obj.getOwned(); + _bufferedObjElements.push_back(_referenceSubObj); + _mode = Mode::kSubObjDeterminingReference; + ++_elementCount; + return *this; + } + _referenceSubObj = merged; + } + + // If we've buffered twice as many objects as we have sub-elements we will achieve good + // compression so use the currently built reference. + if (numElements * 2 >= _bufferedObjElements.size()) { + _bufferedObjElements.push_back(obj.getOwned()); + ++_elementCount; + return *this; + } + + _finishDetermineSubObjReference(); + } + + // Reference already determined for sub-object compression, try to add this new object. + _appendSubElements(elem.Obj()); ++_elementCount; + return *this; +} + + +BSONColumnBuilder& BSONColumnBuilder::skip() { + ++_elementCount; + if (_mode == Mode::kRegular) { + _state.skip(); + } else if (_mode == Mode::kSubObjDeterminingReference) { + _bufferedObjElements.push_back(BSONObj()); + } else { + for (auto&& state : _subobjStates) { + state.skip(); + } + } + + return *this; +} + +BSONBinData BSONColumnBuilder::finalize() { + if (_mode == Mode::kRegular) { + _state.flush(); + } else { + _flushSubObjMode(); + } + + // Write EOO at the end + _bufBuilder.appendChar(EOO); + + // Write element count at the beginning + DataView(_bufBuilder.buf()).write<LittleEndian<uint32_t>>(_elementCount); + + return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column}; +} + +BufBuilder BSONColumnBuilder::detach() { + return std::move(_bufBuilder); +} + +BSONColumnBuilder::EncodingState::EncodingState( + BufBuilder* bufBuilder, std::function<void(const char*, size_t)> controlBlockWriter) + : _simple8bBuilder64(_createBufferWriter()), + _simple8bBuilder128(_createBufferWriter()), + _controlByteOffset(kNoSimple8bControl), + _scaleIndex(Simple8bTypeUtil::kMemoryAsInteger), + _bufBuilder(bufBuilder), + _controlBlockWriter(controlBlockWriter) { + // Store EOO type with empty field name as previous. + _storePrevious(BSONElement()); +} + +BSONColumnBuilder::EncodingState::EncodingState(EncodingState&& other) + : _prev(std::move(other._prev)), + _prevSize(std::move(other._prevSize)), + _prevCapacity(std::move(other._prevCapacity)), + _prevDelta(std::move(other._prevDelta)), + _simple8bBuilder64(_createBufferWriter()), + _simple8bBuilder128(_createBufferWriter()), + _storeWith128(std::move(other._storeWith128)), + _controlByteOffset(std::move(other._controlByteOffset)), + _prevEncoded64(std::move(other._prevEncoded64)), + _prevEncoded128(std::move(other._prevEncoded128)), + _lastValueInPrevBlock(std::move(other._lastValueInPrevBlock)), + _scaleIndex(std::move(other._scaleIndex)), + _bufBuilder(std::move(other._bufBuilder)), + _controlBlockWriter(std::move(other._controlBlockWriter)) {} + +BSONColumnBuilder::EncodingState& BSONColumnBuilder::EncodingState::operator=(EncodingState&& rhs) { + _prev = std::move(rhs._prev); + _prevSize = std::move(rhs._prevSize); + _prevCapacity = std::move(rhs._prevCapacity); + _prevDelta = std::move(rhs._prevDelta); + _storeWith128 = std::move(rhs._storeWith128); + _controlByteOffset = std::move(rhs._controlByteOffset); + _prevEncoded64 = std::move(rhs._prevEncoded64); + _prevEncoded128 = std::move(rhs._prevEncoded128); + _lastValueInPrevBlock = std::move(rhs._lastValueInPrevBlock); + _scaleIndex = std::move(rhs._scaleIndex); + _bufBuilder = std::move(rhs._bufBuilder); + _controlBlockWriter = std::move(rhs._controlBlockWriter); + return *this; +} + +void BSONColumnBuilder::EncodingState::append(BSONElement elem) { + auto type = elem.type(); + auto previous = _previous(); // If we detect a type change (or this is first value). Flush all pending values in Simple-8b // and write uncompressed literal. Reset all default values. @@ -96,9 +423,8 @@ BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) { _storePrevious(elem); _simple8bBuilder128.flush(); _simple8bBuilder64.flush(); - _storeWith128 = uses128bit(type); _writeLiteralFromPrevious(); - return *this; + return; } // Store delta in Simple-8b if types match @@ -161,7 +487,7 @@ BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) { value = calcDelta(elem._numberLong(), previous._numberLong()); break; case jstOID: { - encodingPossible = _objectIdDeltaPossible(elem, previous); + encodingPossible = objectIdDeltaPossible(elem, previous); if (!encodingPossible) break; @@ -212,11 +538,32 @@ BSONColumnBuilder& BSONColumnBuilder::append(BSONElement elem) { _simple8bBuilder64.flush(); _writeLiteralFromPrevious(); } +} - return *this; +void BSONColumnBuilder::EncodingState::skip() { + auto before = _bufBuilder->len(); + if (_storeWith128) { + _simple8bBuilder128.skip(); + } else { + _simple8bBuilder64.skip(); + } + // Rescale previous known value if this skip caused Simple-8b blocks to be written + if (before != _bufBuilder->len() && _previous().type() == NumberDouble) { + std::tie(_prevEncoded64, _scaleIndex) = scaleAndEncodeDouble(_lastValueInPrevBlock, 0); + } } -boost::optional<Simple8bBuilder<uint64_t>> BSONColumnBuilder::_tryRescalePending( +void BSONColumnBuilder::EncodingState::flush() { + _simple8bBuilder128.flush(); + _simple8bBuilder64.flush(); + + if (_controlByteOffset != kNoSimple8bControl && _controlBlockWriter) { + _controlBlockWriter(_bufBuilder->buf() + _controlByteOffset, + _bufBuilder->len() - _controlByteOffset); + } +} + +boost::optional<Simple8bBuilder<uint64_t>> BSONColumnBuilder::EncodingState::_tryRescalePending( int64_t encoded, uint8_t newScaleIndex) { // Encode last value in the previous block with old and new scale index. We know that scaling // with the old index is possible. @@ -275,7 +622,7 @@ boost::optional<Simple8bBuilder<uint64_t>> BSONColumnBuilder::_tryRescalePending return builder; } -bool BSONColumnBuilder::_appendDouble(double value, double previous) { +bool BSONColumnBuilder::EncodingState::_appendDouble(double value, double previous) { // Scale with lowest possible scale index auto [encoded, scaleIndex] = scaleAndEncodeDouble(value, _scaleIndex); @@ -295,7 +642,7 @@ bool BSONColumnBuilder::_appendDouble(double value, double previous) { // Re-scale not possible, flush and start new block with the higher scale factor _simple8bBuilder64.flush(); - _controlByteOffset = 0; + _controlByteOffset = kNoSimple8bControl; // Make sure value and previous are using the same scale factor. uint8_t prevScaleIndex; @@ -311,12 +658,12 @@ bool BSONColumnBuilder::_appendDouble(double value, double previous) { // Append delta and check if we wrote a Simple8b block. If we did we may be able to reduce the // scale factor when starting a new block - auto before = _bufBuilder.len(); + auto before = _bufBuilder->len(); if (!_simple8bBuilder64.append( Simple8bTypeUtil::encodeInt64(calcDelta(encoded, _prevEncoded64)))) return false; - if (_bufBuilder.len() != before) { + if (_bufBuilder->len() != before) { // Reset the scale factor to 0 and append all pending values to a new Simple8bBuilder. In // the worse case we will end up with an identical scale factor. auto prevScale = _scaleIndex; @@ -346,41 +693,12 @@ bool BSONColumnBuilder::_appendDouble(double value, double previous) { return true; } -BSONColumnBuilder& BSONColumnBuilder::skip() { - ++_elementCount; - - auto before = _bufBuilder.len(); - if (_storeWith128) { - _simple8bBuilder128.skip(); - } else { - _simple8bBuilder64.skip(); - } - // Rescale previous known value if this skip caused Simple-8b blocks to be written - if (before != _bufBuilder.len() && _previous().type() == NumberDouble) { - std::tie(_prevEncoded64, _scaleIndex) = scaleAndEncodeDouble(_lastValueInPrevBlock, 0); - } - return *this; -} - -BSONBinData BSONColumnBuilder::finalize() { - _simple8bBuilder128.flush(); - _simple8bBuilder64.flush(); - - // Write EOO at the end - _bufBuilder.appendChar(EOO); - - // Write element count at the beginning - DataView(_bufBuilder.buf()).write<LittleEndian<uint32_t>>(_elementCount); - - return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column}; -} - -BufBuilder BSONColumnBuilder::detach() { - return std::move(_bufBuilder); +BSONElement BSONColumnBuilder::EncodingState::_previous() const { + return {_prev.get(), 1, _prevSize, BSONElement::CachedSizeTag{}}; } -void BSONColumnBuilder::_storePrevious(BSONElement elem) { +void BSONColumnBuilder::EncodingState::_storePrevious(BSONElement elem) { auto valuesize = elem.valuesize(); // Add space for type byte and field name null terminator @@ -401,19 +719,33 @@ void BSONColumnBuilder::_storePrevious(BSONElement elem) { _prevSize = size; } -void BSONColumnBuilder::_writeLiteralFromPrevious() { +void BSONColumnBuilder::EncodingState::_writeLiteralFromPrevious() { // Write literal without field name and reset control byte to force new one to be written when // appending next value. - auto prevElem = _previous(); - _bufBuilder.appendBuf(_prev.get(), _prevSize); + if (_controlByteOffset != kNoSimple8bControl && _controlBlockWriter) { + _controlBlockWriter(_bufBuilder->buf() + _controlByteOffset, + _bufBuilder->len() - _controlByteOffset); + } + _bufBuilder->appendBuf(_prev.get(), _prevSize); + if (_controlBlockWriter) { + _controlBlockWriter(_bufBuilder->buf() + _bufBuilder->len() - _prevSize, _prevSize); + } + // Reset state - _controlByteOffset = 0; + _controlByteOffset = kNoSimple8bControl; _scaleIndex = Simple8bTypeUtil::kMemoryAsInteger; _prevDelta = 0; + _initializeFromPrevious(); +} + +void BSONColumnBuilder::EncodingState::_initializeFromPrevious() { // Initialize previous encoded when needed - switch (prevElem.type()) { + auto prevElem = _previous(); + auto type = prevElem.type(); + _storeWith128 = uses128bit(type); + switch (type) { case NumberDouble: _lastValueInPrevBlock = prevElem._numberDouble(); std::tie(_prevEncoded64, _scaleIndex) = scaleAndEncodeDouble(_lastValueInPrevBlock, 0); @@ -438,27 +770,31 @@ void BSONColumnBuilder::_writeLiteralFromPrevious() { } } -void BSONColumnBuilder::_incrementSimple8bCount() { +ptrdiff_t BSONColumnBuilder::EncodingState::_incrementSimple8bCount() { char* byte; uint8_t count; uint8_t control = kControlByteForScaleIndex[_scaleIndex]; - if (_controlByteOffset == 0) { + if (_controlByteOffset == kNoSimple8bControl) { // Allocate new control byte if we don't already have one. Record its offset so we can find // it even if the underlying buffer reallocates. - byte = _bufBuilder.skip(1); - _controlByteOffset = std::distance(_bufBuilder.buf(), byte); + byte = _bufBuilder->skip(1); + _controlByteOffset = std::distance(_bufBuilder->buf(), byte); count = 0; } else { // Read current count from previous control byte - byte = _bufBuilder.buf() + _controlByteOffset; + byte = _bufBuilder->buf() + _controlByteOffset; // If previous byte was written with a different control byte then we can't re-use and need // to start a new one if ((*byte & kControlMask) != control) { - _controlByteOffset = 0; + if (_controlBlockWriter) { + _controlBlockWriter(_bufBuilder->buf() + _controlByteOffset, + _bufBuilder->len() - _controlByteOffset); + } + _controlByteOffset = kNoSimple8bControl; _incrementSimple8bCount(); - return; + return kNoSimple8bControl; } count = (*byte & kCountMask) + 1; } @@ -466,17 +802,27 @@ void BSONColumnBuilder::_incrementSimple8bCount() { // Write back new count and clear offset if we have reached max count *byte = control | (count & kCountMask); if (count + 1 == kMaxCount) { - _controlByteOffset = 0; + auto prevControlByteOffset = _controlByteOffset; + _controlByteOffset = kNoSimple8bControl; + return prevControlByteOffset; } + + return kNoSimple8bControl; } -Simple8bWriteFn BSONColumnBuilder::_createBufferWriter() { +Simple8bWriteFn BSONColumnBuilder::EncodingState::_createBufferWriter() { return [this](uint64_t block) { // Write/update block count - _incrementSimple8bCount(); + ptrdiff_t fullControlOffset = _incrementSimple8bCount(); // Write Simple-8b block in little endian byte order - _bufBuilder.appendNum(block); + _bufBuilder->appendNum(block); + + // Write control block if this Simple-8b block made it full. + if (_controlBlockWriter && fullControlOffset != kNoSimple8bControl) { + _controlBlockWriter(_bufBuilder->buf() + fullControlOffset, + _bufBuilder->len() - fullControlOffset); + } auto previous = _previous(); if (previous.type() == NumberDouble) { @@ -487,10 +833,176 @@ Simple8bWriteFn BSONColumnBuilder::_createBufferWriter() { }; } -bool BSONColumnBuilder::_objectIdDeltaPossible(BSONElement elem, BSONElement prev) { - return !memcmp(prev.OID().getInstanceUnique().bytes, - elem.OID().getInstanceUnique().bytes, - OID::kInstanceUniqueSize); +void BSONColumnBuilder::_appendSubElements(const BSONObj& obj) { + // Check if added object is compatible with selected reference object. Collect a flat vector of + // all elements while we are doing this. + _flattenedAppendedObj.clear(); + if (!traverseLockStep( + _referenceSubObj, obj, [this](const BSONElement& ref, const BSONElement& elem) { + uassert(ErrorCodes::InvalidBSONType, + "MinKey or MaxKey is not valid for storage", + elem.type() != MinKey && elem.type() != MaxKey); + _flattenedAppendedObj.push_back(elem); + })) { + _flushSubObjMode(); + _startDetermineSubObjReference(obj); + return; + } + + // We should have recieved one callback for every sub-element in reference object. This should + // match number of encoding states setup previously. + invariant(_flattenedAppendedObj.size() == _subobjStates.size()); + auto statesIt = _subobjStates.begin(); + auto subElemIt = _flattenedAppendedObj.begin(); + auto subElemEnd = _flattenedAppendedObj.end(); + + // Append elements to corresponding encoding state. + for (; subElemIt != subElemEnd; ++subElemIt, ++statesIt) { + const auto& subelem = *subElemIt; + auto& state = *statesIt; + if (!subelem.eoo()) + state.append(subelem); + else + state.skip(); + } +} + +void BSONColumnBuilder::_startDetermineSubObjReference(const BSONObj& obj) { + // Start sub-object compression. Enter DeterminingReference mode, we use this first Object + // as the first reference + _state.flush(); + _state = {&_bufBuilder, nullptr}; + + _traverse(obj, [](const BSONElement& elem, const BSONElement&) { + uassert(ErrorCodes::InvalidBSONType, + "MinKey or MaxKey is not valid for storage", + elem.type() != MinKey && elem.type() != MaxKey); + }); + + _referenceSubObj = obj.getOwned(); + _bufferedObjElements.push_back(_referenceSubObj); + _mode = Mode::kSubObjDeterminingReference; +} + +void BSONColumnBuilder::_finishDetermineSubObjReference() { + // Done determining reference sub-object. Write this control byte and object to stream. + _bufBuilder.appendChar(bsoncolumn::kInterleavedStartControlByte); + _bufBuilder.appendBuf(_referenceSubObj.objdata(), _referenceSubObj.objsize()); + + // Initialize all encoding states. We do this by traversing in lock-step between the reference + // object and first buffered element. We can use the fact if sub-element exists in reference to + // determine if we should start with a zero delta or skip. + bool res = + traverseLockStep(_referenceSubObj, + _bufferedObjElements.front(), + [this](const BSONElement& ref, const BSONElement& elem) { + _subobjBuffers.emplace_back(); + auto* buffer = &_subobjBuffers.back().first; + auto* controlBlocks = &_subobjBuffers.back().second; + + // We need to buffer all control blocks written by the EncodingStates + // so they can be added to the main buffer in the right order. + auto controlBlockWriter = [buffer, controlBlocks]( + const char* controlBlock, size_t size) { + controlBlocks->emplace_back(controlBlock - buffer->buf(), size); + }; + + // Set a valid 'previous' into the encoding state to avoid a full + // literal to be written when we append the first element. We want this + // to be a zero delta as the reference object already contain this + // literal. + _subobjStates.emplace_back(buffer, controlBlockWriter); + _subobjStates.back()._storePrevious(ref); + _subobjStates.back()._initializeFromPrevious(); + if (!elem.eoo()) { + _subobjStates.back().append(elem); + } else { + _subobjStates.back().skip(); + } + }); + invariant(res); + _mode = Mode::kSubObjAppending; + + // Append remaining buffered objects. + auto it = _bufferedObjElements.begin() + 1; + auto end = _bufferedObjElements.end(); + for (; it != end; ++it) { + _appendSubElements(*it); + } + _bufferedObjElements.clear(); +} + +void BSONColumnBuilder::_flushSubObjMode() { + if (_mode == Mode::kSubObjDeterminingReference) { + _finishDetermineSubObjReference(); + } + + // Flush all EncodingStates, this will cause them to write out all their elements that is + // captured by the controlBlockWriter. + for (auto&& state : _subobjStates) { + state.flush(); + } + + // We now need to write all control blocks to the binary stream in the right order. This is done + // in the decoder's perspective where a DecodingState that exhausts its elements will read the + // next control byte. We can use a min-heap to see which encoding states have written the fewest + // elements so far. In case of tie we use the smallest encoder/decoder index. + std::vector<std::pair<uint32_t /* num elements written */, uint32_t /* encoder index */>> heap; + for (uint32_t i = 0; i < _subobjBuffers.size(); ++i) { + heap.emplace_back(0, i); + } + + // Initialize as min-heap + using MinHeap = std::greater<std::pair<uint32_t, uint32_t>>; + std::make_heap(heap.begin(), heap.end(), MinHeap()); + + // Append all control blocks + while (!heap.empty()) { + // Take out encoding state with fewest elements written from heap + std::pop_heap(heap.begin(), heap.end(), MinHeap()); + // And we take out control blocks in FIFO order from this encoding state + auto& slot = _subobjBuffers[heap.back().second]; + const char* controlBlock = slot.first.buf() + slot.second.front().first; + size_t size = slot.second.front().second; + + // Write it to the buffer + _bufBuilder.appendBuf(controlBlock, size); + slot.second.pop_front(); + if (slot.second.empty()) { + // No more control blocks for this encoding state so remove it from the heap + heap.pop_back(); + continue; + } + + // Calculate how many elements were in this control block + uint32_t elems = [&]() -> uint32_t { + if (bsoncolumn::isLiteralControlByte(*controlBlock)) { + return 1; + } + + Simple8b<uint128_t> reader( + controlBlock + 1, + sizeof(uint64_t) * bsoncolumn::numSimple8bBlocksForControlByte(*controlBlock)); + + uint32_t num = 0; + auto it = reader.begin(); + auto end = reader.end(); + while (it != end) { + num += it.blockSize(); + it.advanceBlock(); + } + return num; + }(); + + // Append num elements and put this encoding state back into the heap. + heap.back().first += elems; + std::push_heap(heap.begin(), heap.end(), MinHeap()); + } + // All control blocks written, write EOO to end the interleaving and cleanup. + _bufBuilder.appendChar(EOO); + _subobjStates.clear(); + _subobjBuffers.clear(); + _mode = Mode::kRegular; } } // namespace mongo diff --git a/src/mongo/bson/util/bsoncolumnbuilder.h b/src/mongo/bson/util/bsoncolumnbuilder.h index 036f30cccf9..83796ad5fe3 100644 --- a/src/mongo/bson/util/bsoncolumnbuilder.h +++ b/src/mongo/bson/util/bsoncolumnbuilder.h @@ -35,7 +35,9 @@ #include "mongo/bson/util/simple8b.h" #include "mongo/platform/int128.h" +#include <deque> #include <memory> +#include <vector> namespace mongo { @@ -46,7 +48,6 @@ class BSONColumnBuilder { public: BSONColumnBuilder(StringData fieldName); BSONColumnBuilder(StringData fieldName, BufBuilder&& builder); - BSONColumnBuilder(BSONColumnBuilder&&) = delete; /** * Appends a BSONElement to this BSONColumnBuilder. @@ -92,52 +93,107 @@ public: BufBuilder detach(); private: - BSONElement _previous() const; + /** + * State for encoding scalar BSONElement as BSONColumn using delta or delta-of-delta + * compression. When compressing Objects one Encoding state is used per sub-field within the + * object to compress. + */ + struct EncodingState { + EncodingState(BufBuilder* bufBuilder, + std::function<void(const char*, size_t)> controlBlockWriter); + EncodingState(EncodingState&& other); + EncodingState& operator=(EncodingState&& rhs); + + void append(BSONElement elem); + void skip(); + void flush(); + + BSONElement _previous() const; + void _storePrevious(BSONElement elem); + void _writeLiteralFromPrevious(); + void _initializeFromPrevious(); + ptrdiff_t _incrementSimple8bCount(); + + // Helper to append doubles to this Column builder. Returns true if append was successful + // and false if the value needs to be stored uncompressed. + bool _appendDouble(double value, double previous); + + // Tries to rescale current pending values + one additional value into a new + // Simple8bBuilder. Returns the new Simple8bBuilder if rescaling was possible and none + // otherwise. + boost::optional<Simple8bBuilder<uint64_t>> _tryRescalePending(int64_t encoded, + uint8_t newScaleIndex); + + Simple8bWriteFn _createBufferWriter(); + + // Storage for the previously appended BSONElement + std::unique_ptr<char[]> _prev; + int _prevSize = 0; + int _prevCapacity = 0; + // This is only used for types that use delta of delta. + int64_t _prevDelta = 0; + + // Simple-8b builder for storing compressed deltas + Simple8bBuilder<uint64_t> _simple8bBuilder64; + Simple8bBuilder<uint128_t> _simple8bBuilder128; - void _storePrevious(BSONElement elem); - void _writeLiteralFromPrevious(); - void _incrementSimple8bCount(); - bool _objectIdDeltaPossible(BSONElement elem, BSONElement prev); + // Chose whether to use 128 or 64 Simple-8b builder + bool _storeWith128 = false; - // Helper to append doubles to this Column builder. Returns true if append was successful and - // false if the value needs to be stored uncompressed. - bool _appendDouble(double value, double previous); + // Offset to last Simple-8b control byte + std::ptrdiff_t _controlByteOffset; - // Tries to rescale current pending values + one additional value into a new Simple8bBuilder. - // Returns the new Simple8bBuilder if rescaling was possible and none otherwise. - boost::optional<Simple8bBuilder<uint64_t>> _tryRescalePending(int64_t encoded, - uint8_t newScaleIndex); + // Additional variables needed for previous state + int64_t _prevEncoded64 = 0; + int128_t _prevEncoded128 = 0; + double _lastValueInPrevBlock = 0; + uint8_t _scaleIndex; - Simple8bWriteFn _createBufferWriter(); + BufBuilder* _bufBuilder; + std::function<void(const char*, size_t)> _controlBlockWriter; + }; - // Storage for the previously appended BSONElement - std::unique_ptr<char[]> _prev; - int _prevSize = 0; - int _prevCapacity = 0; - // This is only used for types that use delta of delta. - int64_t _prevDelta = 0; + // Append Object for sub-object compression when in mode kSubObjAppending + void _appendSubElements(const BSONObj& obj); - // Simple-8b builder for storing compressed deltas - Simple8bBuilder<uint64_t> _simple8bBuilder64; - Simple8bBuilder<uint128_t> _simple8bBuilder128; + // Transition into kSubObjDeterminingReference mode + void _startDetermineSubObjReference(const BSONObj& obj); - // Offset to last Simple-8b control byte - std::ptrdiff_t _controlByteOffset = 0; + // Transition from kSubObjDeterminingReference into kSubObjAppending + void _finishDetermineSubObjReference(); - // Additional variables needed for previous state - int64_t _prevEncoded64 = 0; - int128_t _prevEncoded128 = 0; - double _lastValueInPrevBlock = 0; - uint8_t _scaleIndex; + // Transition from kSubObjDeterminingReference or kSubObjAppending back into kRegular. + void _flushSubObjMode(); + + // Encoding state for kRegular mode + EncodingState _state; + + // Intermediate BufBuilder and offsets to written control blocks for sub-object compression + std::deque<std::pair<BufBuilder, std::deque<std::pair<ptrdiff_t, size_t>>>> _subobjBuffers; + + // Encoding states when in sub-object compression mode. There should be one encoding state per + // scalar field in '_referenceSubObj'. + std::deque<EncodingState> _subobjStates; + + // Reference object that is used to match object hierarchy to encoding states. Appending objects + // for sub-object compression need to check their hierarchy against this object. + BSONObj _referenceSubObj; + + // Buffered BSONObj when determining reference object. Will be compressed when this is complete + // and we transition into kSubObjAppending. + std::vector<BSONObj> _bufferedObjElements; + + // Helper to flatten Object to compress to match _subobjStates + std::vector<BSONElement> _flattenedAppendedObj; // Buffer for the BSON Column binary BufBuilder _bufBuilder; + enum class Mode { kRegular, kSubObjDeterminingReference, kSubObjAppending }; + Mode _mode = Mode::kRegular; + uint32_t _elementCount = 0; std::string _fieldName; - - // Chose whether to use 128 or 64 Simple-8b builder - bool _storeWith128 = false; }; } // namespace mongo |