summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/key_string.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2015-01-06 14:47:30 -0500
committerMathias Stearn <mathias@10gen.com>2015-01-07 11:56:33 -0500
commit5d46a3a9f7b7f8b17ae1cbe73dd9f8d056350b81 (patch)
tree57a730638cd73a33a0c09ec14fb4f9b664035f9b /src/mongo/db/storage/key_string.cpp
parent4075f40df4fcb86ce6c8c6ea9565a4699769e4ed (diff)
downloadmongo-5d46a3a9f7b7f8b17ae1cbe73dd9f8d056350b81.tar.gz
SERVER-16632 Compress index keys in KeyString
* Bools are now completely encoded in the type byte * BinData size is stored in a single byte if < 255 * Numeric encoding improvements: ** Maximum encoded size including type byte is now 9 bytes ** The "size-class" of each number is now stored in the type byte ** Integers and the integer part of Doubles use variable numbers of bytes ** Doubles > 1 with a fractional part implicitly store their exponent in the integer part and use a variable number of bytes for the fractional part
Diffstat (limited to 'src/mongo/db/storage/key_string.cpp')
-rw-r--r--src/mongo/db/storage/key_string.cpp391
1 files changed, 265 insertions, 126 deletions
diff --git a/src/mongo/db/storage/key_string.cpp b/src/mongo/db/storage/key_string.cpp
index 09e0a27df1d..7cf1a103460 100644
--- a/src/mongo/db/storage/key_string.cpp
+++ b/src/mongo/db/storage/key_string.cpp
@@ -45,27 +45,72 @@ namespace mongo {
namespace {
namespace CType {
// canonical types namespace. (would be enum class CType: uint8_t in C++11)
- // Note 0 and 255 are disallowed and reserved for value encodings
+ // Note 0-9 and 246-255 are disallowed and reserved for value encodings.
+ // For types that encode value information in the type byte, the value in this list is
+ // the "generic" one to be used to represent all values of that type, such as in the
+ // encoding of fields in Objects.
const uint8_t kMinKey = 10;
const uint8_t kUndefined = 15;
const uint8_t kNullish = 20;
const uint8_t kNumeric = 30;
- const uint8_t kString = 40;
- const uint8_t kObject = 50;
- const uint8_t kArray = 60;
- const uint8_t kBinData = 70;
- const uint8_t kOID = 80;
- const uint8_t kBool = 90;
- const uint8_t kDate = 100;
- const uint8_t kTimestamp = 110;
- const uint8_t kRegEx = 120;
- const uint8_t kDBRef = 130;
- const uint8_t kCode = 140;
- const uint8_t kCodeWithScope = 150;
+ const uint8_t kString = 60;
+ const uint8_t kObject = 70;
+ const uint8_t kArray = 80;
+ const uint8_t kBinData = 90;
+ const uint8_t kOID = 100;
+ const uint8_t kBool = 110;
+ const uint8_t kDate = 120;
+ const uint8_t kTimestamp = 130;
+ const uint8_t kRegEx = 140;
+ const uint8_t kDBRef = 150;
+ const uint8_t kCode = 160;
+ const uint8_t kCodeWithScope = 170;
const uint8_t kMaxKey = 240;
+
+ // These are ordered by the numeric value of the values encoded in each format.
+ // Therefore each format can be considered independently without considering
+ // cross-format comparisons.
+ const uint8_t kNumericNaN = kNumeric + 0;
+ const uint8_t kNumericNegativeLargeDouble = kNumeric + 1; // <= -2**63 including -Inf
+ const uint8_t kNumericNegative8ByteInt = kNumeric + 2;
+ const uint8_t kNumericNegative7ByteInt = kNumeric + 3;
+ const uint8_t kNumericNegative6ByteInt = kNumeric + 4;
+ const uint8_t kNumericNegative5ByteInt = kNumeric + 5;
+ const uint8_t kNumericNegative4ByteInt = kNumeric + 6;
+ const uint8_t kNumericNegative3ByteInt = kNumeric + 7;
+ const uint8_t kNumericNegative2ByteInt = kNumeric + 8;
+ const uint8_t kNumericNegative1ByteInt = kNumeric + 9;
+ const uint8_t kNumericNegativeSmallDouble = kNumeric + 10; // between 0 and -1 exclusive
+ const uint8_t kNumericZero = kNumeric + 11;
+ const uint8_t kNumericPositiveSmallDouble = kNumeric + 12; // between 0 and 1 exclusive
+ const uint8_t kNumericPositive1ByteInt = kNumeric + 13;
+ const uint8_t kNumericPositive2ByteInt = kNumeric + 14;
+ const uint8_t kNumericPositive3ByteInt = kNumeric + 15;
+ const uint8_t kNumericPositive4ByteInt = kNumeric + 16;
+ const uint8_t kNumericPositive5ByteInt = kNumeric + 17;
+ const uint8_t kNumericPositive6ByteInt = kNumeric + 18;
+ const uint8_t kNumericPositive7ByteInt = kNumeric + 19;
+ const uint8_t kNumericPositive8ByteInt = kNumeric + 20;
+ const uint8_t kNumericPositiveLargeDouble = kNumeric + 21; // >= 2**63 including +Inf
+ BOOST_STATIC_ASSERT(kNumericPositiveLargeDouble < kString);
+
+ const uint8_t kBoolFalse = kBool + 0;
+ const uint8_t kBoolTrue = kBool + 1;
+ BOOST_STATIC_ASSERT(kBoolTrue < kDate);
+
+ size_t numBytesForInt(uint8_t ctype) {
+ if (ctype >= kNumericPositive1ByteInt) {
+ dassert(ctype <= kNumericPositive8ByteInt);
+ return ctype - kNumericPositive1ByteInt + 1;
+ }
+
+ dassert(ctype <= kNumericNegative1ByteInt);
+ dassert(ctype >= kNumericNegative8ByteInt);
+ return kNumericNegative1ByteInt - ctype + 1;
+ }
} // namespace CType
- uint8_t bsonTypeToKeyStringType(BSONType type) {
+ uint8_t bsonTypeToGenericKeyStringType(BSONType type) {
switch (type) {
case MinKey:
return CType::kMinKey;
@@ -116,12 +161,6 @@ namespace mongo {
const uint8_t kEqual = 0x2;
const uint8_t kLess = kEqual - 1;
const uint8_t kGreater = kEqual + 1;
-
- const uint8_t kNaN = 0x00;
- const uint8_t kZero = 0x80;
- const uint8_t kSmallDouble = kZero + 1;
- const uint8_t kLargeInt64 = kZero + 2;
- const uint8_t kLargeDouble = kZero + 3;
} // namespace
// some utility functions
@@ -253,10 +292,11 @@ namespace mongo {
}
void KeyString::_appendBool(bool val, bool invert) {
- _append(int8_t(val ? 1 : 0), invert);
+ _append(val ? CType::kBoolTrue : CType::kBoolFalse, invert);
}
void KeyString::_appendDate(Date_t val, bool invert) {
+ _append(CType::kDate, invert);
// see: http://en.wikipedia.org/wiki/Offset_binary
uint64_t encoded = static_cast<uint64_t>(val.asInt64());
encoded ^= (1LL << 63); // flip highest bit (equivalent to bias encoding)
@@ -264,37 +304,53 @@ namespace mongo {
}
void KeyString::_appendTimestamp(OpTime val, bool invert) {
+ _append(CType::kTimestamp, invert);
_append(endian::nativeToBig(val.asLL()), invert);
}
void KeyString::_appendOID(OID val, bool invert) {
+ _append(CType::kOID, invert);
_appendBytes(val.view().view(), OID::kOIDSize, invert);
}
void KeyString::_appendString(StringData val, bool invert) {
+ _append(CType::kString, invert);
_appendStringLike(val, invert);
}
void KeyString::_appendSymbol(StringData val, bool invert) {
+ _append(CType::kString, invert); // Symbols and Strings compare equally
_appendStringLike(val, invert);
}
void KeyString::_appendCode(StringData val, bool invert) {
+ _append(CType::kCode, invert);
_appendStringLike(val, invert);
}
void KeyString::_appendCodeWString(const BSONCodeWScope& val, bool invert) {
+ _append(CType::kCodeWithScope, invert);
_appendStringLike(val.code, invert);
_appendBson(val.scope, invert);
}
void KeyString::_appendBinData(const BSONBinData& val, bool invert) {
- _append(endian::nativeToBig(int32_t(val.length)), invert);
+ _append(CType::kBinData, invert);
+ if (val.length < 0xff) {
+ // size fits in one byte so use one byte to encode.
+ _append(uint8_t(val.length), invert);
+ }
+ else {
+ // Encode 0xff prefix to indicate that the size takes 4 bytes.
+ _append(uint8_t(0xff), invert);
+ _append(endian::nativeToBig(int32_t(val.length)), invert);
+ }
_append(uint8_t(val.type), invert);
_appendBytes(val.data, val.length, invert);
}
void KeyString::_appendRegex(const BSONRegEx& val, bool invert) {
+ _append(CType::kRegEx, invert);
// note: NULL is not allowed in pattern or flags
_appendBytes(val.pattern.rawData(), val.pattern.size(), invert);
_append(int8_t(0), invert);
@@ -303,27 +359,31 @@ namespace mongo {
}
void KeyString::_appendDBRef(const BSONDBRef& val, bool invert) {
+ _append(CType::kDBRef, invert);
_append(endian::nativeToBig(int32_t(val.ns.size())), invert);
_appendBytes(val.ns.rawData(), val.ns.size(), invert);
_appendBytes(val.oid.view().view(), OID::kOIDSize, invert);
}
void KeyString::_appendArray(const BSONArray& val, bool invert) {
+ _append(CType::kArray, invert);
BSONForEach(elem, val) {
+ // No generic type byte needed here since no name is encoded.
_appendBsonValue(elem, invert, NULL);
}
_append(int8_t(0), invert);
}
void KeyString::_appendObject(const BSONObj& val, bool invert) {
+ _append(CType::kObject, invert);
_appendBson(val, invert);
}
void KeyString::_appendDouble(const double num, bool invert) {
- // no special cases for Inf,
+ // no special cases needed for Inf,
// see http://en.wikipedia.org/wiki/IEEE_754-1985#Positive_and_negative_infinity
if (isNaN(num)) {
- _append(kNaN, invert);
+ _append(CType::kNumericNaN, invert);
return;
}
@@ -333,73 +393,76 @@ namespace mongo {
// however this prevents roundtripping -0.0.
// So if you put a -0.0 in, you'll get 0.0 out.
// We believe this to be ok.
- _append(kZero, invert);
+ _append(CType::kNumericZero, invert);
return;
}
- // if negative invert, unless we are inverting positives
- if (num < 0.0)
- invert = !invert;
+ const bool isNegative = num < 0.0;
+ const double magnitude = isNegative ? -num : num;
- const double magnitude = num < 0.0 ? -num : num;
- if (magnitude < kMinLargeInt64) {
- _appendSmallDouble(magnitude, invert);
+ if (magnitude < 1.0) {
+ // This includes subnormal numbers.
+ _appendSmallDouble(num, invert);
return;
}
if (magnitude < kMinLargeDouble) {
- _appendLargeInt64(static_cast<long long>(magnitude), invert);
+ uint64_t integerPart = uint64_t(magnitude);
+ if (double(integerPart) == magnitude) {
+ // No fractional part
+ _appendPreshiftedIntegerPortion(integerPart << 1, isNegative, invert);
+ return;
+ }
+
+ // There is a fractional part.
+ _appendPreshiftedIntegerPortion((integerPart << 1) | 1, isNegative, invert);
+
+ // Append the bytes of the mantissa that include fractional bits.
+ const size_t fractionalBits = (53 - (64 - countLeadingZeros64(integerPart)));
+ const size_t fractionalBytes = (fractionalBits + 7) / 8;
+ dassert(fractionalBytes > 0);
+ uint64_t mantissa;
+ memcpy(&mantissa, &num, sizeof(mantissa));
+ mantissa &= ~(uint64_t(-1) << fractionalBits); // set non-fractional bits to 0;
+ mantissa = endian::nativeToBig(mantissa);
+
+ const void* firstUsedByte =
+ reinterpret_cast<const char*>((&mantissa) + 1) - fractionalBytes;
+ _appendBytes(firstUsedByte, fractionalBytes, isNegative ? !invert : invert);
return;
}
- _appendLargeDouble(magnitude, invert);
+ _appendLargeDouble(num, invert);
}
void KeyString::_appendLongLong(const long long num, bool invert) {
- if (num == 0) {
- _append(kZero, invert);
- return;
- }
-
- if (num < 0)
- invert = !invert;
-
if (num == std::numeric_limits<long long>::min()) {
// -2**63 is exactly representable as a double and not as a positive int64.
// Therefore we encode it as a double.
dassert(-double(num) == kMinLargeDouble);
- _appendLargeDouble(-double(num), invert);
+ _appendLargeDouble(double(num), invert);
return;
}
- const long long magnitude = num < 0 ? -num : num;
- if (magnitude < kMinLargeInt64) {
- _appendSmallDouble(double(magnitude), invert);
+ if (num == 0) {
+ _append(CType::kNumericZero, invert);
return;
}
- _appendLargeInt64(magnitude, invert);
+ const bool isNegative = num < 0;
+ const uint64_t magnitude = isNegative ? -num : num;
+ _appendPreshiftedIntegerPortion(magnitude << 1, isNegative, invert);
}
void KeyString::_appendInt(const int num, bool invert) {
- if (num == 0) {
- _append(kZero, invert);
- return;
- }
-
- if (num < 0)
- invert = !invert;
-
- const double magnitude = num < 0 ? -double(num) : double(num);
- _appendSmallDouble(magnitude, invert);
+ // NumberInt encoding is the same as for NumberLong, without introducing new edge cases.
+ _appendLongLong(num, invert);
}
void KeyString::_appendBsonValue(const BSONElement& elem,
bool invert,
const StringData* name) {
- _append(bsonTypeToKeyStringType(elem.type()), invert);
-
if (name) {
_appendBytes(name->rawData(), name->size() + 1, invert); // + 1 for NUL
}
@@ -410,6 +473,7 @@ namespace mongo {
case EOO:
case Undefined:
case jstNULL:
+ _append(bsonTypeToGenericKeyStringType(elem.type()), invert);
break;
case NumberDouble: _appendDouble(elem._numberDouble(), invert); break;
@@ -469,29 +533,66 @@ namespace mongo {
void KeyString::_appendBson(const BSONObj& obj, bool invert) {
BSONForEach(elem, obj) {
+ // Force the order to be based on (type, name, value).
+ _append(bsonTypeToGenericKeyStringType(elem.type()), invert);
StringData name = elem.fieldNameStringData();
_appendBsonValue(elem, invert, &name);
}
_append(int8_t(0), invert);
}
- void KeyString::_appendSmallDouble(double magnitude, bool invert) {
- _append(kSmallDouble, invert);
+ void KeyString::_appendSmallDouble(double value, bool invert) {
+ dassert(!isNaN(value));
+ dassert(value != 0.0);
+
uint64_t data;
- memcpy(&data, &magnitude, sizeof(data));
- _append(endian::nativeToBig(data), invert);
+ memcpy(&data, &value, sizeof(data));
+
+ if (value > 0) {
+ _append(CType::kNumericPositiveSmallDouble, invert);
+ _append(endian::nativeToBig(data), invert);
+ }
+ else {
+ _append(CType::kNumericNegativeSmallDouble, invert);
+ _append(endian::nativeToBig(data), !invert);
+ }
}
- void KeyString::_appendLargeDouble(double magnitude, bool invert) {
- _append(uint8_t(kLargeDouble), invert);
+ void KeyString::_appendLargeDouble(double value, bool invert) {
+ dassert(!isNaN(value));
+ dassert(value != 0.0);
+
uint64_t data;
- memcpy(&data, &magnitude, sizeof(data));
- _append(endian::nativeToBig(data), invert);
+ memcpy(&data, &value, sizeof(data));
+
+ if (value > 0) {
+ _append(CType::kNumericPositiveLargeDouble, invert);
+ _append(endian::nativeToBig(data), invert);
+ }
+ else {
+ _append(CType::kNumericNegativeLargeDouble, invert);
+ _append(endian::nativeToBig(data), !invert);
+ }
}
- void KeyString::_appendLargeInt64(long long magnitude, bool invert) {
- _append(uint8_t(kLargeInt64), invert);
- _append(endian::nativeToBig(magnitude), invert);
+ void KeyString::_appendPreshiftedIntegerPortion(uint64_t value, bool isNegative, bool invert) {
+ dassert(value != 0ull);
+ dassert(value != 1ull);
+
+ const size_t bytesNeeded = (64 - countLeadingZeros64(value) + 7) / 8;
+
+ // Append the low bytes of value in big endian order.
+ value = endian::nativeToBig(value);
+ const void* firstUsedByte = reinterpret_cast<const char*>((&value) + 1) - bytesNeeded;
+
+ if (isNegative) {
+ _append(uint8_t(CType::kNumericNegative1ByteInt - (bytesNeeded - 1)), invert);
+ _appendBytes(firstUsedByte, bytesNeeded, !invert);
+ }
+ else {
+ _append(uint8_t(CType::kNumericPositive1ByteInt + (bytesNeeded - 1)), invert);
+ _appendBytes(firstUsedByte, bytesNeeded, invert);
+ }
}
template <typename T>
@@ -523,17 +624,16 @@ namespace mongo {
BSONObjBuilderValueStream* stream);
void toBson(BufReader* reader, bool inverted, BSONObjBuilder* builder) {
- uint8_t type;
- while ((type = readType<uint8_t>(reader, inverted)) != 0) {
+ while (readType<uint8_t>(reader, inverted) != 0) {
if (inverted) {
std::string name = readInvertedCString(reader);
BSONObjBuilderValueStream& stream = *builder << name;
- toBsonValue(type, reader, inverted, &stream);
+ toBsonValue(readType<uint8_t>(reader, inverted), reader, inverted, &stream);
}
else {
StringData name = readCString(reader);
BSONObjBuilderValueStream& stream = *builder << name;
- toBsonValue(type, reader, inverted, &stream);
+ toBsonValue(readType<uint8_t>(reader, inverted), reader, inverted, &stream);
}
}
}
@@ -542,15 +642,19 @@ namespace mongo {
BufReader* reader,
bool inverted,
BSONObjBuilderValueStream* stream) {
+
+ // This is only used by the kNumeric.*ByteInt types, but needs to be declared up here
+ // since it is used across a fallthrough.
+ bool isNegative = false;
+
switch (type) {
case CType::kMinKey: *stream << MINKEY; break;
case CType::kMaxKey: *stream << MAXKEY; break;
case CType::kNullish: *stream << BSONNULL; break;
case CType::kUndefined: *stream << BSONUndefined; break;
- case CType::kBool:
- *stream << bool(readType<uint8_t>(reader, inverted));
- break;
+ case CType::kBoolTrue: *stream << true; break;
+ case CType::kBoolFalse: *stream << false; break;
case CType::kDate:
*stream << Date_t(endian::bigToNative(readType<uint64_t>(reader,
@@ -607,7 +711,11 @@ namespace mongo {
}
case CType::kBinData: {
- size_t size = endian::bigToNative(readType<uint32_t>(reader, inverted));
+ size_t size = readType<uint8_t>(reader, inverted);
+ if (size == 0xff) {
+ // size was stored in 4 bytes.
+ size = endian::bigToNative(readType<uint32_t>(reader, inverted));
+ }
BinDataType type = BinDataType(readType<uint8_t>(reader, inverted));
const void* ptr = reader->skip(size);
if (!inverted) {
@@ -672,62 +780,93 @@ namespace mongo {
break;
}
- case CType::kNumeric: {
- // Using doubles for everything except kLargeInt64
- switch (uint8_t kind = readType<uint8_t>(reader, false)) {
- case kNaN: *stream << std::numeric_limits<double>::quiet_NaN(); break;
- case kZero: *stream << 0.0; break;
- case kLargeInt64: {
- long long ll = endian::bigToNative(readType<long long>(reader, false));
- if (inverted)
- ll = -ll;
- *stream << ll;
- break;
- }
- case kSmallDouble:
- case kLargeDouble: {
- uint64_t encoded = readType<uint64_t>(reader, false);
- encoded = endian::bigToNative(encoded);
- double d;
- memcpy(&d, &encoded, sizeof(d));
- if (inverted)
- *stream << -d;
- else
- *stream << d;
- break;
- }
+ //
+ // Numerics
+ // Using doubles for everything outside of the range +/- [2**52, 2**63)
+ //
+
+ case CType::kNumericNaN: *stream << std::numeric_limits<double>::quiet_NaN(); break;
+ case CType::kNumericZero: *stream << 0.0; break;
+
+ case CType::kNumericNegativeLargeDouble:
+ case CType::kNumericNegativeSmallDouble:
+ inverted = !inverted;
+ // fallthrough (format is the same as positive, but inverted)
+
+ case CType::kNumericPositiveLargeDouble:
+ case CType::kNumericPositiveSmallDouble: {
+ // for these, the original double was stored intact, including sign bit.
+ uint64_t encoded = readType<uint64_t>(reader, inverted);
+ encoded = endian::bigToNative(encoded);
+ double d;
+ memcpy(&d, &encoded, sizeof(d));
+ *stream << d;
+ break;
+ }
- default: {
- switch (uint8_t(~kind)) {
- case kNaN: *stream << std::numeric_limits<double>::quiet_NaN(); break;
- case kZero: *stream << 0.0; break;
- case kLargeInt64: {
- long long ll = ~endian::bigToNative(readType<long long>(reader, false));
- if (!inverted)
- ll = -ll;
- *stream << ll;
- break;
+ case CType::kNumericNegative8ByteInt:
+ case CType::kNumericNegative7ByteInt:
+ case CType::kNumericNegative6ByteInt:
+ case CType::kNumericNegative5ByteInt:
+ case CType::kNumericNegative4ByteInt:
+ case CType::kNumericNegative3ByteInt:
+ case CType::kNumericNegative2ByteInt:
+ case CType::kNumericNegative1ByteInt:
+ inverted = !inverted;
+ isNegative = true;
+ // fallthrough (format is the same as positive, but inverted)
+
+ case CType::kNumericPositive1ByteInt:
+ case CType::kNumericPositive2ByteInt:
+ case CType::kNumericPositive3ByteInt:
+ case CType::kNumericPositive4ByteInt:
+ case CType::kNumericPositive5ByteInt:
+ case CType::kNumericPositive6ByteInt:
+ case CType::kNumericPositive7ByteInt:
+ case CType::kNumericPositive8ByteInt: {
+ uint64_t encodedIntegerPart = 0;
+ {
+ size_t intBytesRemaining = CType::numBytesForInt(type);
+ while (intBytesRemaining--) {
+ encodedIntegerPart = (encodedIntegerPart << 8)
+ | readType<uint8_t>(reader, inverted);
}
+ }
- case kSmallDouble:
- case kLargeDouble: {
- uint64_t encoded;
- memcpy_flipBits(&encoded, reader->pos(), sizeof(encoded));
- reader->skip(sizeof(encoded));
- encoded = endian::bigToNative(encoded);
- double d;
- memcpy(&d, &encoded, sizeof(d));
- if(inverted)
- *stream << d;
- else
- *stream << -d;
- break;
+ const bool haveFractionalPart = (encodedIntegerPart & 1);
+ long long integerPart = encodedIntegerPart >> 1;
+
+ if (!haveFractionalPart) {
+ if (integerPart < kMinLargeInt64) {
+ *stream << (isNegative ? -double(integerPart) : double(integerPart));
}
- default: invariant(false);
+ else {
+ *stream << (isNegative ? -integerPart : integerPart);
}
- break;
}
+ else {
+ const uint64_t exponent = (64 - countLeadingZeros64(integerPart)) - 1;
+ const size_t fractionalBits = (52 - exponent);
+ const size_t fractionalBytes = (fractionalBits + 7) / 8;
+
+ // build up the bits of a double here.
+ uint64_t doubleBits = integerPart << fractionalBits;
+ doubleBits &= ~(1ull << 52); // clear implicit leading 1
+ doubleBits |= (exponent + 1023/*bias*/) << 52;
+ if (isNegative) {
+ doubleBits |= (1ull << 63); // sign bit
+ }
+ for (size_t i = 0; i < fractionalBytes; i++) {
+ // fold in the fractional bytes
+ const uint64_t byte = readType<uint8_t>(reader, inverted);
+ doubleBits |= (byte << ((fractionalBytes - i - 1) * 8));
+ }
+
+ double number;
+ memcpy(&number, &doubleBits, sizeof(number));
+ *stream << number;
}
+
break;
}
default: invariant(false);