diff options
author | Mathias Stearn <mathias@10gen.com> | 2015-01-06 14:47:30 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2015-01-07 11:56:33 -0500 |
commit | 5d46a3a9f7b7f8b17ae1cbe73dd9f8d056350b81 (patch) | |
tree | 57a730638cd73a33a0c09ec14fb4f9b664035f9b /src/mongo/db/storage/key_string.cpp | |
parent | 4075f40df4fcb86ce6c8c6ea9565a4699769e4ed (diff) | |
download | mongo-5d46a3a9f7b7f8b17ae1cbe73dd9f8d056350b81.tar.gz |
SERVER-16632 Compress index keys in KeyString
* Bools are now completely encoded in the type byte
* BinData size is stored in a single byte if < 255
* Numeric encoding improvements:
** Maximum encoded size including type byte is now 9 bytes
** The "size-class" of each number is now stored in the type byte
** Integers and the integer part of Doubles use variable numbers of bytes
** Doubles > 1 with a fractional part implicitly store their exponent in the
integer part and use a variable number of bytes for the fractional part
Diffstat (limited to 'src/mongo/db/storage/key_string.cpp')
-rw-r--r-- | src/mongo/db/storage/key_string.cpp | 391 |
1 files changed, 265 insertions, 126 deletions
diff --git a/src/mongo/db/storage/key_string.cpp b/src/mongo/db/storage/key_string.cpp index 09e0a27df1d..7cf1a103460 100644 --- a/src/mongo/db/storage/key_string.cpp +++ b/src/mongo/db/storage/key_string.cpp @@ -45,27 +45,72 @@ namespace mongo { namespace { namespace CType { // canonical types namespace. (would be enum class CType: uint8_t in C++11) - // Note 0 and 255 are disallowed and reserved for value encodings + // Note 0-9 and 246-255 are disallowed and reserved for value encodings. + // For types that encode value information in the type byte, the value in this list is + // the "generic" one to be used to represent all values of that type, such as in the + // encoding of fields in Objects. const uint8_t kMinKey = 10; const uint8_t kUndefined = 15; const uint8_t kNullish = 20; const uint8_t kNumeric = 30; - const uint8_t kString = 40; - const uint8_t kObject = 50; - const uint8_t kArray = 60; - const uint8_t kBinData = 70; - const uint8_t kOID = 80; - const uint8_t kBool = 90; - const uint8_t kDate = 100; - const uint8_t kTimestamp = 110; - const uint8_t kRegEx = 120; - const uint8_t kDBRef = 130; - const uint8_t kCode = 140; - const uint8_t kCodeWithScope = 150; + const uint8_t kString = 60; + const uint8_t kObject = 70; + const uint8_t kArray = 80; + const uint8_t kBinData = 90; + const uint8_t kOID = 100; + const uint8_t kBool = 110; + const uint8_t kDate = 120; + const uint8_t kTimestamp = 130; + const uint8_t kRegEx = 140; + const uint8_t kDBRef = 150; + const uint8_t kCode = 160; + const uint8_t kCodeWithScope = 170; const uint8_t kMaxKey = 240; + + // These are ordered by the numeric value of the values encoded in each format. + // Therefore each format can be considered independently without considering + // cross-format comparisons. + const uint8_t kNumericNaN = kNumeric + 0; + const uint8_t kNumericNegativeLargeDouble = kNumeric + 1; // <= -2**63 including -Inf + const uint8_t kNumericNegative8ByteInt = kNumeric + 2; + const uint8_t kNumericNegative7ByteInt = kNumeric + 3; + const uint8_t kNumericNegative6ByteInt = kNumeric + 4; + const uint8_t kNumericNegative5ByteInt = kNumeric + 5; + const uint8_t kNumericNegative4ByteInt = kNumeric + 6; + const uint8_t kNumericNegative3ByteInt = kNumeric + 7; + const uint8_t kNumericNegative2ByteInt = kNumeric + 8; + const uint8_t kNumericNegative1ByteInt = kNumeric + 9; + const uint8_t kNumericNegativeSmallDouble = kNumeric + 10; // between 0 and -1 exclusive + const uint8_t kNumericZero = kNumeric + 11; + const uint8_t kNumericPositiveSmallDouble = kNumeric + 12; // between 0 and 1 exclusive + const uint8_t kNumericPositive1ByteInt = kNumeric + 13; + const uint8_t kNumericPositive2ByteInt = kNumeric + 14; + const uint8_t kNumericPositive3ByteInt = kNumeric + 15; + const uint8_t kNumericPositive4ByteInt = kNumeric + 16; + const uint8_t kNumericPositive5ByteInt = kNumeric + 17; + const uint8_t kNumericPositive6ByteInt = kNumeric + 18; + const uint8_t kNumericPositive7ByteInt = kNumeric + 19; + const uint8_t kNumericPositive8ByteInt = kNumeric + 20; + const uint8_t kNumericPositiveLargeDouble = kNumeric + 21; // >= 2**63 including +Inf + BOOST_STATIC_ASSERT(kNumericPositiveLargeDouble < kString); + + const uint8_t kBoolFalse = kBool + 0; + const uint8_t kBoolTrue = kBool + 1; + BOOST_STATIC_ASSERT(kBoolTrue < kDate); + + size_t numBytesForInt(uint8_t ctype) { + if (ctype >= kNumericPositive1ByteInt) { + dassert(ctype <= kNumericPositive8ByteInt); + return ctype - kNumericPositive1ByteInt + 1; + } + + dassert(ctype <= kNumericNegative1ByteInt); + dassert(ctype >= kNumericNegative8ByteInt); + return kNumericNegative1ByteInt - ctype + 1; + } } // namespace CType - uint8_t bsonTypeToKeyStringType(BSONType type) { + uint8_t bsonTypeToGenericKeyStringType(BSONType type) { switch (type) { case MinKey: return CType::kMinKey; @@ -116,12 +161,6 @@ namespace mongo { const uint8_t kEqual = 0x2; const uint8_t kLess = kEqual - 1; const uint8_t kGreater = kEqual + 1; - - const uint8_t kNaN = 0x00; - const uint8_t kZero = 0x80; - const uint8_t kSmallDouble = kZero + 1; - const uint8_t kLargeInt64 = kZero + 2; - const uint8_t kLargeDouble = kZero + 3; } // namespace // some utility functions @@ -253,10 +292,11 @@ namespace mongo { } void KeyString::_appendBool(bool val, bool invert) { - _append(int8_t(val ? 1 : 0), invert); + _append(val ? CType::kBoolTrue : CType::kBoolFalse, invert); } void KeyString::_appendDate(Date_t val, bool invert) { + _append(CType::kDate, invert); // see: http://en.wikipedia.org/wiki/Offset_binary uint64_t encoded = static_cast<uint64_t>(val.asInt64()); encoded ^= (1LL << 63); // flip highest bit (equivalent to bias encoding) @@ -264,37 +304,53 @@ namespace mongo { } void KeyString::_appendTimestamp(OpTime val, bool invert) { + _append(CType::kTimestamp, invert); _append(endian::nativeToBig(val.asLL()), invert); } void KeyString::_appendOID(OID val, bool invert) { + _append(CType::kOID, invert); _appendBytes(val.view().view(), OID::kOIDSize, invert); } void KeyString::_appendString(StringData val, bool invert) { + _append(CType::kString, invert); _appendStringLike(val, invert); } void KeyString::_appendSymbol(StringData val, bool invert) { + _append(CType::kString, invert); // Symbols and Strings compare equally _appendStringLike(val, invert); } void KeyString::_appendCode(StringData val, bool invert) { + _append(CType::kCode, invert); _appendStringLike(val, invert); } void KeyString::_appendCodeWString(const BSONCodeWScope& val, bool invert) { + _append(CType::kCodeWithScope, invert); _appendStringLike(val.code, invert); _appendBson(val.scope, invert); } void KeyString::_appendBinData(const BSONBinData& val, bool invert) { - _append(endian::nativeToBig(int32_t(val.length)), invert); + _append(CType::kBinData, invert); + if (val.length < 0xff) { + // size fits in one byte so use one byte to encode. + _append(uint8_t(val.length), invert); + } + else { + // Encode 0xff prefix to indicate that the size takes 4 bytes. + _append(uint8_t(0xff), invert); + _append(endian::nativeToBig(int32_t(val.length)), invert); + } _append(uint8_t(val.type), invert); _appendBytes(val.data, val.length, invert); } void KeyString::_appendRegex(const BSONRegEx& val, bool invert) { + _append(CType::kRegEx, invert); // note: NULL is not allowed in pattern or flags _appendBytes(val.pattern.rawData(), val.pattern.size(), invert); _append(int8_t(0), invert); @@ -303,27 +359,31 @@ namespace mongo { } void KeyString::_appendDBRef(const BSONDBRef& val, bool invert) { + _append(CType::kDBRef, invert); _append(endian::nativeToBig(int32_t(val.ns.size())), invert); _appendBytes(val.ns.rawData(), val.ns.size(), invert); _appendBytes(val.oid.view().view(), OID::kOIDSize, invert); } void KeyString::_appendArray(const BSONArray& val, bool invert) { + _append(CType::kArray, invert); BSONForEach(elem, val) { + // No generic type byte needed here since no name is encoded. _appendBsonValue(elem, invert, NULL); } _append(int8_t(0), invert); } void KeyString::_appendObject(const BSONObj& val, bool invert) { + _append(CType::kObject, invert); _appendBson(val, invert); } void KeyString::_appendDouble(const double num, bool invert) { - // no special cases for Inf, + // no special cases needed for Inf, // see http://en.wikipedia.org/wiki/IEEE_754-1985#Positive_and_negative_infinity if (isNaN(num)) { - _append(kNaN, invert); + _append(CType::kNumericNaN, invert); return; } @@ -333,73 +393,76 @@ namespace mongo { // however this prevents roundtripping -0.0. // So if you put a -0.0 in, you'll get 0.0 out. // We believe this to be ok. - _append(kZero, invert); + _append(CType::kNumericZero, invert); return; } - // if negative invert, unless we are inverting positives - if (num < 0.0) - invert = !invert; + const bool isNegative = num < 0.0; + const double magnitude = isNegative ? -num : num; - const double magnitude = num < 0.0 ? -num : num; - if (magnitude < kMinLargeInt64) { - _appendSmallDouble(magnitude, invert); + if (magnitude < 1.0) { + // This includes subnormal numbers. + _appendSmallDouble(num, invert); return; } if (magnitude < kMinLargeDouble) { - _appendLargeInt64(static_cast<long long>(magnitude), invert); + uint64_t integerPart = uint64_t(magnitude); + if (double(integerPart) == magnitude) { + // No fractional part + _appendPreshiftedIntegerPortion(integerPart << 1, isNegative, invert); + return; + } + + // There is a fractional part. + _appendPreshiftedIntegerPortion((integerPart << 1) | 1, isNegative, invert); + + // Append the bytes of the mantissa that include fractional bits. + const size_t fractionalBits = (53 - (64 - countLeadingZeros64(integerPart))); + const size_t fractionalBytes = (fractionalBits + 7) / 8; + dassert(fractionalBytes > 0); + uint64_t mantissa; + memcpy(&mantissa, &num, sizeof(mantissa)); + mantissa &= ~(uint64_t(-1) << fractionalBits); // set non-fractional bits to 0; + mantissa = endian::nativeToBig(mantissa); + + const void* firstUsedByte = + reinterpret_cast<const char*>((&mantissa) + 1) - fractionalBytes; + _appendBytes(firstUsedByte, fractionalBytes, isNegative ? !invert : invert); return; } - _appendLargeDouble(magnitude, invert); + _appendLargeDouble(num, invert); } void KeyString::_appendLongLong(const long long num, bool invert) { - if (num == 0) { - _append(kZero, invert); - return; - } - - if (num < 0) - invert = !invert; - if (num == std::numeric_limits<long long>::min()) { // -2**63 is exactly representable as a double and not as a positive int64. // Therefore we encode it as a double. dassert(-double(num) == kMinLargeDouble); - _appendLargeDouble(-double(num), invert); + _appendLargeDouble(double(num), invert); return; } - const long long magnitude = num < 0 ? -num : num; - if (magnitude < kMinLargeInt64) { - _appendSmallDouble(double(magnitude), invert); + if (num == 0) { + _append(CType::kNumericZero, invert); return; } - _appendLargeInt64(magnitude, invert); + const bool isNegative = num < 0; + const uint64_t magnitude = isNegative ? -num : num; + _appendPreshiftedIntegerPortion(magnitude << 1, isNegative, invert); } void KeyString::_appendInt(const int num, bool invert) { - if (num == 0) { - _append(kZero, invert); - return; - } - - if (num < 0) - invert = !invert; - - const double magnitude = num < 0 ? -double(num) : double(num); - _appendSmallDouble(magnitude, invert); + // NumberInt encoding is the same as for NumberLong, without introducing new edge cases. + _appendLongLong(num, invert); } void KeyString::_appendBsonValue(const BSONElement& elem, bool invert, const StringData* name) { - _append(bsonTypeToKeyStringType(elem.type()), invert); - if (name) { _appendBytes(name->rawData(), name->size() + 1, invert); // + 1 for NUL } @@ -410,6 +473,7 @@ namespace mongo { case EOO: case Undefined: case jstNULL: + _append(bsonTypeToGenericKeyStringType(elem.type()), invert); break; case NumberDouble: _appendDouble(elem._numberDouble(), invert); break; @@ -469,29 +533,66 @@ namespace mongo { void KeyString::_appendBson(const BSONObj& obj, bool invert) { BSONForEach(elem, obj) { + // Force the order to be based on (type, name, value). + _append(bsonTypeToGenericKeyStringType(elem.type()), invert); StringData name = elem.fieldNameStringData(); _appendBsonValue(elem, invert, &name); } _append(int8_t(0), invert); } - void KeyString::_appendSmallDouble(double magnitude, bool invert) { - _append(kSmallDouble, invert); + void KeyString::_appendSmallDouble(double value, bool invert) { + dassert(!isNaN(value)); + dassert(value != 0.0); + uint64_t data; - memcpy(&data, &magnitude, sizeof(data)); - _append(endian::nativeToBig(data), invert); + memcpy(&data, &value, sizeof(data)); + + if (value > 0) { + _append(CType::kNumericPositiveSmallDouble, invert); + _append(endian::nativeToBig(data), invert); + } + else { + _append(CType::kNumericNegativeSmallDouble, invert); + _append(endian::nativeToBig(data), !invert); + } } - void KeyString::_appendLargeDouble(double magnitude, bool invert) { - _append(uint8_t(kLargeDouble), invert); + void KeyString::_appendLargeDouble(double value, bool invert) { + dassert(!isNaN(value)); + dassert(value != 0.0); + uint64_t data; - memcpy(&data, &magnitude, sizeof(data)); - _append(endian::nativeToBig(data), invert); + memcpy(&data, &value, sizeof(data)); + + if (value > 0) { + _append(CType::kNumericPositiveLargeDouble, invert); + _append(endian::nativeToBig(data), invert); + } + else { + _append(CType::kNumericNegativeLargeDouble, invert); + _append(endian::nativeToBig(data), !invert); + } } - void KeyString::_appendLargeInt64(long long magnitude, bool invert) { - _append(uint8_t(kLargeInt64), invert); - _append(endian::nativeToBig(magnitude), invert); + void KeyString::_appendPreshiftedIntegerPortion(uint64_t value, bool isNegative, bool invert) { + dassert(value != 0ull); + dassert(value != 1ull); + + const size_t bytesNeeded = (64 - countLeadingZeros64(value) + 7) / 8; + + // Append the low bytes of value in big endian order. + value = endian::nativeToBig(value); + const void* firstUsedByte = reinterpret_cast<const char*>((&value) + 1) - bytesNeeded; + + if (isNegative) { + _append(uint8_t(CType::kNumericNegative1ByteInt - (bytesNeeded - 1)), invert); + _appendBytes(firstUsedByte, bytesNeeded, !invert); + } + else { + _append(uint8_t(CType::kNumericPositive1ByteInt + (bytesNeeded - 1)), invert); + _appendBytes(firstUsedByte, bytesNeeded, invert); + } } template <typename T> @@ -523,17 +624,16 @@ namespace mongo { BSONObjBuilderValueStream* stream); void toBson(BufReader* reader, bool inverted, BSONObjBuilder* builder) { - uint8_t type; - while ((type = readType<uint8_t>(reader, inverted)) != 0) { + while (readType<uint8_t>(reader, inverted) != 0) { if (inverted) { std::string name = readInvertedCString(reader); BSONObjBuilderValueStream& stream = *builder << name; - toBsonValue(type, reader, inverted, &stream); + toBsonValue(readType<uint8_t>(reader, inverted), reader, inverted, &stream); } else { StringData name = readCString(reader); BSONObjBuilderValueStream& stream = *builder << name; - toBsonValue(type, reader, inverted, &stream); + toBsonValue(readType<uint8_t>(reader, inverted), reader, inverted, &stream); } } } @@ -542,15 +642,19 @@ namespace mongo { BufReader* reader, bool inverted, BSONObjBuilderValueStream* stream) { + + // This is only used by the kNumeric.*ByteInt types, but needs to be declared up here + // since it is used across a fallthrough. + bool isNegative = false; + switch (type) { case CType::kMinKey: *stream << MINKEY; break; case CType::kMaxKey: *stream << MAXKEY; break; case CType::kNullish: *stream << BSONNULL; break; case CType::kUndefined: *stream << BSONUndefined; break; - case CType::kBool: - *stream << bool(readType<uint8_t>(reader, inverted)); - break; + case CType::kBoolTrue: *stream << true; break; + case CType::kBoolFalse: *stream << false; break; case CType::kDate: *stream << Date_t(endian::bigToNative(readType<uint64_t>(reader, @@ -607,7 +711,11 @@ namespace mongo { } case CType::kBinData: { - size_t size = endian::bigToNative(readType<uint32_t>(reader, inverted)); + size_t size = readType<uint8_t>(reader, inverted); + if (size == 0xff) { + // size was stored in 4 bytes. + size = endian::bigToNative(readType<uint32_t>(reader, inverted)); + } BinDataType type = BinDataType(readType<uint8_t>(reader, inverted)); const void* ptr = reader->skip(size); if (!inverted) { @@ -672,62 +780,93 @@ namespace mongo { break; } - case CType::kNumeric: { - // Using doubles for everything except kLargeInt64 - switch (uint8_t kind = readType<uint8_t>(reader, false)) { - case kNaN: *stream << std::numeric_limits<double>::quiet_NaN(); break; - case kZero: *stream << 0.0; break; - case kLargeInt64: { - long long ll = endian::bigToNative(readType<long long>(reader, false)); - if (inverted) - ll = -ll; - *stream << ll; - break; - } - case kSmallDouble: - case kLargeDouble: { - uint64_t encoded = readType<uint64_t>(reader, false); - encoded = endian::bigToNative(encoded); - double d; - memcpy(&d, &encoded, sizeof(d)); - if (inverted) - *stream << -d; - else - *stream << d; - break; - } + // + // Numerics + // Using doubles for everything outside of the range +/- [2**52, 2**63) + // + + case CType::kNumericNaN: *stream << std::numeric_limits<double>::quiet_NaN(); break; + case CType::kNumericZero: *stream << 0.0; break; + + case CType::kNumericNegativeLargeDouble: + case CType::kNumericNegativeSmallDouble: + inverted = !inverted; + // fallthrough (format is the same as positive, but inverted) + + case CType::kNumericPositiveLargeDouble: + case CType::kNumericPositiveSmallDouble: { + // for these, the original double was stored intact, including sign bit. + uint64_t encoded = readType<uint64_t>(reader, inverted); + encoded = endian::bigToNative(encoded); + double d; + memcpy(&d, &encoded, sizeof(d)); + *stream << d; + break; + } - default: { - switch (uint8_t(~kind)) { - case kNaN: *stream << std::numeric_limits<double>::quiet_NaN(); break; - case kZero: *stream << 0.0; break; - case kLargeInt64: { - long long ll = ~endian::bigToNative(readType<long long>(reader, false)); - if (!inverted) - ll = -ll; - *stream << ll; - break; + case CType::kNumericNegative8ByteInt: + case CType::kNumericNegative7ByteInt: + case CType::kNumericNegative6ByteInt: + case CType::kNumericNegative5ByteInt: + case CType::kNumericNegative4ByteInt: + case CType::kNumericNegative3ByteInt: + case CType::kNumericNegative2ByteInt: + case CType::kNumericNegative1ByteInt: + inverted = !inverted; + isNegative = true; + // fallthrough (format is the same as positive, but inverted) + + case CType::kNumericPositive1ByteInt: + case CType::kNumericPositive2ByteInt: + case CType::kNumericPositive3ByteInt: + case CType::kNumericPositive4ByteInt: + case CType::kNumericPositive5ByteInt: + case CType::kNumericPositive6ByteInt: + case CType::kNumericPositive7ByteInt: + case CType::kNumericPositive8ByteInt: { + uint64_t encodedIntegerPart = 0; + { + size_t intBytesRemaining = CType::numBytesForInt(type); + while (intBytesRemaining--) { + encodedIntegerPart = (encodedIntegerPart << 8) + | readType<uint8_t>(reader, inverted); } + } - case kSmallDouble: - case kLargeDouble: { - uint64_t encoded; - memcpy_flipBits(&encoded, reader->pos(), sizeof(encoded)); - reader->skip(sizeof(encoded)); - encoded = endian::bigToNative(encoded); - double d; - memcpy(&d, &encoded, sizeof(d)); - if(inverted) - *stream << d; - else - *stream << -d; - break; + const bool haveFractionalPart = (encodedIntegerPart & 1); + long long integerPart = encodedIntegerPart >> 1; + + if (!haveFractionalPart) { + if (integerPart < kMinLargeInt64) { + *stream << (isNegative ? -double(integerPart) : double(integerPart)); } - default: invariant(false); + else { + *stream << (isNegative ? -integerPart : integerPart); } - break; } + else { + const uint64_t exponent = (64 - countLeadingZeros64(integerPart)) - 1; + const size_t fractionalBits = (52 - exponent); + const size_t fractionalBytes = (fractionalBits + 7) / 8; + + // build up the bits of a double here. + uint64_t doubleBits = integerPart << fractionalBits; + doubleBits &= ~(1ull << 52); // clear implicit leading 1 + doubleBits |= (exponent + 1023/*bias*/) << 52; + if (isNegative) { + doubleBits |= (1ull << 63); // sign bit + } + for (size_t i = 0; i < fractionalBytes; i++) { + // fold in the fractional bytes + const uint64_t byte = readType<uint8_t>(reader, inverted); + doubleBits |= (byte << ((fractionalBytes - i - 1) * 8)); + } + + double number; + memcpy(&number, &doubleBits, sizeof(number)); + *stream << number; } + break; } default: invariant(false); |