diff options
author | Geert Bosch <geert@mongodb.com> | 2016-03-31 00:42:05 -0400 |
---|---|---|
committer | Geert Bosch <geert@mongodb.com> | 2016-04-22 15:00:29 -0400 |
commit | b5282c3b6d17a8b11e432ca5fbbfde5caddea048 (patch) | |
tree | 2205f7d457f0a1b7b278f8b723c5820c642d556d | |
parent | 8e13345122986b242811ebee1c0908a4fc01640c (diff) | |
download | mongo-b5282c3b6d17a8b11e432ca5fbbfde5caddea048.tar.gz |
SERVER-19703 Add KeyString support for NumberDecimal
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/key_string.cpp | 890 | ||||
-rw-r--r-- | src/mongo/db/storage/key_string.h | 122 | ||||
-rw-r--r-- | src/mongo/db/storage/key_string_test.cpp | 667 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_index.h | 6 | ||||
-rw-r--r-- | src/mongo/platform/decimal128.cpp | 8 | ||||
-rw-r--r-- | src/mongo/platform/decimal128.h | 5 |
8 files changed, 1399 insertions, 349 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index d83a825903b..53d77014120 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -1176,7 +1176,8 @@ private: uint32_t hashIndexEntry(const BSONObj& key, RecordId& loc) { uint32_t hash; - KeyString ks(key, Ordering::make(BSONObj()), loc); + // We're only using KeyString to get something hashable here, so version doesn't matter. + KeyString ks(KeyString::Version::V1, key, Ordering::make(BSONObj()), loc); MurmurHash3_x86_32(ks.getTypeBits().getBuffer(), ks.getTypeBits().getSize(), 0, &hash); MurmurHash3_x86_32(ks.getBuffer(), ks.getSize(), hash, &hash); // Take the first 25 bits to conserve memory. diff --git a/src/mongo/db/storage/key_string.cpp b/src/mongo/db/storage/key_string.cpp index 00dc9e067d9..3c5fafa3ec2 100644 --- a/src/mongo/db/storage/key_string.cpp +++ b/src/mongo/db/storage/key_string.cpp @@ -77,7 +77,7 @@ const uint8_t kMaxKey = 240; // Therefore each format can be considered independently without considering // cross-format comparisons. const uint8_t kNumericNaN = kNumeric + 0; -const uint8_t kNumericNegativeLargeDouble = kNumeric + 1; // <= -2**63 including -Inf +const uint8_t kNumericNegativeLargeMagnitude = kNumeric + 1; // <= -2**63 including -Inf const uint8_t kNumericNegative8ByteInt = kNumeric + 2; const uint8_t kNumericNegative7ByteInt = kNumeric + 3; const uint8_t kNumericNegative6ByteInt = kNumeric + 4; @@ -86,9 +86,9 @@ const uint8_t kNumericNegative4ByteInt = kNumeric + 6; const uint8_t kNumericNegative3ByteInt = kNumeric + 7; const uint8_t kNumericNegative2ByteInt = kNumeric + 8; const uint8_t kNumericNegative1ByteInt = kNumeric + 9; -const uint8_t kNumericNegativeSmallDouble = kNumeric + 10; // between 0 and -1 exclusive +const uint8_t kNumericNegativeSmallMagnitude = kNumeric + 10; // between 0 and -1 exclusive const uint8_t kNumericZero = kNumeric + 11; -const uint8_t kNumericPositiveSmallDouble = kNumeric + 12; // between 0 and 1 exclusive +const uint8_t kNumericPositiveSmallMagnitude = kNumeric + 12; // between 0 and 1 exclusive const uint8_t kNumericPositive1ByteInt = kNumeric + 13; const uint8_t kNumericPositive2ByteInt = kNumeric + 14; const uint8_t kNumericPositive3ByteInt = kNumeric + 15; @@ -97,9 +97,9 @@ const uint8_t kNumericPositive5ByteInt = kNumeric + 17; const uint8_t kNumericPositive6ByteInt = kNumeric + 18; const uint8_t kNumericPositive7ByteInt = kNumeric + 19; const uint8_t kNumericPositive8ByteInt = kNumeric + 20; -const uint8_t kNumericPositiveLargeDouble = kNumeric + 21; // >= 2**63 including +Inf -static_assert(kNumericPositiveLargeDouble < kStringLike, - "kNumericPositiveLargeDouble < kStringLike"); +const uint8_t kNumericPositiveLargeMagnitude = kNumeric + 21; // >= 2**63 including +Inf +static_assert(kNumericPositiveLargeMagnitude < kStringLike, + "kNumericPositiveLargeMagnitude < kStringLike"); const uint8_t kBoolFalse = kBool + 0; const uint8_t kBoolTrue = kBool + 1; @@ -169,8 +169,54 @@ uint8_t bsonTypeToGenericKeyStringType(BSONType type) { } } +// Doubles smaller than this store only a single bit indicating a decimal continuation follows. +const double kTiniestDoubleWith2BitDCM = std::ldexp(1, -255); + +// Amount to add to exponent of doubles tinier than kTiniestDoubleWith2BitDCM to avoid subnormals. +const int kTinyDoubleExponentShift = 256; + +// Amount to multiply tiny doubles to perform a shift of the exponent by +// kSmallMagnitudeExponentShift. +const double kTinyDoubleExponentUpshiftFactor = std::ldexp(1, kTinyDoubleExponentShift); + +// Amount to multiply scaled tiny doubles by to recover the unscaled value. +const double kTinyDoubleExponentDownshiftFactor = std::ldexp(1, -kTinyDoubleExponentShift); + +// An underestimate of 2**256. +const Decimal128 kTinyDoubleExponentUpshiftFactorAsDecimal(std::ldexp(1, kTinyDoubleExponentShift), + Decimal128::kRoundTo34Digits, + Decimal128::kRoundTowardZero); + +// An underestimate of 2**(-256). +const Decimal128 kTinyDoubleExponentDownshiftFactorAsDecimal(std::ldexp(1, + -kTinyDoubleExponentShift), + Decimal128::kRoundTo34Digits, + Decimal128::kRoundTowardZero); + // First double that isn't an int64. -const double kMinLargeDouble = 9223372036854775808.0; // 1ULL<<63 +const double kMinLargeDouble = 1ULL << 63; + +// Integers larger than this may not be representable as doubles. +const double kMaxIntForDouble = 1ULL << 53; + +// Factors for scaling a double by powers of 256 to do a logical shift left of x bytes. +const double kPow256[] = {1.0, // 2**0 + 1.0 * 256, // 2**8 + 1.0 * 256 * 256, // 2**16 + 1.0 * 256 * 256 * 256, // 2**24 + 1.0 * 256 * 256 * 256 * 256, // 2**32 + 1.0 * 256 * 256 * 256 * 256 * 256, // 2**40 + 1.0 * 256 * 256 * 256 * 256 * 256 * 256, // 2**48 + 1.0 * 256 * 256 * 256 * 256 * 256 * 256 * 256}; // 2**56 +// Factors for scaling a double by negative powers of 256 to do a logical shift right of x bytes. +const double kInvPow256[] = {1.0, // 2**0 + 1.0 / 256, // 2**(-8) + 1.0 / 256 / 256, // 2**(-16) + 1.0 / 256 / 256 / 256, // 2**(-24) + 1.0 / 256 / 256 / 256 / 256, // 2**(-32) + 1.0 / 256 / 256 / 256 / 256 / 256, // 2**(-40) + 1.0 / 256 / 256 / 256 / 256 / 256 / 256, // 2**(-48) + 1.0 / 256 / 256 / 256 / 256 / 256 / 256 / 256}; // 2**(-56) const uint8_t kEnd = 0x4; @@ -479,65 +525,85 @@ void KeyString::_appendObject(const BSONObj& val, bool invert) { } void KeyString::_appendNumberDouble(const double num, bool invert) { - if (num == 0.0 && std::signbit(num)) { - _typeBits.appendNegativeZero(); - } else { + if (num == 0.0 && std::signbit(num)) + _typeBits.appendZero(TypeBits::kNegativeDoubleZero); + else _typeBits.appendNumberDouble(); - } - // no special cases needed for Inf, - // see http://en.wikipedia.org/wiki/IEEE_754-1985#Positive_and_negative_infinity - if (std::isnan(num)) { - _append(CType::kNumericNaN, invert); - return; - } - - if (num == 0.0) { - // We are collapsing -0.0 and 0.0 to the same value here. - // This is correct as IEEE-754 specifies that they compare as equal, - // however this prevents roundtripping -0.0. - // So if you put a -0.0 in, you'll get 0.0 out. - // We believe this to be ok. - _append(CType::kNumericZero, invert); - return; - } + _appendDoubleWithoutTypeBits(num, kDCMEqualToDouble, invert); +} +void KeyString::_appendDoubleWithoutTypeBits(const double num, + DecimalContinuationMarker dcm, + bool invert) { const bool isNegative = num < 0.0; const double magnitude = isNegative ? -num : num; - if (magnitude < 1.0) { - // This includes subnormal numbers. - _appendSmallDouble(num, invert); + // Tests are structured such that NaNs and infinities fall through correctly. + if (!(magnitude >= 1.0)) { + if (magnitude > 0.0) { + // This includes subnormal numbers. + _appendSmallDouble(num, dcm, invert); + } else if (num == 0.0) { + // We are collapsing -0.0 and 0.0 to the same value here, the type bits disambiguate. + _append(CType::kNumericZero, invert); + } else { + invariant(std::isnan(num)); + _append(CType::kNumericNaN, invert); + } return; } if (magnitude < kMinLargeDouble) { - uint64_t integerPart = uint64_t(magnitude); - if (double(integerPart) == magnitude) { + uint64_t integerPart = static_cast<uint64_t>(magnitude); + if (static_cast<double>(integerPart) == magnitude && dcm == kDCMEqualToDouble) { // No fractional part _appendPreshiftedIntegerPortion(integerPart << 1, isNegative, invert); return; } - // There is a fractional part. - _appendPreshiftedIntegerPortion((integerPart << 1) | 1, isNegative, invert); - - // Append the bytes of the mantissa that include fractional bits. - const size_t fractionalBits = (53 - (64 - countLeadingZeros64(integerPart))); - const size_t fractionalBytes = (fractionalBits + 7) / 8; - dassert(fractionalBytes > 0); - uint64_t mantissa; - memcpy(&mantissa, &num, sizeof(mantissa)); - mantissa &= ~(uint64_t(-1) << fractionalBits); // set non-fractional bits to 0; - mantissa = endian::nativeToBig(mantissa); - - const void* firstUsedByte = - reinterpret_cast<const char*>((&mantissa) + 1) - fractionalBytes; - _appendBytes(firstUsedByte, fractionalBytes, isNegative ? !invert : invert); - return; - } + if (version == Version::V0) { + invariant(dcm == kDCMEqualToDouble); + // There is a fractional part. + _appendPreshiftedIntegerPortion((integerPart << 1) | 1, isNegative, invert); + + // Append the bytes of the mantissa that include fractional bits. + const size_t fractionalBits = 53 - (64 - countLeadingZeros64(integerPart)); + const size_t fractionalBytes = (fractionalBits + 7) / 8; + dassert(fractionalBytes > 0); + uint64_t mantissa; + memcpy(&mantissa, &num, sizeof(mantissa)); + mantissa &= ~(uint64_t(-1) << fractionalBits); // set non-fractional bits to 0; - _appendLargeDouble(num, invert); + mantissa = endian::nativeToBig(mantissa); + + const void* firstUsedByte = + reinterpret_cast<const char*>((&mantissa) + 1) - fractionalBytes; + _appendBytes(firstUsedByte, fractionalBytes, isNegative ? !invert : invert); + } else { + const size_t fractionalBytes = countLeadingZeros64(integerPart << 1) / 8; + const auto ctype = isNegative ? CType::kNumericNegative8ByteInt + fractionalBytes + : CType::kNumericPositive8ByteInt - fractionalBytes; + _append(static_cast<uint8_t>(ctype), invert); + + // Multiplying the double by 256 to the power X is logically equivalent to shifting the + // fraction left by X bytes. + uint64_t encoding = static_cast<uint64_t>(magnitude * kPow256[fractionalBytes]); + dassert(encoding == magnitude * kPow256[fractionalBytes]); + + // Merge in the bit indicating the value has a fractional part by doubling the integer + // part and adding 1. This leaves encoding with the high 8-fractionalBytes bytes in the + // same form they'd have with _appendPreshiftedIntegerPortion(). The remaining low bytes + // are the fractional bytes left-shifted by 2 bits to make room for the DCM. + encoding += (integerPart + 1) << (fractionalBytes * 8); + invariant((encoding & 0x3ULL) == 0); + encoding |= dcm; + encoding = endian::nativeToBig(encoding); + _append(encoding, isNegative ? !invert : invert); + } + } else { + _appendLargeDouble(num, dcm, invert); + } } void KeyString::_appendNumberLong(const long long num, bool invert) { @@ -550,6 +616,148 @@ void KeyString::_appendNumberInt(const int num, bool invert) { _appendInteger(num, invert); } +void KeyString::_appendNumberDecimal(const Decimal128 dec, bool invert) { + bool isNegative = dec.isNegative(); + if (dec.isZero()) { + uint32_t zeroExp = dec.getBiasedExponent(); + if (isNegative) + zeroExp += Decimal128::kMaxBiasedExponent + 1; + + _typeBits.appendDecimalZero(zeroExp); + _append(CType::kNumericZero, invert); + return; + } + + if (dec.isNaN()) { + _append(CType::kNumericNaN, invert); + _typeBits.appendNumberDecimal(); + return; + } + + if (dec.isInfinite()) { + _append(isNegative ? CType::kNumericNegativeLargeMagnitude + : CType::kNumericPositiveLargeMagnitude, + invert); + const uint64_t infinity = ~0ULL; + _append(infinity, isNegative ? !invert : invert); + _typeBits.appendNumberDecimal(); + return; + } + + const uint32_t biasedExponent = dec.getBiasedExponent(); + dassert(biasedExponent <= Decimal128::kInfinityExponent); + _typeBits.appendNumberDecimal(); + _typeBits.appendDecimalExponent(biasedExponent & TypeBits::kStoredDecimalExponentMask); + + uint32_t signalingFlags = Decimal128::kNoFlag; + double bin = dec.toDouble(&signalingFlags, Decimal128::kRoundTowardZero); + + // Easy case: the decimal actually is a double. True for many integers, fractions like 1.5, etc. + if (!Decimal128::hasFlag(signalingFlags, Decimal128::kInexact) && + !Decimal128::hasFlag(signalingFlags, Decimal128::kOverflow)) { + _appendDoubleWithoutTypeBits(bin, kDCMEqualToDouble, invert); + return; + } + + // Values smaller than the double normalized range need special handling: a regular double + // wouldn't give 15 digits, if any at all. + if (std::abs(bin) < std::numeric_limits<double>::min()) { + _appendTinyDecimalWithoutTypeBits(dec, bin, invert); + return; + } + + // Huge finite values are encoded directly. Because the value is not exact, and truncates + // to the maximum double, the original decimal was outside of the range of finite doubles. + // Because all decimals larger than the max finite double round down to that value, strict + // less-than would be incorrect. + if (std::abs(bin) >= std::numeric_limits<double>::max()) { + _appendHugeDecimalWithoutTypeBits(dec, invert); + return; + } + + const auto roundAwayFromZero = + isNegative ? Decimal128::kRoundTowardNegative : Decimal128::kRoundTowardPositive; + const uint64_t k1E15 = 1E15; // Exact in both double and int64_t. + + // If the conditions below fall through, a decimal continuation is needed to represent the + // difference between the stored value and the actual decimal. All paths that fall through + // must set 'storedValue', overwriting the NaN. + Decimal128 storedValue = Decimal128::kPositiveNaN; + + // For doubles in this range, 'bin' may have lost precision in the integer part, which would + // lead to miscompares with integers. So, instead handle explicitly. + if ((bin <= -kMaxIntForDouble || bin >= kMaxIntForDouble) && bin > -kMinLargeDouble && + bin < kMinLargeDouble) { + uint32_t signalingFlags = Decimal128::kNoFlag; + Decimal128 truncated = dec.quantize( + Decimal128::kNormalizedZero, &signalingFlags, Decimal128::kRoundTowardZero); + dassert(truncated.getBiasedExponent() == Decimal128::kExponentBias); + dassert(truncated.getCoefficientHigh() == 0 && + truncated.getCoefficientLow() < (1ULL << 63)); + int64_t integerPart = truncated.getCoefficientLow(); + bool hasFraction = Decimal128::hasFlag(signalingFlags, Decimal128::kInexact); + bool isNegative = truncated.isNegative(); + bool has8bytes = integerPart >= (1LL << 55); + uint64_t preshifted = integerPart << 1; + _appendPreshiftedIntegerPortion(preshifted | hasFraction, isNegative, invert); + if (!hasFraction) + return; + if (!has8bytes) { + // A Fraction byte follows, but the leading 7 bytes already encode 53 bits of the + // coefficient, so just store the DCM. + uint8_t dcm = hasFraction ? kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits + : kDCMEqualToDouble; + _append(dcm, isNegative ? !invert : invert); + } + + storedValue = Decimal128(isNegative, Decimal128::kExponentBias, 0, integerPart); + + // Common case: the coefficient less than 1E15, so at most 15 digits, and the number is + // in the normal range of double, so the decimal can be represented with at least 15 digits + // of precision by the double 'bin' + } else if (dec.getCoefficientHigh() == 0 && dec.getCoefficientLow() < k1E15) { + dassert(Decimal128(std::abs(bin), + Decimal128::kRoundTo15Digits, + Decimal128::kRoundTowardPositive).isEqual(dec.toAbs())); + _appendDoubleWithoutTypeBits(bin, kDCMEqualToDoubleRoundedUpTo15Digits, invert); + return; + } else { + // The coefficient has more digits, but may still be 15 digits after removing trailing + // zeros. + Decimal128 decFromBin = Decimal128(bin, Decimal128::kRoundTo15Digits, roundAwayFromZero); + if (decFromBin.isEqual(dec)) { + _appendDoubleWithoutTypeBits(bin, kDCMEqualToDoubleRoundedUpTo15Digits, invert); + return; + } + // Harder cases: decimal continuation is needed. + // First store the double and kind of continuation needed. + DecimalContinuationMarker dcm = dec.isLess(decFromBin) != isNegative + ? kDCMHasContinuationLessThanDoubleRoundedUpTo15Digits + : kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits; + _appendDoubleWithoutTypeBits(bin, dcm, invert); + + // Note that 'dec' and 'bin' can be negative. + storedValue = Decimal128(bin, Decimal128::kRoundTo34Digits, roundAwayFromZero); + } + invariant(!storedValue.isNaN()); // Should have been set explicitly. + storedValue = storedValue.normalize(); // Normalize to 34 digits to fix decDiff exponent. + + Decimal128 decDiff = dec.subtract(storedValue); + invariant(decDiff.isNegative() == dec.isNegative() || decDiff.isZero()); + invariant(decDiff.getBiasedExponent() == storedValue.getBiasedExponent()); + invariant(decDiff.getCoefficientHigh() == 0); + + // Now we know that we can recover the original decimal value (but not its precision, which is + // given by the type bits) from the binary double plus the decimal continuation. + uint64_t decimalContinuation = decDiff.getCoefficientLow(); + dassert(storedValue.add(Decimal128(isNegative, + storedValue.getBiasedExponent(), + 0, + decimalContinuation)).isEqual(dec)); + decimalContinuation = endian::nativeToBig(decimalContinuation); + _append(decimalContinuation, isNegative ? !invert : invert); +} + void KeyString::_appendBsonValue(const BSONElement& elem, bool invert, const StringData* name) { if (name) { _appendBytes(name->rawData(), name->size() + 1, invert); // + 1 for NUL @@ -621,6 +829,12 @@ void KeyString::_appendBsonValue(const BSONElement& elem, bool invert, const Str case NumberLong: _appendNumberLong(elem._numberLong(), invert); break; + case NumberDecimal: + uassert(ErrorCodes::UnsupportedFormat, + "Index version does not support NumberDecimal", + version >= Version::V1); + _appendNumberDecimal(elem._numberDecimal(), invert); + break; default: invariant(false); @@ -656,36 +870,138 @@ void KeyString::_appendBson(const BSONObj& obj, bool invert) { _append(int8_t(0), invert); } -void KeyString::_appendSmallDouble(double value, bool invert) { - dassert(!std::isnan(value)); - dassert(value != 0.0); - - uint64_t data; - memcpy(&data, &value, sizeof(data)); - - if (value > 0) { - _append(CType::kNumericPositiveSmallDouble, invert); - _append(endian::nativeToBig(data), invert); +void KeyString::_appendSmallDouble(double value, DecimalContinuationMarker dcm, bool invert) { + bool isNegative = value < 0; + double magnitude = isNegative ? -value : value; + dassert(!std::isnan(value) && value != 0 && magnitude < 1); + + _append(isNegative ? CType::kNumericNegativeSmallMagnitude + : CType::kNumericPositiveSmallMagnitude, + invert); + + uint64_t encoded; + + if (version == KeyString::Version::V0) { + // Not using magnitude to preserve sign bit in V0 + memcpy(&encoded, &value, sizeof(encoded)); + } else if (magnitude >= kTiniestDoubleWith2BitDCM) { + // Values in the range [2**(-255), 1) get the prefix 0b11 + memcpy(&encoded, &magnitude, sizeof(encoded)); + dassert((encoded & (0x3ULL << 62)) == 0); + encoded <<= 2; + encoded |= dcm; + dassert(encoded >> 62 == 0x3); } else { - _append(CType::kNumericNegativeSmallDouble, invert); - _append(endian::nativeToBig(data), !invert); + // Values in the range [numeric_limits<double>::denorm_min(), 2**(-255)) get the prefixes + // 0b01 or 0b10. The 0b00 prefix is used by _appendHugeDecimalWithoutTypeBits for decimals + // smaller than that. + magnitude *= kTinyDoubleExponentUpshiftFactor; + memcpy(&encoded, &magnitude, sizeof(encoded)); + encoded <<= 1; + encoded |= (dcm != kDCMEqualToDouble); + // Change the two most significant bits from 0b00 or 0b01 to 0b01 or 0b10. + encoded += (1ULL << 62); + dassert(encoded >> 62 == 0x1 || encoded >> 62 == 0x2); } + + _append(endian::nativeToBig(encoded), isNegative ? !invert : invert); } -void KeyString::_appendLargeDouble(double value, bool invert) { +void KeyString::_appendLargeDouble(double value, DecimalContinuationMarker dcm, bool invert) { dassert(!std::isnan(value)); dassert(value != 0.0); - uint64_t data; - memcpy(&data, &value, sizeof(data)); + _append(value > 0 ? CType::kNumericPositiveLargeMagnitude + : CType::kNumericNegativeLargeMagnitude, + invert); - if (value > 0) { - _append(CType::kNumericPositiveLargeDouble, invert); - _append(endian::nativeToBig(data), invert); - } else { - _append(CType::kNumericNegativeLargeDouble, invert); - _append(endian::nativeToBig(data), !invert); + uint64_t encoded; + memcpy(&encoded, &value, sizeof(encoded)); + + if (version != Version::V0) { + if (std::isfinite(value)) { + encoded <<= 1; + encoded &= ~(1ULL << 63); + encoded |= (dcm != kDCMEqualToDouble); + } else { + encoded = ~0ULL; // infinity + } } + encoded = endian::nativeToBig(encoded); + _append(encoded, value > 0 ? invert : !invert); +} + +void KeyString::_appendTinyDecimalWithoutTypeBits(const Decimal128 dec, + const double bin, + bool invert) { + // This function is only for 'dec' that doesn't exactly equal a double, but rounds to 'bin' + dassert(bin == dec.toDouble(Decimal128::kRoundTowardZero)); + dassert(std::abs(bin) < DBL_MIN); + const bool isNegative = dec.isNegative(); + Decimal128 magnitude = isNegative ? dec.negate() : dec; + + _append(isNegative ? CType::kNumericNegativeSmallMagnitude + : CType::kNumericPositiveSmallMagnitude, + invert); + + // For decimals smaller than the smallest subnormal double, just store the decimal number + if (bin == 0.0) { + Decimal128 normalized = magnitude.normalize(); + uint64_t hi = normalized.getValue().high64; + uint64_t lo = normalized.getValue().low64; + invariant((hi & (0x3ULL << 62)) == 0); + _append(endian::nativeToBig(hi), isNegative ? !invert : invert); + _append(endian::nativeToBig(lo), isNegative ? !invert : invert); + return; + } + // Encode decimal in subnormal double range by scaling in the decimal domain. Round down at + // each step, but ensure not to get below the subnormal double. This will ensure that + // 'scaledBin' is monotonically increasing and will only be off by at most a few units in the + // last place, so the decimal continuation will stay in range. + Decimal128 scaledDec = + magnitude.multiply(kTinyDoubleExponentUpshiftFactorAsDecimal, Decimal128::kRoundTowardZero); + double scaledBin = scaledDec.toDouble(Decimal128::kRoundTowardZero); + + // Here we know that scaledBin contains the first 15 significant digits of scaled dec, and + // sorts correctly with scaled double. + scaledBin = std::max(scaledBin, std::abs(bin) * kTinyDoubleExponentUpshiftFactor); + uint64_t encoded; + memcpy(&encoded, &scaledBin, sizeof(encoded)); + encoded <<= 1; + encoded |= 1; // Even if decDiff.isZero() we aren't exactly equal + encoded += (1ULL << 62); + dassert(encoded >> 62 == 0x1); + _append(endian::nativeToBig(encoded), isNegative ? !invert : invert); + + Decimal128 storedVal(scaledBin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive); + storedVal = storedVal.multiply(kTinyDoubleExponentDownshiftFactorAsDecimal, + Decimal128::kRoundTowardZero) + .add(Decimal128::kLargestNegativeExponentZero); + dassert(storedVal.isLess(magnitude)); + Decimal128 decDiff = magnitude.subtract(storedVal); + dassert(decDiff.getBiasedExponent() == storedVal.getBiasedExponent() || decDiff.isZero()); + dassert(decDiff.getCoefficientHigh() == 0 && !decDiff.isNegative()); + uint64_t continuation = decDiff.getCoefficientLow(); + _append(endian::nativeToBig(continuation), isNegative ? !invert : invert); +} + + +void KeyString::_appendHugeDecimalWithoutTypeBits(const Decimal128 dec, bool invert) { + // To allow us to use CType::kNumericNegativeLargeMagnitude we need to fit between the highest + // finite double and the representation of +/-Inf. We do this by forcing the high bit to 1 + // (large doubles always have 0) and never encoding ~0 here. + + const bool isNegative = dec.isNegative(); + Decimal128 normalizedMagnitude = (isNegative ? dec.negate() : dec).normalize(); + uint64_t hi = normalizedMagnitude.getValue().high64; + uint64_t lo = normalizedMagnitude.getValue().low64; + dassert(hi < (1ULL << 63)); + hi |= (1ULL << 63); + _append(isNegative ? CType::kNumericNegativeLargeMagnitude + : CType::kNumericPositiveLargeMagnitude, + invert); + _append(endian::nativeToBig(hi), isNegative ? !invert : invert); + _append(endian::nativeToBig(lo), isNegative ? !invert : invert); } // Handles NumberLong and NumberInt which are encoded identically except for the TypeBits. @@ -694,7 +1010,7 @@ void KeyString::_appendInteger(const long long num, bool invert) { // -2**63 is exactly representable as a double and not as a positive int64. // Therefore we encode it as a double. dassert(-double(num) == kMinLargeDouble); - _appendLargeDouble(double(num), invert); + _appendLargeDouble(static_cast<double>(num), kDCMEqualToDouble, invert); return; } @@ -708,10 +1024,9 @@ void KeyString::_appendInteger(const long long num, bool invert) { _appendPreshiftedIntegerPortion(magnitude << 1, isNegative, invert); } - void KeyString::_appendPreshiftedIntegerPortion(uint64_t value, bool isNegative, bool invert) { - dassert(value != 0ull); - dassert(value != 1ull); + dassert(value != 0ULL); + dassert(value != 1ULL); const size_t bytesNeeded = (64 - countLeadingZeros64(value) + 7) / 8; @@ -753,26 +1068,53 @@ void toBsonValue(uint8_t ctype, BufReader* reader, TypeBits::Reader* typeBits, bool inverted, + KeyString::Version version, BSONObjBuilderValueStream* stream); -void toBson(BufReader* reader, TypeBits::Reader* typeBits, bool inverted, BSONObjBuilder* builder) { +void toBson(BufReader* reader, + TypeBits::Reader* typeBits, + bool inverted, + KeyString::Version version, + BSONObjBuilder* builder) { while (readType<uint8_t>(reader, inverted) != 0) { if (inverted) { std::string name = readInvertedCString(reader); BSONObjBuilderValueStream& stream = *builder << name; - toBsonValue(readType<uint8_t>(reader, inverted), reader, typeBits, inverted, &stream); + toBsonValue( + readType<uint8_t>(reader, inverted), reader, typeBits, inverted, version, &stream); } else { StringData name = readCString(reader); BSONObjBuilderValueStream& stream = *builder << name; - toBsonValue(readType<uint8_t>(reader, inverted), reader, typeBits, inverted, &stream); + toBsonValue( + readType<uint8_t>(reader, inverted), reader, typeBits, inverted, version, &stream); } } } +/** + * Helper function to read the least significant type bits for 'num' and return a value that + * is numerically equal to 'num', but has its exponent adjusted to match the stored exponent bits. + */ +Decimal128 adjustDecimalExponent(TypeBits::Reader* typeBits, Decimal128 num); + +/** + * Helper function that takes a 'num' with 15 decimal digits of precision, normalizes it to 34 + * digits and reads a 64-bit (19-digit) continuation to obtain the full 34-bit value. + */ +Decimal128 readDecimalContinuation(BufReader* reader, bool inverted, Decimal128 num) { + uint32_t flags = Decimal128::kNoFlag; + uint64_t continuation = endian::bigToNative(readType<uint64_t>(reader, inverted)); + num = num.normalize(); + num = num.add(Decimal128(num.isNegative(), num.getBiasedExponent(), 0, continuation), &flags); + invariant(!(Decimal128::hasFlag(flags, Decimal128::kInexact))); + return num; +} + void toBsonValue(uint8_t ctype, BufReader* reader, TypeBits::Reader* typeBits, bool inverted, + KeyString::Version version, BSONObjBuilderValueStream* stream) { // This is only used by the kNumeric.*ByteInt types, but needs to be declared up here // since it is used across a fallthrough. @@ -861,7 +1203,7 @@ void toBsonValue(uint8_t ctype, } // Not going to optimize CodeWScope. BSONObjBuilder scope; - toBson(reader, typeBits, inverted, &scope); + toBson(reader, typeBits, inverted, version, &scope); *stream << BSONCodeWScope(code, scope.done()); break; } @@ -916,7 +1258,7 @@ void toBsonValue(uint8_t ctype, case CType::kObject: { BSONObjBuilder subObj(stream->subobjStart()); - toBson(reader, typeBits, inverted, &subObj); + toBson(reader, typeBits, inverted, version, &subObj); break; } @@ -929,6 +1271,7 @@ void toBsonValue(uint8_t ctype, reader, typeBits, inverted, + version, &(subArr << BSONObjBuilder::numStr(index++))); } break; @@ -938,13 +1281,20 @@ void toBsonValue(uint8_t ctype, // Numerics // - case CType::kNumericNaN: - invariant(typeBits->readNumeric() == TypeBits::kDouble); - *stream << std::numeric_limits<double>::quiet_NaN(); + case CType::kNumericNaN: { + auto type = typeBits->readNumeric(); + if (type == TypeBits::kDouble) { + *stream << std::numeric_limits<double>::quiet_NaN(); + } else { + invariant(type == TypeBits::kDecimal && version == KeyString::Version::V1); + *stream << Decimal128::kPositiveNaN; + } break; + } - case CType::kNumericZero: - switch (typeBits->readNumeric()) { + case CType::kNumericZero: { + uint8_t zeroType = typeBits->readZero(); + switch (zeroType) { case TypeBits::kDouble: *stream << 0.0; break; @@ -954,35 +1304,190 @@ void toBsonValue(uint8_t ctype, case TypeBits::kLong: *stream << 0ll; break; - case TypeBits::kNegativeZero: + case TypeBits::kNegativeDoubleZero: *stream << -0.0; break; + default: + const uint32_t whichZero = typeBits->readDecimalZero(zeroType); + const bool isNegative = whichZero > Decimal128::kMaxBiasedExponent; + const uint32_t biasedExponent = + isNegative ? whichZero - (Decimal128::kMaxBiasedExponent + 1) : whichZero; + + *stream << Decimal128(isNegative, biasedExponent, 0, 0); + break; } break; + } - case CType::kNumericNegativeLargeDouble: - case CType::kNumericNegativeSmallDouble: + case CType::kNumericNegativeLargeMagnitude: inverted = !inverted; + isNegative = true; // fallthrough (format is the same as positive, but inverted) - - case CType::kNumericPositiveLargeDouble: - case CType::kNumericPositiveSmallDouble: { - // for these, the raw double was stored intact, including sign bit. + case CType::kNumericPositiveLargeMagnitude: { const uint8_t originalType = typeBits->readNumeric(); + invariant(version > KeyString::Version::V0 || originalType != TypeBits::kDecimal); uint64_t encoded = readType<uint64_t>(reader, inverted); encoded = endian::bigToNative(encoded); - double d; - memcpy(&d, &encoded, sizeof(d)); + bool hasDecimalContinuation = false; + double bin; + + // Backward compatibility + if (version == KeyString::Version::V0) { + memcpy(&bin, &encoded, sizeof(bin)); + } else if (!(encoded & (1ULL << 63))) { // In range of (finite) doubles + hasDecimalContinuation = encoded & 1; + encoded >>= 1; // remove decimal continuation marker + encoded |= 1ULL << 62; // implied leading exponent bit + memcpy(&bin, &encoded, sizeof(bin)); + if (isNegative) + bin = -bin; + } else if (encoded == ~0ULL) { // infinity + bin = isNegative ? -std::numeric_limits<double>::infinity() + : std::numeric_limits<double>::infinity(); + } else { // Huge decimal number, directly output + invariant(originalType == TypeBits::kDecimal); + uint64_t highbits = encoded & ~(1ULL << 63); + uint64_t lowbits = endian::bigToNative(readType<uint64_t>(reader, inverted)); + Decimal128 dec(Decimal128::Value{lowbits, highbits}); + if (isNegative) + dec = dec.negate(); + dec = adjustDecimalExponent(typeBits, dec); + *stream << dec; + break; + } + // 'bin' contains the value of the input, rounded toward zero in case of decimal if (originalType == TypeBits::kDouble) { - *stream << d; - } else { + *stream << bin; + } else if (originalType == TypeBits::kLong) { // This can only happen for a single number. - invariant(originalType == TypeBits::kLong); - invariant(d == double(std::numeric_limits<long long>::min())); + invariant(bin == static_cast<double>(std::numeric_limits<long long>::min())); *stream << std::numeric_limits<long long>::min(); + } else { + invariant(originalType == TypeBits::kDecimal && version != KeyString::Version::V0); + const auto roundAwayFromZero = isNegative ? Decimal128::kRoundTowardNegative + : Decimal128::kRoundTowardPositive; + Decimal128 dec(bin, Decimal128::kRoundTo34Digits, roundAwayFromZero); + if (hasDecimalContinuation) + dec = readDecimalContinuation(reader, inverted, dec); + dec = adjustDecimalExponent(typeBits, dec); + *stream << dec; + } + break; + } + + case CType::kNumericNegativeSmallMagnitude: + inverted = !inverted; + isNegative = true; + // fallthrough (format is the same as positive, but inverted) + + case CType::kNumericPositiveSmallMagnitude: { + const uint8_t originalType = typeBits->readNumeric(); + uint64_t encoded = readType<uint64_t>(reader, inverted); + encoded = endian::bigToNative(encoded); + + if (version == KeyString::Version::V0) { + // for these, the raw double was stored intact, including sign bit. + invariant(originalType == TypeBits::kDouble); + double d; + memcpy(&d, &encoded, sizeof(d)); + *stream << d; + break; } + switch (encoded >> 62) { + case 0x0: { + // Teeny tiny decimal, smaller magnitude than 2**(-1074) + uint64_t lowbits = readType<uint64_t>(reader, inverted); + lowbits = endian::bigToNative(lowbits); + Decimal128 dec = Decimal128(Decimal128::Value{lowbits, encoded}); + dec = adjustDecimalExponent(typeBits, dec); + if (ctype == CType::kNumericNegativeSmallMagnitude) + dec = dec.negate(); + *stream << dec; + break; + } + case 0x1: + case 0x2: { + // Tiny double or decimal, magnitude from 2**(-1074) to 2**(-255), exclusive. + // The exponent is shifted by 256 in order to avoid subnormals, which would + // result in less than 15 significant digits. Because 2**(-255) has 179 + // decimal digits, no doubles exactly equal decimals, so all decimals have + // a continuation. The bit is still needed for comparison purposes. + bool hasDecimalContinuation = encoded & 1; + encoded -= 1ULL << 62; + encoded >>= 1; + double scaledBin; + memcpy(&scaledBin, &encoded, sizeof(scaledBin)); + if (originalType == TypeBits::kDouble) { + invariant(!hasDecimalContinuation); + double bin = scaledBin * kTinyDoubleExponentDownshiftFactor; + *stream << (isNegative ? -bin : bin); + break; + } + invariant(originalType == TypeBits::kDecimal && hasDecimalContinuation); + + // If the actual double would be subnormal, scale in decimal domain. + Decimal128 dec; + if (scaledBin < DBL_MIN * kTinyDoubleExponentUpshiftFactor) { + // For conversion from binary->decimal scale away from zero, + // otherwise round toward. Needs to be done consistently in read/write. + + Decimal128 scaledDec = Decimal128(scaledBin, + Decimal128::kRoundTo34Digits, + Decimal128::kRoundTowardPositive); + dec = scaledDec.multiply(kTinyDoubleExponentDownshiftFactorAsDecimal, + Decimal128::kRoundTowardZero); + } else { + double bin = scaledBin * kTinyDoubleExponentDownshiftFactor; + dec = Decimal128( + bin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive); + } + + dec = readDecimalContinuation(reader, inverted, dec); + *stream << adjustDecimalExponent(typeBits, isNegative ? dec.negate() : dec); + break; + } + case 0x3: { + // Small double, 2**(-255) or more in magnitude. Common case. + auto dcm = static_cast<KeyString::DecimalContinuationMarker>(encoded & 3); + encoded >>= 2; + double bin; + memcpy(&bin, &encoded, sizeof(bin)); + if (originalType == TypeBits::kDouble) { + invariant(dcm == KeyString::kDCMEqualToDouble); + *stream << (isNegative ? -bin : bin); + break; + } + + // Deal with decimal cases + invariant(originalType == TypeBits::kDecimal); + Decimal128 dec; + switch (dcm) { + case KeyString::kDCMEqualToDoubleRoundedUpTo15Digits: + dec = Decimal128(bin, + Decimal128::kRoundTo15Digits, + Decimal128::kRoundTowardPositive); + break; + case KeyString::kDCMEqualToDouble: + dec = Decimal128(bin, + Decimal128::kRoundTo34Digits, + Decimal128::kRoundTowardPositive); + break; + case KeyString::kDCMHasContinuationLessThanDoubleRoundedUpTo15Digits: + case KeyString::kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits: + // Deal with decimal continuation + dec = Decimal128(bin, + Decimal128::kRoundTo34Digits, + Decimal128::kRoundTowardPositive); + dec = readDecimalContinuation(reader, inverted, dec); + } + *stream << adjustDecimalExponent(typeBits, isNegative ? dec.negate() : dec); + break; + } + default: + MONGO_UNREACHABLE; + } break; } @@ -1018,7 +1523,7 @@ void toBsonValue(uint8_t ctype, } const bool haveFractionalPart = (encodedIntegerPart & 1); - long long integerPart = encodedIntegerPart >> 1; + int64_t integerPart = encodedIntegerPart >> 1; if (!haveFractionalPart) { if (isNegative) @@ -1032,25 +1537,30 @@ void toBsonValue(uint8_t ctype, *stream << int(integerPart); break; case TypeBits::kLong: - *stream << integerPart; + *stream << static_cast<long long>(integerPart); + break; + case TypeBits::kDecimal: + *stream << adjustDecimalExponent(typeBits, Decimal128(integerPart)); break; - case TypeBits::kNegativeZero: - invariant(false); + default: + MONGO_UNREACHABLE; } - } else { - // Nothing else can have a fractional part. - invariant(originalType == TypeBits::kDouble); + break; + } + // KeyString V0: anything fractional is a double + if (version == KeyString::Version::V0) { + invariant(originalType == TypeBits::kDouble); const uint64_t exponent = (64 - countLeadingZeros64(integerPart)) - 1; const size_t fractionalBits = (52 - exponent); const size_t fractionalBytes = (fractionalBits + 7) / 8; // build up the bits of a double here. uint64_t doubleBits = integerPart << fractionalBits; - doubleBits &= ~(1ull << 52); // clear implicit leading 1 + doubleBits &= ~(1ULL << 52); // clear implicit leading 1 doubleBits |= (exponent + 1023 /*bias*/) << 52; if (isNegative) { - doubleBits |= (1ull << 63); // sign bit + doubleBits |= (1ULL << 63); // sign bit } for (size_t i = 0; i < fractionalBytes; i++) { // fold in the fractional bytes @@ -1061,14 +1571,104 @@ void toBsonValue(uint8_t ctype, double number; memcpy(&number, &doubleBits, sizeof(number)); *stream << number; + break; + } + + // KeyString V1: all numeric values with fractions have at least 8 bytes. + // Start with integer part, and read until we have a full 8 bytes worth of data. + const size_t fracBytes = 8 - CType::numBytesForInt(ctype); + uint64_t encodedFraction = integerPart; + + for (int fracBytesRemaining = fracBytes; fracBytesRemaining; fracBytesRemaining--) + encodedFraction = (encodedFraction << 8) | readType<uint8_t>(reader, inverted); + + // Zero out the DCM and convert the whole binary fraction + double bin = static_cast<double>(encodedFraction & ~3ULL) * kInvPow256[fracBytes]; + if (originalType == TypeBits::kDouble) { + *stream << (isNegative ? -bin : bin); + break; } + // The two lsb's are the DCM, except for the 8-byte case, where it's already known + KeyString::DecimalContinuationMarker dcm = fracBytes + ? static_cast<KeyString::DecimalContinuationMarker>(encodedFraction & 3) + : KeyString::kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits; + + // Deal with decimal cases + invariant(originalType == TypeBits::kDecimal); + Decimal128 dec; + switch (dcm) { + case KeyString::kDCMEqualToDoubleRoundedUpTo15Digits: + dec = Decimal128( + bin, Decimal128::kRoundTo15Digits, Decimal128::kRoundTowardPositive); + break; + case KeyString::kDCMEqualToDouble: + dec = Decimal128( + bin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive); + break; + default: + // Deal with decimal continuation + dec = integerPart > kMaxIntForDouble + ? Decimal128(integerPart) + : Decimal128( + bin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive); + dec = readDecimalContinuation(reader, inverted, dec); + } + *stream << adjustDecimalExponent(typeBits, isNegative ? dec.negate() : dec); break; } default: invariant(false); } } + + +Decimal128 adjustDecimalExponent(TypeBits::Reader* typeBits, Decimal128 num) { + // The last 6 bits of the exponent are stored in the type bits. First figure out if the exponent + // of 'num' is too high or too low. Even for a non-zero number with only a single significant + // digit, there are only 34 possiblities while exponents with the given low 6 bits are spaced + // (1 << 6) == 64 apart. This is not quite enough to figure out whether to shift the expnent up + // or down when the difference is for example 32 in either direction. However, if the high part + // of the coefficient is zero, the coefficient can only be scaled down by up to 1E19 (increasing + // the exponent by 19), as 2**64 < 1E20. If scaling down to match the higher exponent isn't + // possible, we must be able to scale up. Scaling always must be exact and not change the value. + const uint32_t kMaxExpAdjust = 33; + const uint32_t kMaxExpIncrementForZeroHighCoefficient = 19; + dassert(!num.isZero()); + const uint32_t origExp = num.getBiasedExponent(); + const uint8_t storedBits = typeBits->readDecimalExponent(); + + uint32_t highExp = (origExp & ~KeyString::TypeBits::kStoredDecimalExponentMask) | storedBits; + + // Start by determining an exponent that's not less than num's and matches the stored bits. + if (highExp < origExp) + highExp += (1U << KeyString::TypeBits::kStoredDecimalExponentBits); + + // This must be the right exponent, as no scaling is required. + if (highExp == origExp) + return num; + + // For increasing the exponent, quantize the existing number. This must be + // exact, as the value stays in the same cohort. + if (highExp <= origExp + kMaxExpAdjust && + (num.getCoefficientHigh() != 0 || + highExp <= origExp + kMaxExpIncrementForZeroHighCoefficient)) { + // Increase exponent and decrease (right shift) coefficient. + uint32_t flags = Decimal128::SignalingFlag::kNoFlag; + auto quantized = num.quantize(Decimal128(0, highExp, 0, 1), &flags); + invariant(flags == Decimal128::SignalingFlag::kNoFlag); // must be exact + num = quantized; + } else { + // Decrease exponent and increase (left shift) coefficient. + uint32_t lowExp = highExp - (1U << KeyString::TypeBits::kStoredDecimalExponentBits); + invariant(lowExp >= origExp - kMaxExpAdjust); + num = num.add(Decimal128(0, lowExp, 0, 0)); + } + dassert((num.getBiasedExponent() & KeyString::TypeBits::kStoredDecimalExponentMask) == + (highExp & KeyString::TypeBits::kStoredDecimalExponentMask)); + return num; +} + } // namespace BSONObj KeyString::toBson(const char* buffer, size_t len, Ordering ord, const TypeBits& typeBits) { @@ -1088,7 +1688,7 @@ BSONObj KeyString::toBson(const char* buffer, size_t len, Ordering ord, const Ty if (ctype == kEnd) break; - toBsonValue(ctype, &reader, &typeBitsReader, invert, &(builder << "")); + toBsonValue(ctype, &reader, &typeBitsReader, invert, typeBits.version, &(builder << "")); } return builder.obj(); } @@ -1201,6 +1801,52 @@ void KeyString::TypeBits::appendBit(uint8_t oneOrZero) { _curBit++; } +void KeyString::TypeBits::appendZero(uint8_t zeroType) { + switch (zeroType) { + // 2-bit encodings + case kInt: + case kDouble: + case kLong: + appendBit(zeroType >> 1); + appendBit(zeroType & 1); + break; + case kNegativeDoubleZero: + if (version == Version::V0) { + appendBit(kV0NegativeDoubleZero >> 1); + appendBit(kV0NegativeDoubleZero & 1); + break; + } + zeroType = kV1NegativeDoubleZero; + // fallthrough for 5-bit encodings + case kDecimalZero0xxx: + case kDecimalZero1xxx: + case kDecimalZero2xxx: + case kDecimalZero3xxx: + case kDecimalZero4xxx: + case kDecimalZero5xxx: + // first two bits output are ones + dassert((zeroType >> 3) == 3); + for (int bitPos = 4; bitPos >= 0; bitPos--) + appendBit((zeroType >> bitPos) & 1); + break; + default: + MONGO_UNREACHABLE; + } +} + +void KeyString::TypeBits::appendDecimalZero(uint32_t whichZero) { + invariant((whichZero >> 12) <= kDecimalZero5xxx - kDecimalZero0xxx); + appendZero((whichZero >> 12) + kDecimalZero0xxx); + for (int bitPos = 11; bitPos >= 0; bitPos--) + appendBit((whichZero >> bitPos) & 1); +} + +void KeyString::TypeBits::appendDecimalExponent(uint8_t storedExponentBits) { + invariant(storedExponentBits < (1U << kStoredDecimalExponentBits)); + for (int bitPos = kStoredDecimalExponentBits - 1; bitPos >= 0; bitPos--) + appendBit((storedExponentBits >> bitPos) & 1); +} + uint8_t KeyString::TypeBits::Reader::readBit() { if (_typeBits._isAllZeros) return 0; @@ -1214,4 +1860,32 @@ uint8_t KeyString::TypeBits::Reader::readBit() { return (_typeBits._buf[byte] & (1 << offsetInByte)) ? 1 : 0; } +uint8_t KeyString::TypeBits::Reader::readZero() { + uint8_t res = readNumeric(); + + // For keyString v1, negative and decimal zeros require at least 3 more bits. + if (_typeBits.version != Version::V0 && res == kSpecialZeroPrefix) { + res = (res << 1) | readBit(); + res = (res << 1) | readBit(); + res = (res << 1) | readBit(); + } + if (res == kV1NegativeDoubleZero || res == kV0NegativeDoubleZero) + res = kNegativeDoubleZero; + return res; +} + +uint32_t KeyString::TypeBits::Reader::readDecimalZero(uint8_t zeroType) { + uint32_t whichZero = zeroType - TypeBits::kDecimalZero0xxx; + for (int bitPos = 11; bitPos >= 0; bitPos--) + whichZero = (whichZero << 1) | readBit(); + + return whichZero; +} + +uint8_t KeyString::TypeBits::Reader::readDecimalExponent() { + uint8_t exponentBits = 0; + for (int bitPos = kStoredDecimalExponentBits - 1; bitPos >= 0; bitPos--) + exponentBits = (exponentBits << 1) | readBit(); + return exponentBits; +} } // namespace mongo diff --git a/src/mongo/db/storage/key_string.h b/src/mongo/db/storage/key_string.h index 02302498b0d..5b3d256f0d4 100644 --- a/src/mongo/db/storage/key_string.h +++ b/src/mongo/db/storage/key_string.h @@ -30,18 +30,29 @@ #pragma once +#include <limits> + +#include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" -#include "mongo/bson/bsonmisc.h" -#include "mongo/bson/timestamp.h" #include "mongo/bson/ordering.h" +#include "mongo/bson/timestamp.h" #include "mongo/db/record_id.h" +#include "mongo/platform/decimal128.h" namespace mongo { class KeyString { public: /** + * Selects version of KeyString to use. V0 and V1 differ in their encoding of numeric values. + */ + enum class Version : uint8_t { V0 = 0, V1 = 1 }; + static StringData versionToString(Version version) { + return version == Version::V0 ? "V0" : "V1"; + } + + /** * Encodes info needed to restore the original BSONTypes from a KeyString. They cannot be * stored in place since we don't want them to affect the ordering (1 and 1.0 compare as * equal). @@ -51,8 +62,17 @@ public: // Sufficient bytes to encode extra type information for any BSON key that fits in 1KB. // The encoding format will need to change if we raise this limit. static const uint8_t kMaxBytesNeeded = 127; - - TypeBits() { + static const uint32_t kMaxKeyBytes = 1024; + static const uint32_t kMaxTypeBitsPerDecimal = 17; + static const uint32_t kBytesForTypeAndEmptyKey = 2; + static const uint32_t kMaxDecimalsPerKey = + kMaxKeyBytes / (sizeof(Decimal128::Value) + kBytesForTypeAndEmptyKey); + static_assert(kMaxTypeBitsPerDecimal * kMaxDecimalsPerKey < kMaxBytesNeeded * 8UL, + "encoding needs change to contain all type bits for worst case key"); + static const uint8_t kStoredDecimalExponentBits = 6; + static const uint32_t kStoredDecimalExponentMask = (1U << kStoredDecimalExponentBits) - 1; + + explicit TypeBits(Version version) : version(version) { reset(); } @@ -61,8 +81,8 @@ public: * BufReader in the format described on the getBuffer() method. */ void resetFromBuffer(BufReader* reader); - static TypeBits fromBuffer(BufReader* reader) { - TypeBits out; + static TypeBits fromBuffer(Version version, BufReader* reader) { + TypeBits out(version); out.resetFromBuffer(reader); return out; } @@ -124,9 +144,26 @@ public: static const uint8_t kSymbol = 0x1; static const uint8_t kInt = 0x0; - static const uint8_t kDouble = 0x1; - static const uint8_t kLong = 0x2; - static const uint8_t kNegativeZero = 0x3; // decodes as a double + static const uint8_t kLong = 0x1; + static const uint8_t kDouble = 0x2; + static const uint8_t kDecimal = 0x3; // indicates 6 more bits of typeinfo follow. + static const uint8_t kSpecialZeroPrefix = 0x3; // kNumericZero case, 3 more bits follow. + static const uint8_t kNegativeDoubleZero = 0x3; // normalized -0.0 double, either V0 or V1. + static const uint8_t kV0NegativeDoubleZero = 0x3; // legacy encoding for V0 + + // The following describe the initial 5 type bits for kNegativeOrDecimalZero. These bits + // encode double -0 or a 3-bit prefix (range 0 to 5) of the 15-bit decimal zero type. + static const uint8_t kV1NegativeDoubleZero = 0x18; // 0b11000 + + static const uint8_t kUnusedEncoding = 0x19; // 0b11001 + + // There are 6 * (1<<12) == 2 * (kMaxBiasedExponent + 1) == 24576 decimal zeros. + static const uint8_t kDecimalZero0xxx = 0x1a; // 0b11010 12 more exponent bits follow + static const uint8_t kDecimalZero1xxx = 0x1b; // 0b11011 + static const uint8_t kDecimalZero2xxx = 0x1c; // 0b11100 + static const uint8_t kDecimalZero3xxx = 0x1d; // 0b11101 + static const uint8_t kDecimalZero4xxx = 0x1e; // 0b11110 + static const uint8_t kDecimalZero5xxx = 0x1f; // 0b11111 void reset() { _curBit = 0; @@ -143,21 +180,24 @@ public: } void appendNumberDouble() { - appendBit(kDouble & 1); appendBit(kDouble >> 1); + appendBit(kDouble & 1); } void appendNumberInt() { - appendBit(kInt & 1); appendBit(kInt >> 1); + appendBit(kInt & 1); } void appendNumberLong() { - appendBit(kLong & 1); appendBit(kLong >> 1); + appendBit(kLong & 1); } - void appendNegativeZero() { - appendBit(kNegativeZero & 1); - appendBit(kNegativeZero >> 1); + void appendNumberDecimal() { + appendBit(kDecimal >> 1); + appendBit(kDecimal & 1); } + void appendZero(uint8_t zeroType); + void appendDecimalZero(uint32_t whichZero); + void appendDecimalExponent(uint8_t storedExponentBits); class Reader { public: @@ -170,9 +210,17 @@ public: return readBit(); } uint8_t readNumeric() { - uint8_t lowBit = readBit(); - return lowBit | (readBit() << 1); + uint8_t highBit = readBit(); + return (highBit << 1) | readBit(); } + uint8_t readZero(); + + // Given a decimal zero type between kDecimalZero0xxx and kDecimal5xxx, read the + // remaining 12 bits and return which of the 24576 decimal zeros to produce. + uint32_t readDecimalZero(uint8_t zeroType); + + // Reads the stored exponent bits of a non-zero decimal number. + uint8_t readDecimalExponent(); private: uint8_t readBit(); @@ -181,6 +229,8 @@ public: const TypeBits& _typeBits; }; + const Version version; + private: /** * size only includes data bytes, not the size byte itself. @@ -211,17 +261,32 @@ public: kExclusiveAfter, }; - KeyString() {} + /** + * Encodes the kind of NumberDecimal that is stored. + */ + enum DecimalContinuationMarker { + kDCMEqualToDouble = 0x0, + kDCMHasContinuationLessThanDoubleRoundedUpTo15Digits = 0x1, + kDCMEqualToDoubleRoundedUpTo15Digits = 0x2, + kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits = 0x3 + }; + + explicit KeyString(Version version) : version(version), _typeBits(version) {} - KeyString(const BSONObj& obj, Ordering ord, RecordId recordId) { + KeyString(Version version, const BSONObj& obj, Ordering ord, RecordId recordId) + : KeyString(version) { resetToKey(obj, ord, recordId); } - KeyString(const BSONObj& obj, Ordering ord, Discriminator discriminator = kInclusive) { + KeyString(Version version, + const BSONObj& obj, + Ordering ord, + Discriminator discriminator = kInclusive) + : KeyString(version) { resetToKey(obj, ord, discriminator); } - explicit KeyString(RecordId rid) { + KeyString(Version version, RecordId rid) : version(version), _typeBits(version) { appendRecordId(rid); } @@ -278,6 +343,12 @@ public: */ std::string toString() const; + /** + * Version to use for conversion to/from KeyString. V1 has different encodings for numeric + * values. + */ + const Version version; + private: void _appendAllElementsForIndexing(const BSONObj& obj, Ordering ord, @@ -299,6 +370,7 @@ private: void _appendNumberDouble(const double num, bool invert); void _appendNumberLong(const long long num, bool invert); void _appendNumberInt(const int num, bool invert); + void _appendNumberDecimal(const Decimal128 num, bool invert); /** * @param name - optional, can be NULL @@ -309,11 +381,15 @@ private: void _appendStringLike(StringData str, bool invert); void _appendBson(const BSONObj& obj, bool invert); - void _appendSmallDouble(double value, bool invert); - void _appendLargeDouble(double value, bool invert); + void _appendSmallDouble(double value, DecimalContinuationMarker dcm, bool invert); + void _appendLargeDouble(double value, DecimalContinuationMarker dcm, bool invert); void _appendInteger(const long long num, bool invert); void _appendPreshiftedIntegerPortion(uint64_t value, bool isNegative, bool invert); + void _appendDoubleWithoutTypeBits(const double num, DecimalContinuationMarker dcm, bool invert); + void _appendHugeDecimalWithoutTypeBits(const Decimal128 dec, bool invert); + void _appendTinyDecimalWithoutTypeBits(const Decimal128 dec, const double bin, bool invert); + template <typename T> void _append(const T& thing, bool invert); void _appendBytes(const void* source, size_t bytes, bool invert); diff --git a/src/mongo/db/storage/key_string_test.cpp b/src/mongo/db/storage/key_string_test.cpp index 471d365f012..07c8f357058 100644 --- a/src/mongo/db/storage/key_string_test.cpp +++ b/src/mongo/db/storage/key_string_test.cpp @@ -30,15 +30,24 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage +#include "mongo/platform/basic.h" + +#include <algorithm> #include <cmath> +#include <limits> +#include <random> +#include <typeinfo> +#include <vector> -#include "mongo/platform/basic.h" +#include "mongo/base/owned_pointer_vector.h" #include "mongo/config.h" #include "mongo/db/storage/key_string.h" +#include "mongo/platform/decimal128.h" +#include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" -#include "mongo/base/owned_pointer_vector.h" +#include "mongo/util/timer.h" using std::string; using namespace mongo; @@ -51,68 +60,93 @@ Ordering ALL_ASCENDING = Ordering::make(BSONObj()); Ordering ONE_ASCENDING = Ordering::make(BSON("a" << 1)); Ordering ONE_DESCENDING = Ordering::make(BSON("a" << -1)); -TEST(KeyStringTest, Simple1) { +class KeyStringTest : public mongo::unittest::Test { +public: + void run() { + auto base = static_cast<mongo::unittest::Test*>(this); + try { + version = KeyString::Version::V0; + base->run(); + version = KeyString::Version::V1; + base->run(); + } catch (...) { + log() << "exception while testing KeyString version " + << mongo::KeyString::versionToString(version); + throw; + } + } + +protected: + KeyString::Version version; +}; + + +TEST_F(KeyStringTest, Simple1) { BSONObj a = BSON("" << 5); BSONObj b = BSON("" << 6); ASSERT_LESS_THAN(a, b); - ASSERT_LESS_THAN(KeyString(a, ALL_ASCENDING, RecordId()), - KeyString(b, ALL_ASCENDING, RecordId())); + ASSERT_LESS_THAN(KeyString(version, a, ALL_ASCENDING, RecordId()), + KeyString(version, b, ALL_ASCENDING, RecordId())); } -#define ROUNDTRIP_ORDER(x, order) \ +#define ROUNDTRIP_ORDER(version, x, order) \ do { \ const BSONObj _orig = x; \ - const KeyString _ks(_orig, order); \ + const KeyString _ks(version, _orig, order); \ const BSONObj _converted = toBson(_ks, order); \ ASSERT_EQ(_converted, _orig); \ ASSERT(_converted.binaryEqual(_orig)); \ } while (0) -#define ROUNDTRIP(x) \ - do { \ - ROUNDTRIP_ORDER(x, ALL_ASCENDING); \ - ROUNDTRIP_ORDER(x, ONE_DESCENDING); \ +#define ROUNDTRIP(version, x) \ + do { \ + ROUNDTRIP_ORDER(version, x, ALL_ASCENDING); \ + ROUNDTRIP_ORDER(version, x, ONE_DESCENDING); \ } while (0) -#define COMPARES_SAME(_x, _y) \ - do { \ - KeyString _xKS(_x, ONE_ASCENDING); \ - KeyString _yKS(_y, ONE_ASCENDING); \ - if (_x == _y) { \ - ASSERT_EQUALS(_xKS, _yKS); \ - } else if (_x < _y) { \ - ASSERT_LESS_THAN(_xKS, _yKS); \ - } else { \ - ASSERT_LESS_THAN(_yKS, _xKS); \ - } \ - \ - _xKS.resetToKey(_x, ONE_DESCENDING); \ - _yKS.resetToKey(_y, ONE_DESCENDING); \ - if (_x == _y) { \ - ASSERT_EQUALS(_xKS, _yKS); \ - } else if (_x < _y) { \ - ASSERT_GREATER_THAN(_xKS, _yKS); \ - } else { \ - ASSERT_GREATER_THAN(_yKS, _xKS); \ - } \ +#define COMPARES_SAME(_v, _x, _y) \ + do { \ + KeyString _xKS(_v, _x, ONE_ASCENDING); \ + KeyString _yKS(_v, _y, ONE_ASCENDING); \ + if (_x == _y) { \ + ASSERT_EQUALS(_xKS, _yKS); \ + } else if (_x < _y) { \ + ASSERT_LESS_THAN(_xKS, _yKS); \ + } else { \ + ASSERT_LESS_THAN(_yKS, _xKS); \ + } \ + \ + _xKS.resetToKey(_x, ONE_DESCENDING); \ + _yKS.resetToKey(_y, ONE_DESCENDING); \ + if (_x == _y) { \ + ASSERT_EQUALS(_xKS, _yKS); \ + } else if (_x < _y) { \ + ASSERT_GREATER_THAN(_xKS, _yKS); \ + } else { \ + ASSERT_GREATER_THAN(_yKS, _xKS); \ + } \ } while (0) -TEST(KeyStringTest, ActualBytesDouble) { +TEST_F(KeyStringTest, ActualBytesDouble) { // just one test like this for utter sanity BSONObj a = BSON("" << 5.5); - KeyString ks(a, ALL_ASCENDING); - log() << "size: " << ks.getSize() << " hex [" << toHex(ks.getBuffer(), ks.getSize()) << "]"; + KeyString ks(version, a, ALL_ASCENDING); + log() << KeyString::versionToString(version) << " size: " << ks.getSize() << " hex [" + << toHex(ks.getBuffer(), ks.getSize()) << "]"; ASSERT_EQUALS(10U, ks.getSize()); - string hex = - "2B" // kNumericPositive1ByteInt - "0B" // (5 << 1) | 1 - "02000000000000" // fractional bytes of double - "04"; // kEnd + string hex = version == KeyString::Version::V0 ? "2B" // kNumericPositive1ByteInt + "0B" // (5 << 1) | 1 + "02000000000000" // fractional bytes of double + "04" // kEnd + : "2B" // kNumericPositive1ByteInt + "0B" // (5 << 1) | 1 + "80000000000000" // fractional bytes + "04"; // kEnd ASSERT_EQUALS(hex, toHex(ks.getBuffer(), ks.getSize())); @@ -133,151 +167,222 @@ TEST(KeyStringTest, ActualBytesDouble) { ASSERT_EQUALS(hexFlipped, toHex(ks.getBuffer(), ks.getSize())); } -TEST(KeyStringTest, AllTypesSimple) { - ROUNDTRIP(BSON("" << 5.5)); - ROUNDTRIP(BSON("" +TEST_F(KeyStringTest, AllTypesSimple) { + ROUNDTRIP(version, BSON("" << 5.5)); + ROUNDTRIP(version, + BSON("" << "abc")); - ROUNDTRIP(BSON("" << BSON("a" << 5))); - ROUNDTRIP(BSON("" << BSON_ARRAY("a" << 5))); - ROUNDTRIP(BSON("" << BSONBinData("abc", 3, bdtCustom))); - ROUNDTRIP(BSON("" << BSONUndefined)); - ROUNDTRIP(BSON("" << OID("abcdefabcdefabcdefabcdef"))); - ROUNDTRIP(BSON("" << true)); - ROUNDTRIP(BSON("" << Date_t::fromMillisSinceEpoch(123123123))); - ROUNDTRIP(BSON("" << BSONRegEx("asdf", "x"))); - ROUNDTRIP(BSON("" << BSONDBRef("db.c", OID("010203040506070809101112")))); - ROUNDTRIP(BSON("" << BSONCode("abc_code"))); - ROUNDTRIP(BSON("" << BSONCodeWScope("def_code", + ROUNDTRIP(version, BSON("" << BSON("a" << 5))); + ROUNDTRIP(version, BSON("" << BSON_ARRAY("a" << 5))); + ROUNDTRIP(version, BSON("" << BSONBinData("abc", 3, bdtCustom))); + ROUNDTRIP(version, BSON("" << BSONUndefined)); + ROUNDTRIP(version, BSON("" << OID("abcdefabcdefabcdefabcdef"))); + ROUNDTRIP(version, BSON("" << true)); + ROUNDTRIP(version, BSON("" << Date_t::fromMillisSinceEpoch(123123123))); + ROUNDTRIP(version, BSON("" << BSONRegEx("asdf", "x"))); + ROUNDTRIP(version, BSON("" << BSONDBRef("db.c", OID("010203040506070809101112")))); + ROUNDTRIP(version, BSON("" << BSONCode("abc_code"))); + ROUNDTRIP(version, + BSON("" << BSONCodeWScope("def_code", BSON("x_scope" << "a")))); - ROUNDTRIP(BSON("" << 5)); - ROUNDTRIP(BSON("" << Timestamp(123123, 123))); - ROUNDTRIP(BSON("" << Timestamp(~0U, 3))); - ROUNDTRIP(BSON("" << 1235123123123LL)); + ROUNDTRIP(version, BSON("" << 5)); + ROUNDTRIP(version, BSON("" << Timestamp(123123, 123))); + ROUNDTRIP(version, BSON("" << Timestamp(~0U, 3))); + ROUNDTRIP(version, BSON("" << 1235123123123LL)); } -TEST(KeyStringTest, Array1) { +TEST_F(KeyStringTest, Array1) { BSONObj emptyArray = BSON("" << BSONArray()); ASSERT_EQUALS(Array, emptyArray.firstElement().type()); - ROUNDTRIP(emptyArray); - ROUNDTRIP(BSON("" << BSON_ARRAY(emptyArray.firstElement()))); - ROUNDTRIP(BSON("" << BSON_ARRAY(1))); - ROUNDTRIP(BSON("" << BSON_ARRAY(1 << 2))); - ROUNDTRIP(BSON("" << BSON_ARRAY(1 << 2 << 3))); + ROUNDTRIP(version, emptyArray); + ROUNDTRIP(version, BSON("" << BSON_ARRAY(emptyArray.firstElement()))); + ROUNDTRIP(version, BSON("" << BSON_ARRAY(1))); + ROUNDTRIP(version, BSON("" << BSON_ARRAY(1 << 2))); + ROUNDTRIP(version, BSON("" << BSON_ARRAY(1 << 2 << 3))); { - KeyString a(emptyArray, ALL_ASCENDING, RecordId::min()); - KeyString b(emptyArray, ALL_ASCENDING, RecordId(5)); + KeyString a(version, emptyArray, ALL_ASCENDING, RecordId::min()); + KeyString b(version, emptyArray, ALL_ASCENDING, RecordId(5)); ASSERT_LESS_THAN(a, b); } { - KeyString a(emptyArray, ALL_ASCENDING, RecordId(0)); - KeyString b(emptyArray, ALL_ASCENDING, RecordId(5)); + KeyString a(version, emptyArray, ALL_ASCENDING, RecordId(0)); + KeyString b(version, emptyArray, ALL_ASCENDING, RecordId(5)); ASSERT_LESS_THAN(a, b); } } -TEST(KeyStringTest, SubDoc1) { - ROUNDTRIP(BSON("" << BSON("foo" << 2))); - ROUNDTRIP(BSON("" << BSON("foo" << 2 << "bar" +TEST_F(KeyStringTest, SubDoc1) { + ROUNDTRIP(version, BSON("" << BSON("foo" << 2))); + ROUNDTRIP(version, + BSON("" << BSON("foo" << 2 << "bar" << "asd"))); - ROUNDTRIP(BSON("" << BSON("foo" << BSON_ARRAY(2 << 4)))); + ROUNDTRIP(version, BSON("" << BSON("foo" << BSON_ARRAY(2 << 4)))); } -TEST(KeyStringTest, SubDoc2) { +TEST_F(KeyStringTest, SubDoc2) { BSONObj a = BSON("" << BSON("a" << "foo")); BSONObj b = BSON("" << BSON("b" << 5.5)); BSONObj c = BSON("" << BSON("c" << BSON("x" << 5))); - ROUNDTRIP(a); - ROUNDTRIP(b); - ROUNDTRIP(c); + ROUNDTRIP(version, a); + ROUNDTRIP(version, b); + ROUNDTRIP(version, c); - COMPARES_SAME(a, b); - COMPARES_SAME(a, c); - COMPARES_SAME(b, c); + COMPARES_SAME(version, a, b); + COMPARES_SAME(version, a, c); + COMPARES_SAME(version, b, c); } -TEST(KeyStringTest, Compound1) { - ROUNDTRIP(BSON("" << BSON("a" << 5) << "" << 1)); - ROUNDTRIP(BSON("" << BSON("" << 5) << "" << 1)); +TEST_F(KeyStringTest, Compound1) { + ROUNDTRIP(version, BSON("" << BSON("a" << 5) << "" << 1)); + ROUNDTRIP(version, BSON("" << BSON("" << 5) << "" << 1)); } -TEST(KeyStringTest, Undef1) { - ROUNDTRIP(BSON("" << BSONUndefined)); +TEST_F(KeyStringTest, Undef1) { + ROUNDTRIP(version, BSON("" << BSONUndefined)); } -TEST(KeyStringTest, NumberLong0) { +TEST_F(KeyStringTest, NumberLong0) { double d = (1ll << 52) - 1; long long ll = static_cast<long long>(d); double d2 = static_cast<double>(ll); ASSERT_EQUALS(d, d2); } -TEST(KeyStringTest, NumbersNearInt32Max) { +TEST_F(KeyStringTest, NumbersNearInt32Max) { int64_t start = std::numeric_limits<int32_t>::max(); for (int64_t i = -1000; i < 1000; i++) { long long toTest = start + i; - ROUNDTRIP(BSON("" << toTest)); - ROUNDTRIP(BSON("" << static_cast<int>(toTest))); - ROUNDTRIP(BSON("" << static_cast<double>(toTest))); + ROUNDTRIP(version, BSON("" << toTest)); + ROUNDTRIP(version, BSON("" << static_cast<int>(toTest))); + ROUNDTRIP(version, BSON("" << static_cast<double>(toTest))); + } +} + +TEST_F(KeyStringTest, DecimalNumbers) { + if (version == KeyString::Version::V0) { + log() << "not testing DecimalNumbers for KeyString V0"; + return; } + + const auto V1 = KeyString::Version::V1; + + // Zeros + ROUNDTRIP(V1, BSON("" << Decimal128("0"))); + ROUNDTRIP(V1, BSON("" << Decimal128("0.0"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-0"))); + ROUNDTRIP(V1, BSON("" << Decimal128("0E5000"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-0.0000E-6172"))); + + // Special numbers + ROUNDTRIP(V1, BSON("" << Decimal128("NaN"))); + ROUNDTRIP(V1, BSON("" << Decimal128("+Inf"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-Inf"))); + + // Decimal representations of whole double numbers + ROUNDTRIP(V1, BSON("" << Decimal128("1"))); + ROUNDTRIP(V1, BSON("" << Decimal128("2.0"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-2.0E1"))); + ROUNDTRIP(V1, BSON("" << Decimal128("1234.56E15"))); + ROUNDTRIP(V1, BSON("" << Decimal128("2.00000000000000000000000"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-9223372036854775808.00000000000000"))); // -2**63 + ROUNDTRIP(V1, BSON("" << Decimal128("973555660975280180349468061728768E1"))); // 1.875 * 2**112 + + // Decimal representations of fractional double numbers + ROUNDTRIP(V1, BSON("" << Decimal128("1.25"))); + ROUNDTRIP(V1, BSON("" << Decimal128("3.141592653584666550159454345703125"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-127.50"))); + + // Decimal representations of whole int64 non-double numbers + ROUNDTRIP(V1, BSON("" << Decimal128("243290200817664E4"))); // 20! + ROUNDTRIP(V1, BSON("" << Decimal128("9007199254740993"))); // 2**53 + 1 + ROUNDTRIP(V1, BSON("" << Decimal128(std::numeric_limits<int64_t>::max()))); + ROUNDTRIP(V1, BSON("" << Decimal128(std::numeric_limits<int64_t>::min()))); + + // Decimals in int64_t range without decimal or integer representation + ROUNDTRIP(V1, BSON("" << Decimal128("1.23"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-1.1"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-12345.60"))); + ROUNDTRIP(V1, BSON("" << Decimal128("3.141592653589793238462643383279502"))); + ROUNDTRIP(V1, BSON("" << Decimal128("-3.141592653589793115997963468544185"))); + + // Decimal representations of small double numbers + ROUNDTRIP(V1, BSON("" << Decimal128("0.50"))); + ROUNDTRIP(V1, + BSON("" << Decimal128("-0.3552713678800500929355621337890625E-14"))); // -2**(-48) + ROUNDTRIP(V1, + BSON("" << Decimal128("-0.000000000000001234567890123456789012345678901234E-99"))); + + // Decimal representations of small decimals not representable as double + ROUNDTRIP(V1, BSON("" << Decimal128("0.02"))); + + // Large decimals + ROUNDTRIP(V1, BSON("" << Decimal128("1234567890123456789012345678901234E6000"))); + ROUNDTRIP(V1, + BSON("" << Decimal128("-19950631168.80758384883742162683585E3000"))); // -2**10000 + + // Tiny, tiny decimals + ROUNDTRIP(V1, + BSON("" << Decimal128("0.2512388057698744585180135042133610E-6020"))); // 2**(-10000) + ROUNDTRIP(V1, BSON("" << Decimal128("4.940656458412465441765687928682213E-324") << "" << 1)); + ROUNDTRIP(V1, BSON("" << Decimal128("-0.8289046058458094980903836776809409E-316"))); } -TEST(KeyStringTest, LotsOfNumbers1) { +TEST_F(KeyStringTest, LotsOfNumbers1) { for (int i = 0; i < 64; i++) { int64_t x = 1LL << i; - ROUNDTRIP(BSON("" << static_cast<long long>(x))); - ROUNDTRIP(BSON("" << static_cast<int>(x))); - ROUNDTRIP(BSON("" << static_cast<double>(x))); - ROUNDTRIP(BSON("" << (static_cast<double>(x) + .1))); - ROUNDTRIP(BSON("" << (static_cast<double>(x) - .1))); + ROUNDTRIP(version, BSON("" << static_cast<long long>(x))); + ROUNDTRIP(version, BSON("" << static_cast<int>(x))); + ROUNDTRIP(version, BSON("" << static_cast<double>(x))); + ROUNDTRIP(version, BSON("" << (static_cast<double>(x) + .1))); + ROUNDTRIP(version, BSON("" << (static_cast<double>(x) - .1))); - ROUNDTRIP(BSON("" << (static_cast<long long>(x) + 1))); - ROUNDTRIP(BSON("" << (static_cast<int>(x) + 1))); - ROUNDTRIP(BSON("" << (static_cast<double>(x) + 1))); - ROUNDTRIP(BSON("" << (static_cast<double>(x) + 1.1))); + ROUNDTRIP(version, BSON("" << (static_cast<long long>(x) + 1))); + ROUNDTRIP(version, BSON("" << (static_cast<int>(x) + 1))); + ROUNDTRIP(version, BSON("" << (static_cast<double>(x) + 1))); + ROUNDTRIP(version, BSON("" << (static_cast<double>(x) + 1.1))); // Avoid negating signed integral minima if (i < 63) - ROUNDTRIP(BSON("" << -static_cast<long long>(x))); + ROUNDTRIP(version, BSON("" << -static_cast<long long>(x))); if (i < 31) - ROUNDTRIP(BSON("" << -static_cast<int>(x))); - - ROUNDTRIP(BSON("" << -static_cast<double>(x))); - ROUNDTRIP(BSON("" << -(static_cast<double>(x) + .1))); - - ROUNDTRIP(BSON("" << -(static_cast<long long>(x) + 1))); - ROUNDTRIP(BSON("" << -(static_cast<int>(x) + 1))); - ROUNDTRIP(BSON("" << -(static_cast<double>(x) + 1))); - ROUNDTRIP(BSON("" << -(static_cast<double>(x) + 1.1))); + ROUNDTRIP(version, BSON("" << -static_cast<int>(x))); + ROUNDTRIP(version, BSON("" << -static_cast<double>(x))); + ROUNDTRIP(version, BSON("" << -(static_cast<double>(x) + .1))); + + ROUNDTRIP(version, BSON("" << -(static_cast<long long>(x) + 1))); + ROUNDTRIP(version, BSON("" << -(static_cast<int>(x) + 1))); + ROUNDTRIP(version, BSON("" << -(static_cast<double>(x) + 1))); + ROUNDTRIP(version, BSON("" << -(static_cast<double>(x) + 1.1))); } } -TEST(KeyStringTest, LotsOfNumbers2) { +TEST_F(KeyStringTest, LotsOfNumbers2) { for (double i = -1100; i < 1100; i++) { double x = pow(2, i); - ROUNDTRIP(BSON("" << x)); + ROUNDTRIP(version, BSON("" << x)); } for (double i = -1100; i < 1100; i++) { double x = pow(2.1, i); - ROUNDTRIP(BSON("" << x)); + ROUNDTRIP(version, BSON("" << x)); } } -TEST(KeyStringTest, RecordIdOrder1) { +TEST_F(KeyStringTest, RecordIdOrder1) { Ordering ordering = Ordering::make(BSON("a" << 1)); - KeyString a(BSON("" << 5), ordering, RecordId::min()); - KeyString b(BSON("" << 5), ordering, RecordId(2)); - KeyString c(BSON("" << 5), ordering, RecordId(3)); - KeyString d(BSON("" << 6), ordering, RecordId()); - KeyString e(BSON("" << 6), ordering, RecordId(1)); + KeyString a(version, BSON("" << 5), ordering, RecordId::min()); + KeyString b(version, BSON("" << 5), ordering, RecordId(2)); + KeyString c(version, BSON("" << 5), ordering, RecordId(3)); + KeyString d(version, BSON("" << 6), ordering, RecordId()); + KeyString e(version, BSON("" << 6), ordering, RecordId(1)); ASSERT_LESS_THAN(a, b); ASSERT_LESS_THAN(b, c); @@ -285,13 +390,13 @@ TEST(KeyStringTest, RecordIdOrder1) { ASSERT_LESS_THAN(d, e); } -TEST(KeyStringTest, RecordIdOrder2) { +TEST_F(KeyStringTest, RecordIdOrder2) { Ordering ordering = Ordering::make(BSON("a" << -1 << "b" << -1)); - KeyString a(BSON("" << 5 << "" << 6), ordering, RecordId::min()); - KeyString b(BSON("" << 5 << "" << 6), ordering, RecordId(5)); - KeyString c(BSON("" << 5 << "" << 5), ordering, RecordId(4)); - KeyString d(BSON("" << 3 << "" << 4), ordering, RecordId(3)); + KeyString a(version, BSON("" << 5 << "" << 6), ordering, RecordId::min()); + KeyString b(version, BSON("" << 5 << "" << 6), ordering, RecordId(5)); + KeyString c(version, BSON("" << 5 << "" << 5), ordering, RecordId(4)); + KeyString d(version, BSON("" << 3 << "" << 4), ordering, RecordId(3)); ASSERT_LESS_THAN(a, b); ASSERT_LESS_THAN(b, c); @@ -301,19 +406,19 @@ TEST(KeyStringTest, RecordIdOrder2) { ASSERT_LESS_THAN(b, d); } -TEST(KeyStringTest, RecordIdOrder2Double) { +TEST_F(KeyStringTest, RecordIdOrder2Double) { Ordering ordering = Ordering::make(BSON("a" << -1 << "b" << -1)); - KeyString a(BSON("" << 5.0 << "" << 6.0), ordering, RecordId::min()); - KeyString b(BSON("" << 5.0 << "" << 6.0), ordering, RecordId(5)); - KeyString c(BSON("" << 3.0 << "" << 4.0), ordering, RecordId(3)); + KeyString a(version, BSON("" << 5.0 << "" << 6.0), ordering, RecordId::min()); + KeyString b(version, BSON("" << 5.0 << "" << 6.0), ordering, RecordId(5)); + KeyString c(version, BSON("" << 3.0 << "" << 4.0), ordering, RecordId(3)); ASSERT_LESS_THAN(a, b); ASSERT_LESS_THAN(b, c); ASSERT_LESS_THAN(a, c); } -TEST(KeyStringTest, Timestamp) { +TEST_F(KeyStringTest, Timestamp) { BSONObj a = BSON("" << Timestamp(0, 0)); BSONObj b = BSON("" << Timestamp(1234, 1)); BSONObj c = BSON("" << Timestamp(1234, 2)); @@ -321,19 +426,19 @@ TEST(KeyStringTest, Timestamp) { BSONObj e = BSON("" << Timestamp(~0U, 0)); { - ROUNDTRIP(a); - ROUNDTRIP(b); - ROUNDTRIP(c); + ROUNDTRIP(version, a); + ROUNDTRIP(version, b); + ROUNDTRIP(version, c); ASSERT_LESS_THAN(a, b); ASSERT_LESS_THAN(b, c); ASSERT_LESS_THAN(c, d); - KeyString ka(a, ALL_ASCENDING); - KeyString kb(b, ALL_ASCENDING); - KeyString kc(c, ALL_ASCENDING); - KeyString kd(d, ALL_ASCENDING); - KeyString ke(e, ALL_ASCENDING); + KeyString ka(version, a, ALL_ASCENDING); + KeyString kb(version, b, ALL_ASCENDING); + KeyString kc(version, c, ALL_ASCENDING); + KeyString kd(version, d, ALL_ASCENDING); + KeyString ke(version, e, ALL_ASCENDING); ASSERT(ka.compare(kb) < 0); ASSERT(kb.compare(kc) < 0); @@ -344,18 +449,18 @@ TEST(KeyStringTest, Timestamp) { { Ordering ALL_ASCENDING = Ordering::make(BSON("a" << -1)); - ROUNDTRIP(a); - ROUNDTRIP(b); - ROUNDTRIP(c); + ROUNDTRIP(version, a); + ROUNDTRIP(version, b); + ROUNDTRIP(version, c); ASSERT(d.woCompare(c, ALL_ASCENDING) < 0); ASSERT(c.woCompare(b, ALL_ASCENDING) < 0); ASSERT(b.woCompare(a, ALL_ASCENDING) < 0); - KeyString ka(a, ALL_ASCENDING); - KeyString kb(b, ALL_ASCENDING); - KeyString kc(c, ALL_ASCENDING); - KeyString kd(d, ALL_ASCENDING); + KeyString ka(version, a, ALL_ASCENDING); + KeyString kb(version, b, ALL_ASCENDING); + KeyString kc(version, c, ALL_ASCENDING); + KeyString kd(version, d, ALL_ASCENDING); ASSERT(ka.compare(kb) > 0); ASSERT(kb.compare(kc) > 0); @@ -363,33 +468,26 @@ TEST(KeyStringTest, Timestamp) { } } -TEST(KeyStringTest, AllTypesRoundtrip) { +TEST_F(KeyStringTest, AllTypesRoundtrip) { for (int i = 1; i <= JSTypeMax; i++) { - // TODO: Currently KeyString does not support NumberDecimal - // SERVER-19703 - if (i == NumberDecimal) - continue; { BSONObjBuilder b; b.appendMinForType("", i); BSONObj o = b.obj(); - ROUNDTRIP(o); + ROUNDTRIP(version, o); } { BSONObjBuilder b; b.appendMaxForType("", i); BSONObj o = b.obj(); - ROUNDTRIP(o); + ROUNDTRIP(version, o); } } } -const std::vector<BSONObj>& getInterestingElements() { +const std::vector<BSONObj>& getInterestingElements(KeyString::Version version) { static std::vector<BSONObj> elements; - - if (!elements.empty()) { - return elements; - } + elements.clear(); // These are used to test strings that include NUL bytes. const StringData ball("ball", StringData::LiteralTag()); @@ -501,11 +599,36 @@ const std::vector<BSONObj>& getInterestingElements() { if (powerOfTwo <= 52) { // is dNum - 0.5 representable? elements.push_back(BSON("" << (dNum - 0.5))); elements.push_back(BSON("" << -(dNum - 0.5))); + elements.push_back(BSON("" << (dNum - 0.1))); + elements.push_back(BSON("" << -(dNum - 0.1))); } if (powerOfTwo <= 51) { // is dNum + 0.5 representable? elements.push_back(BSON("" << (dNum + 0.5))); elements.push_back(BSON("" << -(dNum + 0.5))); + elements.push_back(BSON("" << (dNum + 0.1))); + elements.push_back(BSON("" << -(dNum + -.1))); + } + + if (version != KeyString::Version::V0) { + const Decimal128 dec(static_cast<int64_t>(lNum)); + const Decimal128 one("1"); + const Decimal128 half("0.5"); + const Decimal128 tenth("0.1"); + elements.push_back(BSON("" << dec)); + elements.push_back(BSON("" << dec.add(one))); + elements.push_back(BSON("" << dec.subtract(one))); + elements.push_back(BSON("" << dec.negate())); + elements.push_back(BSON("" << dec.add(one).negate())); + elements.push_back(BSON("" << dec.subtract(one).negate())); + elements.push_back(BSON("" << dec.subtract(half))); + elements.push_back(BSON("" << dec.subtract(half).negate())); + elements.push_back(BSON("" << dec.add(half))); + elements.push_back(BSON("" << dec.add(half).negate())); + elements.push_back(BSON("" << dec.subtract(tenth))); + elements.push_back(BSON("" << dec.subtract(tenth).negate())); + elements.push_back(BSON("" << dec.add(tenth))); + elements.push_back(BSON("" << dec.add(tenth).negate())); } } @@ -541,10 +664,39 @@ const std::vector<BSONObj>& getInterestingElements() { elements.push_back(BSON("" << closestBelow)); } + if (version != KeyString::Version::V0) { + // Numbers that are hard to round to between binary and decimal. + elements.push_back(BSON("" << 0.1)); + elements.push_back(BSON("" << Decimal128("0.100000000"))); + // Decimals closest to the double representation of 0.1. + elements.push_back(BSON("" << Decimal128("0.1000000000000000055511151231257827"))); + elements.push_back(BSON("" << Decimal128("0.1000000000000000055511151231257828"))); + + // Numbers close to numerical underflow/overflow for double. + elements.push_back(BSON("" << Decimal128("1.797693134862315708145274237317044E308"))); + elements.push_back(BSON("" << Decimal128("1.797693134862315708145274237317043E308"))); + elements.push_back(BSON("" << Decimal128("-1.797693134862315708145274237317044E308"))); + elements.push_back(BSON("" << Decimal128("-1.797693134862315708145274237317043E308"))); + elements.push_back(BSON("" << Decimal128("9.881312916824930883531375857364427"))); + elements.push_back(BSON("" << Decimal128("9.881312916824930883531375857364428"))); + elements.push_back(BSON("" << Decimal128("-9.881312916824930883531375857364427"))); + elements.push_back(BSON("" << Decimal128("-9.881312916824930883531375857364428"))); + elements.push_back(BSON("" << Decimal128("4.940656458412465441765687928682213E-324"))); + elements.push_back(BSON("" << Decimal128("4.940656458412465441765687928682214E-324"))); + elements.push_back(BSON("" << Decimal128("-4.940656458412465441765687928682214E-324"))); + elements.push_back(BSON("" << Decimal128("-4.940656458412465441765687928682213E-324"))); + } + + // Tricky double precision number for binary/decimal conversion: very close to a decimal + if (version != KeyString::Version::V0) + elements.push_back(BSON("" << Decimal128("3743626360493413E-165"))); + elements.push_back(BSON("" << 3743626360493413E-165)); + return elements; } -void testPermutation(const std::vector<BSONObj>& elementsOrig, +void testPermutation(KeyString::Version version, + const std::vector<BSONObj>& elementsOrig, const std::vector<BSONObj>& orderings, bool debug) { // Since KeyStrings are compared using memcmp we can assume it provides a total ordering such @@ -563,12 +715,12 @@ void testPermutation(const std::vector<BSONObj>& elementsOrig, const BSONObj& o1 = elements[i]; if (debug) log() << "\to1: " << o1; - ROUNDTRIP_ORDER(o1, ordering); + ROUNDTRIP_ORDER(version, o1, ordering); - KeyString k1(o1, ordering); + KeyString k1(version, o1, ordering); - KeyString l1(BSON("l" << o1.firstElement()), ordering); // kLess - KeyString g1(BSON("g" << o1.firstElement()), ordering); // kGreater + KeyString l1(version, BSON("l" << o1.firstElement()), ordering); // kLess + KeyString g1(version, BSON("g" << o1.firstElement()), ordering); // kGreater ASSERT_LT(l1, k1); ASSERT_GT(g1, k1); @@ -576,9 +728,9 @@ void testPermutation(const std::vector<BSONObj>& elementsOrig, const BSONObj& o2 = elements[i + 1]; if (debug) log() << "\t\t o2: " << o2; - KeyString k2(o2, ordering); - KeyString g2(BSON("g" << o2.firstElement()), ordering); - KeyString l2(BSON("l" << o2.firstElement()), ordering); + KeyString k2(version, o2, ordering); + KeyString g2(version, BSON("g" << o2.firstElement()), ordering); + KeyString l2(version, BSON("l" << o2.firstElement()), ordering); int bsonCmp = o1.woCompare(o2, ordering); invariant(bsonCmp <= 0); // We should be sorted... @@ -613,29 +765,28 @@ void testPermutation(const std::vector<BSONObj>& elementsOrig, } } -TEST(KeyStringTest, AllPermCompare) { - const std::vector<BSONObj>& elements = getInterestingElements(); +TEST_F(KeyStringTest, AllPermCompare) { + const std::vector<BSONObj>& elements = getInterestingElements(version); for (size_t i = 0; i < elements.size(); i++) { const BSONObj& o = elements[i]; - ROUNDTRIP(o); + ROUNDTRIP(version, o); } std::vector<BSONObj> orderings; orderings.push_back(BSON("a" << 1)); orderings.push_back(BSON("a" << -1)); - testPermutation(elements, orderings, false); + testPermutation(version, elements, orderings, false); } -TEST(KeyStringTest, AllPerm2Compare) { -// This test can take over a minute without optimizations. Re-enable if you need to debug it. +TEST_F(KeyStringTest, AllPerm2Compare) { #if !defined(MONGO_CONFIG_OPTIMIZED_BUILD) - log() << "\t\t\tskipping test on non-optimized build"; + log() << "\t\t\tskipping permutation testing on non-optimized build"; return; #endif - const std::vector<BSONObj>& baseElements = getInterestingElements(); + const std::vector<BSONObj>& baseElements = getInterestingElements(version); std::vector<BSONObj> elements; for (size_t i = 0; i < baseElements.size(); i++) { @@ -648,11 +799,12 @@ TEST(KeyStringTest, AllPerm2Compare) { } } - log() << "AllPrem2Compare size:" << elements.size(); + log() << "AllPerm2Compare " << KeyString::versionToString(version) + << " size:" << elements.size(); for (size_t i = 0; i < elements.size(); i++) { const BSONObj& o = elements[i]; - ROUNDTRIP(o); + ROUNDTRIP(version, o); } std::vector<BSONObj> orderings; @@ -661,7 +813,7 @@ TEST(KeyStringTest, AllPerm2Compare) { orderings.push_back(BSON("a" << 1 << "b" << -1)); orderings.push_back(BSON("a" << -1 << "b" << -1)); - testPermutation(elements, orderings, false); + testPermutation(version, elements, orderings, false); } #define COMPARE_HELPER(LHS, RHS) (((LHS) < (RHS)) ? -1 : (((LHS) == (RHS)) ? 0 : 1)) @@ -696,18 +848,18 @@ int compareNumbers(const BSONElement& lhs, const BSONElement& rhs) { } } -TEST(KeyStringTest, NaNs) { +TEST_F(KeyStringTest, NaNs) { // TODO use hex floats to force distinct NaNs const double nan1 = std::numeric_limits<double>::quiet_NaN(); const double nan2 = std::numeric_limits<double>::signaling_NaN(); // Since only output a single NaN, we can't use the normal ROUNDTRIP testing here. - const KeyString ks1a(BSON("" << nan1), ONE_ASCENDING); - const KeyString ks1d(BSON("" << nan1), ONE_DESCENDING); + const KeyString ks1a(version, BSON("" << nan1), ONE_ASCENDING); + const KeyString ks1d(version, BSON("" << nan1), ONE_DESCENDING); - const KeyString ks2a(BSON("" << nan2), ONE_ASCENDING); - const KeyString ks2d(BSON("" << nan2), ONE_DESCENDING); + const KeyString ks2a(version, BSON("" << nan2), ONE_ASCENDING); + const KeyString ks2d(version, BSON("" << nan2), ONE_DESCENDING); ASSERT_EQ(ks1a, ks2a); ASSERT_EQ(ks1d, ks2d); @@ -717,7 +869,7 @@ TEST(KeyStringTest, NaNs) { ASSERT(std::isnan(toBson(ks1d, ONE_DESCENDING)[""].Double())); ASSERT(std::isnan(toBson(ks2d, ONE_DESCENDING)[""].Double())); } -TEST(KeyStringTest, NumberOrderLots) { +TEST_F(KeyStringTest, NumberOrderLots) { std::vector<BSONObj> numbers; { numbers.push_back(BSON("" << 0)); @@ -773,7 +925,7 @@ TEST(KeyStringTest, NumberOrderLots) { OwnedPointerVector<KeyString> keyStrings; for (size_t i = 0; i < numbers.size(); i++) { - keyStrings.push_back(new KeyString(numbers[i], ordering)); + keyStrings.push_back(new KeyString(version, numbers[i], ordering)); } for (size_t i = 0; i < numbers.size(); i++) { @@ -793,12 +945,12 @@ TEST(KeyStringTest, NumberOrderLots) { } } -TEST(KeyStringTest, RecordIds) { +TEST_F(KeyStringTest, RecordIds) { for (int i = 0; i < 63; i++) { const RecordId rid = RecordId(1ll << i); { // Test encoding / decoding of single RecordIds - const KeyString ks(rid); + const KeyString ks(version, rid); ASSERT_GTE(ks.getSize(), 2u); ASSERT_LTE(ks.getSize(), 10u); @@ -811,12 +963,12 @@ TEST(KeyStringTest, RecordIds) { } if (rid.isNormal()) { - ASSERT_GT(ks, KeyString(RecordId())); - ASSERT_GT(ks, KeyString(RecordId::min())); - ASSERT_LT(ks, KeyString(RecordId::max())); + ASSERT_GT(ks, KeyString(version, RecordId())); + ASSERT_GT(ks, KeyString(version, RecordId::min())); + ASSERT_LT(ks, KeyString(version, RecordId::max())); - ASSERT_GT(ks, KeyString(RecordId(rid.repr() - 1))); - ASSERT_LT(ks, KeyString(RecordId(rid.repr() + 1))); + ASSERT_GT(ks, KeyString(version, RecordId(rid.repr() - 1))); + ASSERT_LT(ks, KeyString(version, RecordId(rid.repr() + 1))); } } @@ -824,15 +976,15 @@ TEST(KeyStringTest, RecordIds) { RecordId other = RecordId(1ll << j); if (rid == other) - ASSERT_EQ(KeyString(rid), KeyString(other)); + ASSERT_EQ(KeyString(version, rid), KeyString(version, other)); if (rid < other) - ASSERT_LT(KeyString(rid), KeyString(other)); + ASSERT_LT(KeyString(version, rid), KeyString(version, other)); if (rid > other) - ASSERT_GT(KeyString(rid), KeyString(other)); + ASSERT_GT(KeyString(version, rid), KeyString(version, other)); { // Test concatenating RecordIds like in a unique index. - KeyString ks; + KeyString ks(version); ks.appendRecordId(RecordId::max()); // uses all bytes ks.appendRecordId(rid); ks.appendRecordId(RecordId(0xDEADBEEF)); // uses some extra bytes @@ -857,3 +1009,134 @@ TEST(KeyStringTest, RecordIds) { } } } + +namespace { +const uint64_t kMinPerfMicros = 10 * 1000; +const uint64_t kMinPerfSamples = 10 * 1000; +typedef std::vector<BSONObj> Numbers; + +/** + * Evaluates ROUNDTRIP on all items in Numbers a sufficient number of times to take at least + * kMinPerfMicros microseconds. Logs the elapsed time per ROUNDTRIP evaluation. + */ +void perfTest(KeyString::Version version, const Numbers& numbers) { + uint64_t micros = 0; + uint64_t iters; + // Ensure at least 16 iterations are done and at least 25 milliseconds is timed + for (iters = 16; iters < (1 << 30) && micros < kMinPerfMicros; iters *= 2) { + // Measure the number of loops + Timer t; + + for (uint64_t i = 0; i < iters; i++) + for (auto item : numbers) { + // Assuming there are sufficient invariants in the to/from KeyString methods + // that calls will not be optimized away. + const KeyString ks(version, item, ALL_ASCENDING); + const BSONObj& converted = toBson(ks, ALL_ASCENDING); + invariant(converted.binaryEqual(item)); + } + + micros = t.micros(); + } + + auto minmax = std::minmax_element(numbers.begin(), numbers.end()); + + log() << 1E3 * micros / static_cast<double>(iters * numbers.size()) << " ns per " + << mongo::KeyString::versionToString(version) << " roundtrip" + << (kDebugBuild ? " (DEBUG BUILD!)" : "") << " min " << (*minmax.first)[""] << ", max" + << (*minmax.second)[""]; +} +} // namespace + +TEST_F(KeyStringTest, CommonIntPerf) { + // Exponential distribution, so skewed towards smaller integers. + std::random_device rd; + std::mt19937 gen(rd()); + std::exponential_distribution<double> expReal(1e-3); + + std::vector<BSONObj> numbers; + for (uint64_t x = 0; x < kMinPerfSamples; x++) + numbers.push_back(BSON("" << static_cast<int>(expReal(gen)))); + + perfTest(version, numbers); +} + +TEST_F(KeyStringTest, UniformInt64Perf) { + std::vector<BSONObj> numbers; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<long long> uniformInt64(std::numeric_limits<long long>::min(), + std::numeric_limits<long long>::max()); + + for (uint64_t x = 0; x < kMinPerfSamples; x++) + numbers.push_back(BSON("" << uniformInt64(gen))); + + perfTest(version, numbers); +} + +TEST_F(KeyStringTest, CommonDoublePerf) { + std::random_device rd; + std::mt19937 gen(rd()); + std::exponential_distribution<double> expReal(1e-3); + + std::vector<BSONObj> numbers; + for (uint64_t x = 0; x < kMinPerfSamples; x++) + numbers.push_back(BSON("" << expReal(gen))); + + perfTest(version, numbers); +} + +TEST_F(KeyStringTest, UniformDoublePerf) { + std::vector<BSONObj> numbers; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<long long> uniformInt64(std::numeric_limits<long long>::min(), + std::numeric_limits<long long>::max()); + + for (uint64_t x = 0; x < kMinPerfSamples; x++) { + uint64_t u = uniformInt64(gen); + double d; + memcpy(&d, &u, sizeof(d)); + if (std::isnormal(d)) + numbers.push_back(BSON("" << d)); + } + perfTest(version, numbers); +} + +TEST_F(KeyStringTest, CommonDecimalPerf) { + std::random_device rd; + std::mt19937 gen(rd()); + std::exponential_distribution<double> expReal(1e-3); + + if (version == KeyString::Version::V0) + return; + + std::vector<BSONObj> numbers; + for (uint64_t x = 0; x < kMinPerfSamples; x++) + numbers.push_back( + BSON("" << Decimal128( + expReal(gen), Decimal128::kRoundTo34Digits, Decimal128::kRoundTiesToAway) + .quantize(Decimal128("0.01", Decimal128::kRoundTiesToAway)))); + + perfTest(version, numbers); +} + +TEST_F(KeyStringTest, UniformDecimalPerf) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<long long> uniformInt64(std::numeric_limits<long long>::min(), + std::numeric_limits<long long>::max()); + + if (version == KeyString::Version::V0) + return; + + std::vector<BSONObj> numbers; + for (uint64_t x = 0; x < kMinPerfSamples; x++) { + uint64_t hi = uniformInt64(gen); + uint64_t lo = uniformInt64(gen); + Decimal128 d(Decimal128::Value{lo, hi}); + if (!d.isZero() && !d.isNaN() && !d.isInfinite()) + numbers.push_back(BSON("" << d)); + } + perfTest(version, numbers); +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index 2320672fe19..283eca55364 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -82,9 +82,10 @@ static const int TempKeyMaxSize = 1024; // this goes away with SERVER-3372 static const WiredTigerItem emptyItem(NULL, 0); +static const int kKeyStringV1Version = 7; static const int kMinimumIndexVersion = 6; -static const int kCurrentIndexVersion = 6; // New indexes use this by default. -static const int kMaximumIndexVersion = 6; +static const int kCurrentIndexVersion = kKeyStringV1Version; // New indexes use this by default. +static const int kMaximumIndexVersion = kKeyStringV1Version; static_assert(kCurrentIndexVersion >= kMinimumIndexVersion, "kCurrentIndexVersion >= kMinimumIndexVersion"); static_assert(kCurrentIndexVersion <= kMaximumIndexVersion, @@ -197,7 +198,7 @@ StatusWith<std::string> WiredTigerIndex::generateCreateString(const std::string& // Index metadata ss << ",app_metadata=(" - << "formatVersion=" << kCurrentIndexVersion << ',' + << "formatVersion=" << (enableBSON1_1 ? kKeyStringV1Version : kCurrentIndexVersion) << ',' << "infoObj=" << desc.infoObj().jsonString() << "),"; LOG(3) << "index create string: " << ss.ss.str(); @@ -218,6 +219,8 @@ WiredTigerIndex::WiredTigerIndex(OperationContext* ctx, const std::string& uri, const IndexDescriptor* desc) : _ordering(Ordering::make(desc->keyPattern())), + _keyStringVersion(desc->version() < kKeyStringV1Version ? KeyString::Version::V0 + : KeyString::Version::V1), _uri(uri), _tableId(WiredTigerSession::genTableId()), _collectionNamespace(desc->parentNS()), @@ -412,7 +415,7 @@ long long WiredTigerIndex::getSpaceUsedBytes(OperationContext* txn) const { bool WiredTigerIndex::isDup(WT_CURSOR* c, const BSONObj& key, const RecordId& id) { invariant(unique()); // First check whether the key exists. - KeyString data(key, _ordering); + KeyString data(keyStringVersion(), key, _ordering); WiredTigerItem item(data.getBuffer(), data.getSize()); c->set_key(c, item.Get()); int ret = WT_OP_CHECK(c->search(c)); @@ -430,7 +433,7 @@ bool WiredTigerIndex::isDup(WT_CURSOR* c, const BSONObj& key, const RecordId& id if (KeyString::decodeRecordId(&br) == id) return false; - KeyString::TypeBits::fromBuffer(&br); // Just calling this to advance reader. + KeyString::TypeBits::fromBuffer(keyStringVersion(), &br); // Just advance the reader. } return true; } @@ -511,7 +514,7 @@ public: return s; } - KeyString data(key, _idx->_ordering, id); + KeyString data(_idx->keyStringVersion(), key, _idx->_ordering, id); // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem item(data.getBuffer(), data.getSize()); @@ -550,7 +553,10 @@ private: class WiredTigerIndex::UniqueBulkBuilder : public BulkBuilder { public: UniqueBulkBuilder(WiredTigerIndex* idx, OperationContext* txn, bool dupsAllowed) - : BulkBuilder(idx, txn), _idx(idx), _dupsAllowed(dupsAllowed) {} + : BulkBuilder(idx, txn), + _idx(idx), + _dupsAllowed(dupsAllowed), + _keyString(idx->keyStringVersion()) {} Status addKey(const BSONObj& newKey, const RecordId& id) { { @@ -598,7 +604,7 @@ private: void doInsert() { invariant(!_records.empty()); - KeyString value; + KeyString value(_idx->keyStringVersion()); for (size_t i = 0; i < _records.size(); i++) { value.appendRecordId(_records[i].first); // When there is only one record, we can omit AllZeros TypeBits. Otherwise they need @@ -634,7 +640,12 @@ namespace { class WiredTigerIndexCursorBase : public SortedDataInterface::Cursor { public: WiredTigerIndexCursorBase(const WiredTigerIndex& idx, OperationContext* txn, bool forward) - : _txn(txn), _idx(idx), _forward(forward) { + : _txn(txn), + _idx(idx), + _forward(forward), + _key(idx.keyStringVersion()), + _typeBits(idx.keyStringVersion()), + _query(idx.keyStringVersion()) { _cursor.emplace(_idx.uri(), _idx.tableId(), false, _txn); } boost::optional<IndexKeyEntry> next(RequestedInfo parts) override { @@ -660,7 +671,7 @@ public: // end after the key if inclusive and before if exclusive. const auto discriminator = _forward == inclusive ? KeyString::kExclusiveAfter : KeyString::kExclusiveBefore; - _endPosition = stdx::make_unique<KeyString>(); + _endPosition = stdx::make_unique<KeyString>(_idx.keyStringVersion()); _endPosition->resetToKey(stripFieldNames(key), _idx.ordering(), discriminator); } @@ -988,10 +999,10 @@ Status WiredTigerIndexUnique::_insert(WT_CURSOR* c, const BSONObj& key, const RecordId& id, bool dupsAllowed) { - const KeyString data(key, _ordering); + const KeyString data(keyStringVersion(), key, _ordering); WiredTigerItem keyItem(data.getBuffer(), data.getSize()); - KeyString value(id); + KeyString value(keyStringVersion(), id); if (!data.getTypeBits().isAllZeros()) value.appendTypeBits(data.getTypeBits()); @@ -1031,7 +1042,7 @@ Status WiredTigerIndexUnique::_insert(WT_CURSOR* c, // Copy from old to new value value.appendRecordId(idInIndex); - value.appendTypeBits(KeyString::TypeBits::fromBuffer(&br)); + value.appendTypeBits(KeyString::TypeBits::fromBuffer(keyStringVersion(), &br)); } if (!dupsAllowed) @@ -1052,7 +1063,7 @@ void WiredTigerIndexUnique::_unindex(WT_CURSOR* c, const BSONObj& key, const RecordId& id, bool dupsAllowed) { - KeyString data(key, _ordering); + KeyString data(keyStringVersion(), key, _ordering); WiredTigerItem keyItem(data.getBuffer(), data.getSize()); c->set_key(c, keyItem.Get()); @@ -1091,7 +1102,7 @@ void WiredTigerIndexUnique::_unindex(WT_CURSOR* c, BufReader br(old.data, old.size); while (br.remaining()) { RecordId idInIndex = KeyString::decodeRecordId(&br); - KeyString::TypeBits typeBits = KeyString::TypeBits::fromBuffer(&br); + KeyString::TypeBits typeBits = KeyString::TypeBits::fromBuffer(keyStringVersion(), &br); if (id == idInIndex) { if (records.empty() && !br.remaining()) { @@ -1114,7 +1125,7 @@ void WiredTigerIndexUnique::_unindex(WT_CURSOR* c, } // Put other ids for this key back in the index. - KeyString newValue; + KeyString newValue(keyStringVersion()); invariant(!records.empty()); for (size_t i = 0; i < records.size(); i++) { newValue.appendRecordId(records[i].first); @@ -1157,7 +1168,7 @@ Status WiredTigerIndexStandard::_insert(WT_CURSOR* c, TRACE_INDEX << " key: " << keyBson << " id: " << id; - KeyString key(keyBson, _ordering, id); + KeyString key(keyStringVersion(), keyBson, _ordering, id); WiredTigerItem keyItem(key.getBuffer(), key.getSize()); WiredTigerItem valueItem = key.getTypeBits().isAllZeros() @@ -1181,7 +1192,7 @@ void WiredTigerIndexStandard::_unindex(WT_CURSOR* c, const RecordId& id, bool dupsAllowed) { invariant(dupsAllowed); - KeyString data(key, _ordering, id); + KeyString data(keyStringVersion(), key, _ordering, id); WiredTigerItem item(data.getBuffer(), data.getSize()); c->set_key(c, item.Get()); int ret = WT_OP_CHECK(c->remove(c)); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.h b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h index cb4be8794c7..f989f0b1ce5 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h @@ -31,6 +31,7 @@ #include <wiredtiger.h> #include "mongo/base/status_with.h" +#include "mongo/db/storage/key_string.h" #include "mongo/db/storage/index_entry_comparison.h" #include "mongo/db/storage/sorted_data_interface.h" #include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" @@ -121,6 +122,10 @@ public: return _ordering; } + KeyString::Version keyStringVersion() const { + return _keyStringVersion; + } + virtual bool unique() const = 0; Status dupKeyError(const BSONObj& key); @@ -141,6 +146,7 @@ protected: class UniqueBulkBuilder; const Ordering _ordering; + const KeyString::Version _keyStringVersion; std::string _uri; uint64_t _tableId; std::string _collectionNamespace; diff --git a/src/mongo/platform/decimal128.cpp b/src/mongo/platform/decimal128.cpp index ddaa381b367..990b18e1569 100644 --- a/src/mongo/platform/decimal128.cpp +++ b/src/mongo/platform/decimal128.cpp @@ -223,7 +223,7 @@ Decimal128::Decimal128(double doubleValue, // Estimate doubleValue's base10 exponent from its base2 exponent by // multiplying by an approximation of log10(2). // Since 10^(x*log10(2)) == 2^x, this initial guess gets us very close. - int base10Exp = (base2Exp * 301) / 1000; + int base10Exp = (base2Exp * 30103) / (100 * 1000); // Although both 1000 and .001 have a base 10 exponent of magnitude 3, they have // a different number of leading/trailing zeros. Adjust base10Exp to compensate. @@ -244,6 +244,7 @@ Decimal128::Decimal128(double doubleValue, } // The decimal must have exactly 15 digits of precision + invariant(getCoefficientHigh() == 0); invariant(_value.low64 >= 100000000000000ull && _value.low64 <= 999999999999999ull); } @@ -566,11 +567,6 @@ Decimal128 Decimal128::quantize(const Decimal128& reference, return result; } -Decimal128 Decimal128::normalize() const { - // Normalize by adding 0E-6176 which forces a decimal to maximum precision (34 digits) - return add(kLargestNegativeExponentZero); -} - bool Decimal128::isEqual(const Decimal128& other) const { std::uint32_t throwAwayFlag = 0; BID_UINT128 current = decimal128ToLibraryType(_value); diff --git a/src/mongo/platform/decimal128.h b/src/mongo/platform/decimal128.h index 22dadb5c7c3..9481a30ccd1 100644 --- a/src/mongo/platform/decimal128.h +++ b/src/mongo/platform/decimal128.h @@ -360,7 +360,10 @@ public: * decimals with the same representation (5000000000000000000000000000000000E-33). * Hashing equal decimals to equal hashes becomes possible with such normalization. */ - Decimal128 normalize() const; + Decimal128 normalize() const { + // Normalize by adding 0E-6176 which forces a decimal to maximum precision (34 digits) + return add(kLargestNegativeExponentZero); + } /** * This set of comparison operations takes a single Decimal128 and returns a boolean |