summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGeert Bosch <geert@mongodb.com>2016-03-31 00:42:05 -0400
committerGeert Bosch <geert@mongodb.com>2016-04-22 15:00:29 -0400
commitb5282c3b6d17a8b11e432ca5fbbfde5caddea048 (patch)
tree2205f7d457f0a1b7b278f8b723c5820c642d556d /src
parent8e13345122986b242811ebee1c0908a4fc01640c (diff)
downloadmongo-b5282c3b6d17a8b11e432ca5fbbfde5caddea048.tar.gz
SERVER-19703 Add KeyString support for NumberDecimal
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/catalog/collection.cpp3
-rw-r--r--src/mongo/db/storage/key_string.cpp890
-rw-r--r--src/mongo/db/storage/key_string.h122
-rw-r--r--src/mongo/db/storage/key_string_test.cpp667
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp47
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.h6
-rw-r--r--src/mongo/platform/decimal128.cpp8
-rw-r--r--src/mongo/platform/decimal128.h5
8 files changed, 1399 insertions, 349 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index d83a825903b..53d77014120 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -1176,7 +1176,8 @@ private:
uint32_t hashIndexEntry(const BSONObj& key, RecordId& loc) {
uint32_t hash;
- KeyString ks(key, Ordering::make(BSONObj()), loc);
+ // We're only using KeyString to get something hashable here, so version doesn't matter.
+ KeyString ks(KeyString::Version::V1, key, Ordering::make(BSONObj()), loc);
MurmurHash3_x86_32(ks.getTypeBits().getBuffer(), ks.getTypeBits().getSize(), 0, &hash);
MurmurHash3_x86_32(ks.getBuffer(), ks.getSize(), hash, &hash);
// Take the first 25 bits to conserve memory.
diff --git a/src/mongo/db/storage/key_string.cpp b/src/mongo/db/storage/key_string.cpp
index 00dc9e067d9..3c5fafa3ec2 100644
--- a/src/mongo/db/storage/key_string.cpp
+++ b/src/mongo/db/storage/key_string.cpp
@@ -77,7 +77,7 @@ const uint8_t kMaxKey = 240;
// Therefore each format can be considered independently without considering
// cross-format comparisons.
const uint8_t kNumericNaN = kNumeric + 0;
-const uint8_t kNumericNegativeLargeDouble = kNumeric + 1; // <= -2**63 including -Inf
+const uint8_t kNumericNegativeLargeMagnitude = kNumeric + 1; // <= -2**63 including -Inf
const uint8_t kNumericNegative8ByteInt = kNumeric + 2;
const uint8_t kNumericNegative7ByteInt = kNumeric + 3;
const uint8_t kNumericNegative6ByteInt = kNumeric + 4;
@@ -86,9 +86,9 @@ const uint8_t kNumericNegative4ByteInt = kNumeric + 6;
const uint8_t kNumericNegative3ByteInt = kNumeric + 7;
const uint8_t kNumericNegative2ByteInt = kNumeric + 8;
const uint8_t kNumericNegative1ByteInt = kNumeric + 9;
-const uint8_t kNumericNegativeSmallDouble = kNumeric + 10; // between 0 and -1 exclusive
+const uint8_t kNumericNegativeSmallMagnitude = kNumeric + 10; // between 0 and -1 exclusive
const uint8_t kNumericZero = kNumeric + 11;
-const uint8_t kNumericPositiveSmallDouble = kNumeric + 12; // between 0 and 1 exclusive
+const uint8_t kNumericPositiveSmallMagnitude = kNumeric + 12; // between 0 and 1 exclusive
const uint8_t kNumericPositive1ByteInt = kNumeric + 13;
const uint8_t kNumericPositive2ByteInt = kNumeric + 14;
const uint8_t kNumericPositive3ByteInt = kNumeric + 15;
@@ -97,9 +97,9 @@ const uint8_t kNumericPositive5ByteInt = kNumeric + 17;
const uint8_t kNumericPositive6ByteInt = kNumeric + 18;
const uint8_t kNumericPositive7ByteInt = kNumeric + 19;
const uint8_t kNumericPositive8ByteInt = kNumeric + 20;
-const uint8_t kNumericPositiveLargeDouble = kNumeric + 21; // >= 2**63 including +Inf
-static_assert(kNumericPositiveLargeDouble < kStringLike,
- "kNumericPositiveLargeDouble < kStringLike");
+const uint8_t kNumericPositiveLargeMagnitude = kNumeric + 21; // >= 2**63 including +Inf
+static_assert(kNumericPositiveLargeMagnitude < kStringLike,
+ "kNumericPositiveLargeMagnitude < kStringLike");
const uint8_t kBoolFalse = kBool + 0;
const uint8_t kBoolTrue = kBool + 1;
@@ -169,8 +169,54 @@ uint8_t bsonTypeToGenericKeyStringType(BSONType type) {
}
}
+// Doubles smaller than this store only a single bit indicating a decimal continuation follows.
+const double kTiniestDoubleWith2BitDCM = std::ldexp(1, -255);
+
+// Amount to add to exponent of doubles tinier than kTiniestDoubleWith2BitDCM to avoid subnormals.
+const int kTinyDoubleExponentShift = 256;
+
+// Amount to multiply tiny doubles to perform a shift of the exponent by
+// kSmallMagnitudeExponentShift.
+const double kTinyDoubleExponentUpshiftFactor = std::ldexp(1, kTinyDoubleExponentShift);
+
+// Amount to multiply scaled tiny doubles by to recover the unscaled value.
+const double kTinyDoubleExponentDownshiftFactor = std::ldexp(1, -kTinyDoubleExponentShift);
+
+// An underestimate of 2**256.
+const Decimal128 kTinyDoubleExponentUpshiftFactorAsDecimal(std::ldexp(1, kTinyDoubleExponentShift),
+ Decimal128::kRoundTo34Digits,
+ Decimal128::kRoundTowardZero);
+
+// An underestimate of 2**(-256).
+const Decimal128 kTinyDoubleExponentDownshiftFactorAsDecimal(std::ldexp(1,
+ -kTinyDoubleExponentShift),
+ Decimal128::kRoundTo34Digits,
+ Decimal128::kRoundTowardZero);
+
// First double that isn't an int64.
-const double kMinLargeDouble = 9223372036854775808.0; // 1ULL<<63
+const double kMinLargeDouble = 1ULL << 63;
+
+// Integers larger than this may not be representable as doubles.
+const double kMaxIntForDouble = 1ULL << 53;
+
+// Factors for scaling a double by powers of 256 to do a logical shift left of x bytes.
+const double kPow256[] = {1.0, // 2**0
+ 1.0 * 256, // 2**8
+ 1.0 * 256 * 256, // 2**16
+ 1.0 * 256 * 256 * 256, // 2**24
+ 1.0 * 256 * 256 * 256 * 256, // 2**32
+ 1.0 * 256 * 256 * 256 * 256 * 256, // 2**40
+ 1.0 * 256 * 256 * 256 * 256 * 256 * 256, // 2**48
+ 1.0 * 256 * 256 * 256 * 256 * 256 * 256 * 256}; // 2**56
+// Factors for scaling a double by negative powers of 256 to do a logical shift right of x bytes.
+const double kInvPow256[] = {1.0, // 2**0
+ 1.0 / 256, // 2**(-8)
+ 1.0 / 256 / 256, // 2**(-16)
+ 1.0 / 256 / 256 / 256, // 2**(-24)
+ 1.0 / 256 / 256 / 256 / 256, // 2**(-32)
+ 1.0 / 256 / 256 / 256 / 256 / 256, // 2**(-40)
+ 1.0 / 256 / 256 / 256 / 256 / 256 / 256, // 2**(-48)
+ 1.0 / 256 / 256 / 256 / 256 / 256 / 256 / 256}; // 2**(-56)
const uint8_t kEnd = 0x4;
@@ -479,65 +525,85 @@ void KeyString::_appendObject(const BSONObj& val, bool invert) {
}
void KeyString::_appendNumberDouble(const double num, bool invert) {
- if (num == 0.0 && std::signbit(num)) {
- _typeBits.appendNegativeZero();
- } else {
+ if (num == 0.0 && std::signbit(num))
+ _typeBits.appendZero(TypeBits::kNegativeDoubleZero);
+ else
_typeBits.appendNumberDouble();
- }
- // no special cases needed for Inf,
- // see http://en.wikipedia.org/wiki/IEEE_754-1985#Positive_and_negative_infinity
- if (std::isnan(num)) {
- _append(CType::kNumericNaN, invert);
- return;
- }
-
- if (num == 0.0) {
- // We are collapsing -0.0 and 0.0 to the same value here.
- // This is correct as IEEE-754 specifies that they compare as equal,
- // however this prevents roundtripping -0.0.
- // So if you put a -0.0 in, you'll get 0.0 out.
- // We believe this to be ok.
- _append(CType::kNumericZero, invert);
- return;
- }
+ _appendDoubleWithoutTypeBits(num, kDCMEqualToDouble, invert);
+}
+void KeyString::_appendDoubleWithoutTypeBits(const double num,
+ DecimalContinuationMarker dcm,
+ bool invert) {
const bool isNegative = num < 0.0;
const double magnitude = isNegative ? -num : num;
- if (magnitude < 1.0) {
- // This includes subnormal numbers.
- _appendSmallDouble(num, invert);
+ // Tests are structured such that NaNs and infinities fall through correctly.
+ if (!(magnitude >= 1.0)) {
+ if (magnitude > 0.0) {
+ // This includes subnormal numbers.
+ _appendSmallDouble(num, dcm, invert);
+ } else if (num == 0.0) {
+ // We are collapsing -0.0 and 0.0 to the same value here, the type bits disambiguate.
+ _append(CType::kNumericZero, invert);
+ } else {
+ invariant(std::isnan(num));
+ _append(CType::kNumericNaN, invert);
+ }
return;
}
if (magnitude < kMinLargeDouble) {
- uint64_t integerPart = uint64_t(magnitude);
- if (double(integerPart) == magnitude) {
+ uint64_t integerPart = static_cast<uint64_t>(magnitude);
+ if (static_cast<double>(integerPart) == magnitude && dcm == kDCMEqualToDouble) {
// No fractional part
_appendPreshiftedIntegerPortion(integerPart << 1, isNegative, invert);
return;
}
- // There is a fractional part.
- _appendPreshiftedIntegerPortion((integerPart << 1) | 1, isNegative, invert);
-
- // Append the bytes of the mantissa that include fractional bits.
- const size_t fractionalBits = (53 - (64 - countLeadingZeros64(integerPart)));
- const size_t fractionalBytes = (fractionalBits + 7) / 8;
- dassert(fractionalBytes > 0);
- uint64_t mantissa;
- memcpy(&mantissa, &num, sizeof(mantissa));
- mantissa &= ~(uint64_t(-1) << fractionalBits); // set non-fractional bits to 0;
- mantissa = endian::nativeToBig(mantissa);
-
- const void* firstUsedByte =
- reinterpret_cast<const char*>((&mantissa) + 1) - fractionalBytes;
- _appendBytes(firstUsedByte, fractionalBytes, isNegative ? !invert : invert);
- return;
- }
+ if (version == Version::V0) {
+ invariant(dcm == kDCMEqualToDouble);
+ // There is a fractional part.
+ _appendPreshiftedIntegerPortion((integerPart << 1) | 1, isNegative, invert);
+
+ // Append the bytes of the mantissa that include fractional bits.
+ const size_t fractionalBits = 53 - (64 - countLeadingZeros64(integerPart));
+ const size_t fractionalBytes = (fractionalBits + 7) / 8;
+ dassert(fractionalBytes > 0);
+ uint64_t mantissa;
+ memcpy(&mantissa, &num, sizeof(mantissa));
+ mantissa &= ~(uint64_t(-1) << fractionalBits); // set non-fractional bits to 0;
- _appendLargeDouble(num, invert);
+ mantissa = endian::nativeToBig(mantissa);
+
+ const void* firstUsedByte =
+ reinterpret_cast<const char*>((&mantissa) + 1) - fractionalBytes;
+ _appendBytes(firstUsedByte, fractionalBytes, isNegative ? !invert : invert);
+ } else {
+ const size_t fractionalBytes = countLeadingZeros64(integerPart << 1) / 8;
+ const auto ctype = isNegative ? CType::kNumericNegative8ByteInt + fractionalBytes
+ : CType::kNumericPositive8ByteInt - fractionalBytes;
+ _append(static_cast<uint8_t>(ctype), invert);
+
+ // Multiplying the double by 256 to the power X is logically equivalent to shifting the
+ // fraction left by X bytes.
+ uint64_t encoding = static_cast<uint64_t>(magnitude * kPow256[fractionalBytes]);
+ dassert(encoding == magnitude * kPow256[fractionalBytes]);
+
+ // Merge in the bit indicating the value has a fractional part by doubling the integer
+ // part and adding 1. This leaves encoding with the high 8-fractionalBytes bytes in the
+ // same form they'd have with _appendPreshiftedIntegerPortion(). The remaining low bytes
+ // are the fractional bytes left-shifted by 2 bits to make room for the DCM.
+ encoding += (integerPart + 1) << (fractionalBytes * 8);
+ invariant((encoding & 0x3ULL) == 0);
+ encoding |= dcm;
+ encoding = endian::nativeToBig(encoding);
+ _append(encoding, isNegative ? !invert : invert);
+ }
+ } else {
+ _appendLargeDouble(num, dcm, invert);
+ }
}
void KeyString::_appendNumberLong(const long long num, bool invert) {
@@ -550,6 +616,148 @@ void KeyString::_appendNumberInt(const int num, bool invert) {
_appendInteger(num, invert);
}
+void KeyString::_appendNumberDecimal(const Decimal128 dec, bool invert) {
+ bool isNegative = dec.isNegative();
+ if (dec.isZero()) {
+ uint32_t zeroExp = dec.getBiasedExponent();
+ if (isNegative)
+ zeroExp += Decimal128::kMaxBiasedExponent + 1;
+
+ _typeBits.appendDecimalZero(zeroExp);
+ _append(CType::kNumericZero, invert);
+ return;
+ }
+
+ if (dec.isNaN()) {
+ _append(CType::kNumericNaN, invert);
+ _typeBits.appendNumberDecimal();
+ return;
+ }
+
+ if (dec.isInfinite()) {
+ _append(isNegative ? CType::kNumericNegativeLargeMagnitude
+ : CType::kNumericPositiveLargeMagnitude,
+ invert);
+ const uint64_t infinity = ~0ULL;
+ _append(infinity, isNegative ? !invert : invert);
+ _typeBits.appendNumberDecimal();
+ return;
+ }
+
+ const uint32_t biasedExponent = dec.getBiasedExponent();
+ dassert(biasedExponent <= Decimal128::kInfinityExponent);
+ _typeBits.appendNumberDecimal();
+ _typeBits.appendDecimalExponent(biasedExponent & TypeBits::kStoredDecimalExponentMask);
+
+ uint32_t signalingFlags = Decimal128::kNoFlag;
+ double bin = dec.toDouble(&signalingFlags, Decimal128::kRoundTowardZero);
+
+ // Easy case: the decimal actually is a double. True for many integers, fractions like 1.5, etc.
+ if (!Decimal128::hasFlag(signalingFlags, Decimal128::kInexact) &&
+ !Decimal128::hasFlag(signalingFlags, Decimal128::kOverflow)) {
+ _appendDoubleWithoutTypeBits(bin, kDCMEqualToDouble, invert);
+ return;
+ }
+
+ // Values smaller than the double normalized range need special handling: a regular double
+ // wouldn't give 15 digits, if any at all.
+ if (std::abs(bin) < std::numeric_limits<double>::min()) {
+ _appendTinyDecimalWithoutTypeBits(dec, bin, invert);
+ return;
+ }
+
+ // Huge finite values are encoded directly. Because the value is not exact, and truncates
+ // to the maximum double, the original decimal was outside of the range of finite doubles.
+ // Because all decimals larger than the max finite double round down to that value, strict
+ // less-than would be incorrect.
+ if (std::abs(bin) >= std::numeric_limits<double>::max()) {
+ _appendHugeDecimalWithoutTypeBits(dec, invert);
+ return;
+ }
+
+ const auto roundAwayFromZero =
+ isNegative ? Decimal128::kRoundTowardNegative : Decimal128::kRoundTowardPositive;
+ const uint64_t k1E15 = 1E15; // Exact in both double and int64_t.
+
+ // If the conditions below fall through, a decimal continuation is needed to represent the
+ // difference between the stored value and the actual decimal. All paths that fall through
+ // must set 'storedValue', overwriting the NaN.
+ Decimal128 storedValue = Decimal128::kPositiveNaN;
+
+ // For doubles in this range, 'bin' may have lost precision in the integer part, which would
+ // lead to miscompares with integers. So, instead handle explicitly.
+ if ((bin <= -kMaxIntForDouble || bin >= kMaxIntForDouble) && bin > -kMinLargeDouble &&
+ bin < kMinLargeDouble) {
+ uint32_t signalingFlags = Decimal128::kNoFlag;
+ Decimal128 truncated = dec.quantize(
+ Decimal128::kNormalizedZero, &signalingFlags, Decimal128::kRoundTowardZero);
+ dassert(truncated.getBiasedExponent() == Decimal128::kExponentBias);
+ dassert(truncated.getCoefficientHigh() == 0 &&
+ truncated.getCoefficientLow() < (1ULL << 63));
+ int64_t integerPart = truncated.getCoefficientLow();
+ bool hasFraction = Decimal128::hasFlag(signalingFlags, Decimal128::kInexact);
+ bool isNegative = truncated.isNegative();
+ bool has8bytes = integerPart >= (1LL << 55);
+ uint64_t preshifted = integerPart << 1;
+ _appendPreshiftedIntegerPortion(preshifted | hasFraction, isNegative, invert);
+ if (!hasFraction)
+ return;
+ if (!has8bytes) {
+ // A Fraction byte follows, but the leading 7 bytes already encode 53 bits of the
+ // coefficient, so just store the DCM.
+ uint8_t dcm = hasFraction ? kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits
+ : kDCMEqualToDouble;
+ _append(dcm, isNegative ? !invert : invert);
+ }
+
+ storedValue = Decimal128(isNegative, Decimal128::kExponentBias, 0, integerPart);
+
+ // Common case: the coefficient less than 1E15, so at most 15 digits, and the number is
+ // in the normal range of double, so the decimal can be represented with at least 15 digits
+ // of precision by the double 'bin'
+ } else if (dec.getCoefficientHigh() == 0 && dec.getCoefficientLow() < k1E15) {
+ dassert(Decimal128(std::abs(bin),
+ Decimal128::kRoundTo15Digits,
+ Decimal128::kRoundTowardPositive).isEqual(dec.toAbs()));
+ _appendDoubleWithoutTypeBits(bin, kDCMEqualToDoubleRoundedUpTo15Digits, invert);
+ return;
+ } else {
+ // The coefficient has more digits, but may still be 15 digits after removing trailing
+ // zeros.
+ Decimal128 decFromBin = Decimal128(bin, Decimal128::kRoundTo15Digits, roundAwayFromZero);
+ if (decFromBin.isEqual(dec)) {
+ _appendDoubleWithoutTypeBits(bin, kDCMEqualToDoubleRoundedUpTo15Digits, invert);
+ return;
+ }
+ // Harder cases: decimal continuation is needed.
+ // First store the double and kind of continuation needed.
+ DecimalContinuationMarker dcm = dec.isLess(decFromBin) != isNegative
+ ? kDCMHasContinuationLessThanDoubleRoundedUpTo15Digits
+ : kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits;
+ _appendDoubleWithoutTypeBits(bin, dcm, invert);
+
+ // Note that 'dec' and 'bin' can be negative.
+ storedValue = Decimal128(bin, Decimal128::kRoundTo34Digits, roundAwayFromZero);
+ }
+ invariant(!storedValue.isNaN()); // Should have been set explicitly.
+ storedValue = storedValue.normalize(); // Normalize to 34 digits to fix decDiff exponent.
+
+ Decimal128 decDiff = dec.subtract(storedValue);
+ invariant(decDiff.isNegative() == dec.isNegative() || decDiff.isZero());
+ invariant(decDiff.getBiasedExponent() == storedValue.getBiasedExponent());
+ invariant(decDiff.getCoefficientHigh() == 0);
+
+ // Now we know that we can recover the original decimal value (but not its precision, which is
+ // given by the type bits) from the binary double plus the decimal continuation.
+ uint64_t decimalContinuation = decDiff.getCoefficientLow();
+ dassert(storedValue.add(Decimal128(isNegative,
+ storedValue.getBiasedExponent(),
+ 0,
+ decimalContinuation)).isEqual(dec));
+ decimalContinuation = endian::nativeToBig(decimalContinuation);
+ _append(decimalContinuation, isNegative ? !invert : invert);
+}
+
void KeyString::_appendBsonValue(const BSONElement& elem, bool invert, const StringData* name) {
if (name) {
_appendBytes(name->rawData(), name->size() + 1, invert); // + 1 for NUL
@@ -621,6 +829,12 @@ void KeyString::_appendBsonValue(const BSONElement& elem, bool invert, const Str
case NumberLong:
_appendNumberLong(elem._numberLong(), invert);
break;
+ case NumberDecimal:
+ uassert(ErrorCodes::UnsupportedFormat,
+ "Index version does not support NumberDecimal",
+ version >= Version::V1);
+ _appendNumberDecimal(elem._numberDecimal(), invert);
+ break;
default:
invariant(false);
@@ -656,36 +870,138 @@ void KeyString::_appendBson(const BSONObj& obj, bool invert) {
_append(int8_t(0), invert);
}
-void KeyString::_appendSmallDouble(double value, bool invert) {
- dassert(!std::isnan(value));
- dassert(value != 0.0);
-
- uint64_t data;
- memcpy(&data, &value, sizeof(data));
-
- if (value > 0) {
- _append(CType::kNumericPositiveSmallDouble, invert);
- _append(endian::nativeToBig(data), invert);
+void KeyString::_appendSmallDouble(double value, DecimalContinuationMarker dcm, bool invert) {
+ bool isNegative = value < 0;
+ double magnitude = isNegative ? -value : value;
+ dassert(!std::isnan(value) && value != 0 && magnitude < 1);
+
+ _append(isNegative ? CType::kNumericNegativeSmallMagnitude
+ : CType::kNumericPositiveSmallMagnitude,
+ invert);
+
+ uint64_t encoded;
+
+ if (version == KeyString::Version::V0) {
+ // Not using magnitude to preserve sign bit in V0
+ memcpy(&encoded, &value, sizeof(encoded));
+ } else if (magnitude >= kTiniestDoubleWith2BitDCM) {
+ // Values in the range [2**(-255), 1) get the prefix 0b11
+ memcpy(&encoded, &magnitude, sizeof(encoded));
+ dassert((encoded & (0x3ULL << 62)) == 0);
+ encoded <<= 2;
+ encoded |= dcm;
+ dassert(encoded >> 62 == 0x3);
} else {
- _append(CType::kNumericNegativeSmallDouble, invert);
- _append(endian::nativeToBig(data), !invert);
+ // Values in the range [numeric_limits<double>::denorm_min(), 2**(-255)) get the prefixes
+ // 0b01 or 0b10. The 0b00 prefix is used by _appendHugeDecimalWithoutTypeBits for decimals
+ // smaller than that.
+ magnitude *= kTinyDoubleExponentUpshiftFactor;
+ memcpy(&encoded, &magnitude, sizeof(encoded));
+ encoded <<= 1;
+ encoded |= (dcm != kDCMEqualToDouble);
+ // Change the two most significant bits from 0b00 or 0b01 to 0b01 or 0b10.
+ encoded += (1ULL << 62);
+ dassert(encoded >> 62 == 0x1 || encoded >> 62 == 0x2);
}
+
+ _append(endian::nativeToBig(encoded), isNegative ? !invert : invert);
}
-void KeyString::_appendLargeDouble(double value, bool invert) {
+void KeyString::_appendLargeDouble(double value, DecimalContinuationMarker dcm, bool invert) {
dassert(!std::isnan(value));
dassert(value != 0.0);
- uint64_t data;
- memcpy(&data, &value, sizeof(data));
+ _append(value > 0 ? CType::kNumericPositiveLargeMagnitude
+ : CType::kNumericNegativeLargeMagnitude,
+ invert);
- if (value > 0) {
- _append(CType::kNumericPositiveLargeDouble, invert);
- _append(endian::nativeToBig(data), invert);
- } else {
- _append(CType::kNumericNegativeLargeDouble, invert);
- _append(endian::nativeToBig(data), !invert);
+ uint64_t encoded;
+ memcpy(&encoded, &value, sizeof(encoded));
+
+ if (version != Version::V0) {
+ if (std::isfinite(value)) {
+ encoded <<= 1;
+ encoded &= ~(1ULL << 63);
+ encoded |= (dcm != kDCMEqualToDouble);
+ } else {
+ encoded = ~0ULL; // infinity
+ }
}
+ encoded = endian::nativeToBig(encoded);
+ _append(encoded, value > 0 ? invert : !invert);
+}
+
+void KeyString::_appendTinyDecimalWithoutTypeBits(const Decimal128 dec,
+ const double bin,
+ bool invert) {
+ // This function is only for 'dec' that doesn't exactly equal a double, but rounds to 'bin'
+ dassert(bin == dec.toDouble(Decimal128::kRoundTowardZero));
+ dassert(std::abs(bin) < DBL_MIN);
+ const bool isNegative = dec.isNegative();
+ Decimal128 magnitude = isNegative ? dec.negate() : dec;
+
+ _append(isNegative ? CType::kNumericNegativeSmallMagnitude
+ : CType::kNumericPositiveSmallMagnitude,
+ invert);
+
+ // For decimals smaller than the smallest subnormal double, just store the decimal number
+ if (bin == 0.0) {
+ Decimal128 normalized = magnitude.normalize();
+ uint64_t hi = normalized.getValue().high64;
+ uint64_t lo = normalized.getValue().low64;
+ invariant((hi & (0x3ULL << 62)) == 0);
+ _append(endian::nativeToBig(hi), isNegative ? !invert : invert);
+ _append(endian::nativeToBig(lo), isNegative ? !invert : invert);
+ return;
+ }
+ // Encode decimal in subnormal double range by scaling in the decimal domain. Round down at
+ // each step, but ensure not to get below the subnormal double. This will ensure that
+ // 'scaledBin' is monotonically increasing and will only be off by at most a few units in the
+ // last place, so the decimal continuation will stay in range.
+ Decimal128 scaledDec =
+ magnitude.multiply(kTinyDoubleExponentUpshiftFactorAsDecimal, Decimal128::kRoundTowardZero);
+ double scaledBin = scaledDec.toDouble(Decimal128::kRoundTowardZero);
+
+ // Here we know that scaledBin contains the first 15 significant digits of scaled dec, and
+ // sorts correctly with scaled double.
+ scaledBin = std::max(scaledBin, std::abs(bin) * kTinyDoubleExponentUpshiftFactor);
+ uint64_t encoded;
+ memcpy(&encoded, &scaledBin, sizeof(encoded));
+ encoded <<= 1;
+ encoded |= 1; // Even if decDiff.isZero() we aren't exactly equal
+ encoded += (1ULL << 62);
+ dassert(encoded >> 62 == 0x1);
+ _append(endian::nativeToBig(encoded), isNegative ? !invert : invert);
+
+ Decimal128 storedVal(scaledBin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive);
+ storedVal = storedVal.multiply(kTinyDoubleExponentDownshiftFactorAsDecimal,
+ Decimal128::kRoundTowardZero)
+ .add(Decimal128::kLargestNegativeExponentZero);
+ dassert(storedVal.isLess(magnitude));
+ Decimal128 decDiff = magnitude.subtract(storedVal);
+ dassert(decDiff.getBiasedExponent() == storedVal.getBiasedExponent() || decDiff.isZero());
+ dassert(decDiff.getCoefficientHigh() == 0 && !decDiff.isNegative());
+ uint64_t continuation = decDiff.getCoefficientLow();
+ _append(endian::nativeToBig(continuation), isNegative ? !invert : invert);
+}
+
+
+void KeyString::_appendHugeDecimalWithoutTypeBits(const Decimal128 dec, bool invert) {
+ // To allow us to use CType::kNumericNegativeLargeMagnitude we need to fit between the highest
+ // finite double and the representation of +/-Inf. We do this by forcing the high bit to 1
+ // (large doubles always have 0) and never encoding ~0 here.
+
+ const bool isNegative = dec.isNegative();
+ Decimal128 normalizedMagnitude = (isNegative ? dec.negate() : dec).normalize();
+ uint64_t hi = normalizedMagnitude.getValue().high64;
+ uint64_t lo = normalizedMagnitude.getValue().low64;
+ dassert(hi < (1ULL << 63));
+ hi |= (1ULL << 63);
+ _append(isNegative ? CType::kNumericNegativeLargeMagnitude
+ : CType::kNumericPositiveLargeMagnitude,
+ invert);
+ _append(endian::nativeToBig(hi), isNegative ? !invert : invert);
+ _append(endian::nativeToBig(lo), isNegative ? !invert : invert);
}
// Handles NumberLong and NumberInt which are encoded identically except for the TypeBits.
@@ -694,7 +1010,7 @@ void KeyString::_appendInteger(const long long num, bool invert) {
// -2**63 is exactly representable as a double and not as a positive int64.
// Therefore we encode it as a double.
dassert(-double(num) == kMinLargeDouble);
- _appendLargeDouble(double(num), invert);
+ _appendLargeDouble(static_cast<double>(num), kDCMEqualToDouble, invert);
return;
}
@@ -708,10 +1024,9 @@ void KeyString::_appendInteger(const long long num, bool invert) {
_appendPreshiftedIntegerPortion(magnitude << 1, isNegative, invert);
}
-
void KeyString::_appendPreshiftedIntegerPortion(uint64_t value, bool isNegative, bool invert) {
- dassert(value != 0ull);
- dassert(value != 1ull);
+ dassert(value != 0ULL);
+ dassert(value != 1ULL);
const size_t bytesNeeded = (64 - countLeadingZeros64(value) + 7) / 8;
@@ -753,26 +1068,53 @@ void toBsonValue(uint8_t ctype,
BufReader* reader,
TypeBits::Reader* typeBits,
bool inverted,
+ KeyString::Version version,
BSONObjBuilderValueStream* stream);
-void toBson(BufReader* reader, TypeBits::Reader* typeBits, bool inverted, BSONObjBuilder* builder) {
+void toBson(BufReader* reader,
+ TypeBits::Reader* typeBits,
+ bool inverted,
+ KeyString::Version version,
+ BSONObjBuilder* builder) {
while (readType<uint8_t>(reader, inverted) != 0) {
if (inverted) {
std::string name = readInvertedCString(reader);
BSONObjBuilderValueStream& stream = *builder << name;
- toBsonValue(readType<uint8_t>(reader, inverted), reader, typeBits, inverted, &stream);
+ toBsonValue(
+ readType<uint8_t>(reader, inverted), reader, typeBits, inverted, version, &stream);
} else {
StringData name = readCString(reader);
BSONObjBuilderValueStream& stream = *builder << name;
- toBsonValue(readType<uint8_t>(reader, inverted), reader, typeBits, inverted, &stream);
+ toBsonValue(
+ readType<uint8_t>(reader, inverted), reader, typeBits, inverted, version, &stream);
}
}
}
+/**
+ * Helper function to read the least significant type bits for 'num' and return a value that
+ * is numerically equal to 'num', but has its exponent adjusted to match the stored exponent bits.
+ */
+Decimal128 adjustDecimalExponent(TypeBits::Reader* typeBits, Decimal128 num);
+
+/**
+ * Helper function that takes a 'num' with 15 decimal digits of precision, normalizes it to 34
+ * digits and reads a 64-bit (19-digit) continuation to obtain the full 34-bit value.
+ */
+Decimal128 readDecimalContinuation(BufReader* reader, bool inverted, Decimal128 num) {
+ uint32_t flags = Decimal128::kNoFlag;
+ uint64_t continuation = endian::bigToNative(readType<uint64_t>(reader, inverted));
+ num = num.normalize();
+ num = num.add(Decimal128(num.isNegative(), num.getBiasedExponent(), 0, continuation), &flags);
+ invariant(!(Decimal128::hasFlag(flags, Decimal128::kInexact)));
+ return num;
+}
+
void toBsonValue(uint8_t ctype,
BufReader* reader,
TypeBits::Reader* typeBits,
bool inverted,
+ KeyString::Version version,
BSONObjBuilderValueStream* stream) {
// This is only used by the kNumeric.*ByteInt types, but needs to be declared up here
// since it is used across a fallthrough.
@@ -861,7 +1203,7 @@ void toBsonValue(uint8_t ctype,
}
// Not going to optimize CodeWScope.
BSONObjBuilder scope;
- toBson(reader, typeBits, inverted, &scope);
+ toBson(reader, typeBits, inverted, version, &scope);
*stream << BSONCodeWScope(code, scope.done());
break;
}
@@ -916,7 +1258,7 @@ void toBsonValue(uint8_t ctype,
case CType::kObject: {
BSONObjBuilder subObj(stream->subobjStart());
- toBson(reader, typeBits, inverted, &subObj);
+ toBson(reader, typeBits, inverted, version, &subObj);
break;
}
@@ -929,6 +1271,7 @@ void toBsonValue(uint8_t ctype,
reader,
typeBits,
inverted,
+ version,
&(subArr << BSONObjBuilder::numStr(index++)));
}
break;
@@ -938,13 +1281,20 @@ void toBsonValue(uint8_t ctype,
// Numerics
//
- case CType::kNumericNaN:
- invariant(typeBits->readNumeric() == TypeBits::kDouble);
- *stream << std::numeric_limits<double>::quiet_NaN();
+ case CType::kNumericNaN: {
+ auto type = typeBits->readNumeric();
+ if (type == TypeBits::kDouble) {
+ *stream << std::numeric_limits<double>::quiet_NaN();
+ } else {
+ invariant(type == TypeBits::kDecimal && version == KeyString::Version::V1);
+ *stream << Decimal128::kPositiveNaN;
+ }
break;
+ }
- case CType::kNumericZero:
- switch (typeBits->readNumeric()) {
+ case CType::kNumericZero: {
+ uint8_t zeroType = typeBits->readZero();
+ switch (zeroType) {
case TypeBits::kDouble:
*stream << 0.0;
break;
@@ -954,35 +1304,190 @@ void toBsonValue(uint8_t ctype,
case TypeBits::kLong:
*stream << 0ll;
break;
- case TypeBits::kNegativeZero:
+ case TypeBits::kNegativeDoubleZero:
*stream << -0.0;
break;
+ default:
+ const uint32_t whichZero = typeBits->readDecimalZero(zeroType);
+ const bool isNegative = whichZero > Decimal128::kMaxBiasedExponent;
+ const uint32_t biasedExponent =
+ isNegative ? whichZero - (Decimal128::kMaxBiasedExponent + 1) : whichZero;
+
+ *stream << Decimal128(isNegative, biasedExponent, 0, 0);
+ break;
}
break;
+ }
- case CType::kNumericNegativeLargeDouble:
- case CType::kNumericNegativeSmallDouble:
+ case CType::kNumericNegativeLargeMagnitude:
inverted = !inverted;
+ isNegative = true;
// fallthrough (format is the same as positive, but inverted)
-
- case CType::kNumericPositiveLargeDouble:
- case CType::kNumericPositiveSmallDouble: {
- // for these, the raw double was stored intact, including sign bit.
+ case CType::kNumericPositiveLargeMagnitude: {
const uint8_t originalType = typeBits->readNumeric();
+ invariant(version > KeyString::Version::V0 || originalType != TypeBits::kDecimal);
uint64_t encoded = readType<uint64_t>(reader, inverted);
encoded = endian::bigToNative(encoded);
- double d;
- memcpy(&d, &encoded, sizeof(d));
+ bool hasDecimalContinuation = false;
+ double bin;
+
+ // Backward compatibility
+ if (version == KeyString::Version::V0) {
+ memcpy(&bin, &encoded, sizeof(bin));
+ } else if (!(encoded & (1ULL << 63))) { // In range of (finite) doubles
+ hasDecimalContinuation = encoded & 1;
+ encoded >>= 1; // remove decimal continuation marker
+ encoded |= 1ULL << 62; // implied leading exponent bit
+ memcpy(&bin, &encoded, sizeof(bin));
+ if (isNegative)
+ bin = -bin;
+ } else if (encoded == ~0ULL) { // infinity
+ bin = isNegative ? -std::numeric_limits<double>::infinity()
+ : std::numeric_limits<double>::infinity();
+ } else { // Huge decimal number, directly output
+ invariant(originalType == TypeBits::kDecimal);
+ uint64_t highbits = encoded & ~(1ULL << 63);
+ uint64_t lowbits = endian::bigToNative(readType<uint64_t>(reader, inverted));
+ Decimal128 dec(Decimal128::Value{lowbits, highbits});
+ if (isNegative)
+ dec = dec.negate();
+ dec = adjustDecimalExponent(typeBits, dec);
+ *stream << dec;
+ break;
+ }
+ // 'bin' contains the value of the input, rounded toward zero in case of decimal
if (originalType == TypeBits::kDouble) {
- *stream << d;
- } else {
+ *stream << bin;
+ } else if (originalType == TypeBits::kLong) {
// This can only happen for a single number.
- invariant(originalType == TypeBits::kLong);
- invariant(d == double(std::numeric_limits<long long>::min()));
+ invariant(bin == static_cast<double>(std::numeric_limits<long long>::min()));
*stream << std::numeric_limits<long long>::min();
+ } else {
+ invariant(originalType == TypeBits::kDecimal && version != KeyString::Version::V0);
+ const auto roundAwayFromZero = isNegative ? Decimal128::kRoundTowardNegative
+ : Decimal128::kRoundTowardPositive;
+ Decimal128 dec(bin, Decimal128::kRoundTo34Digits, roundAwayFromZero);
+ if (hasDecimalContinuation)
+ dec = readDecimalContinuation(reader, inverted, dec);
+ dec = adjustDecimalExponent(typeBits, dec);
+ *stream << dec;
+ }
+ break;
+ }
+
+ case CType::kNumericNegativeSmallMagnitude:
+ inverted = !inverted;
+ isNegative = true;
+ // fallthrough (format is the same as positive, but inverted)
+
+ case CType::kNumericPositiveSmallMagnitude: {
+ const uint8_t originalType = typeBits->readNumeric();
+ uint64_t encoded = readType<uint64_t>(reader, inverted);
+ encoded = endian::bigToNative(encoded);
+
+ if (version == KeyString::Version::V0) {
+ // for these, the raw double was stored intact, including sign bit.
+ invariant(originalType == TypeBits::kDouble);
+ double d;
+ memcpy(&d, &encoded, sizeof(d));
+ *stream << d;
+ break;
}
+ switch (encoded >> 62) {
+ case 0x0: {
+ // Teeny tiny decimal, smaller magnitude than 2**(-1074)
+ uint64_t lowbits = readType<uint64_t>(reader, inverted);
+ lowbits = endian::bigToNative(lowbits);
+ Decimal128 dec = Decimal128(Decimal128::Value{lowbits, encoded});
+ dec = adjustDecimalExponent(typeBits, dec);
+ if (ctype == CType::kNumericNegativeSmallMagnitude)
+ dec = dec.negate();
+ *stream << dec;
+ break;
+ }
+ case 0x1:
+ case 0x2: {
+ // Tiny double or decimal, magnitude from 2**(-1074) to 2**(-255), exclusive.
+ // The exponent is shifted by 256 in order to avoid subnormals, which would
+ // result in less than 15 significant digits. Because 2**(-255) has 179
+ // decimal digits, no doubles exactly equal decimals, so all decimals have
+ // a continuation. The bit is still needed for comparison purposes.
+ bool hasDecimalContinuation = encoded & 1;
+ encoded -= 1ULL << 62;
+ encoded >>= 1;
+ double scaledBin;
+ memcpy(&scaledBin, &encoded, sizeof(scaledBin));
+ if (originalType == TypeBits::kDouble) {
+ invariant(!hasDecimalContinuation);
+ double bin = scaledBin * kTinyDoubleExponentDownshiftFactor;
+ *stream << (isNegative ? -bin : bin);
+ break;
+ }
+ invariant(originalType == TypeBits::kDecimal && hasDecimalContinuation);
+
+ // If the actual double would be subnormal, scale in decimal domain.
+ Decimal128 dec;
+ if (scaledBin < DBL_MIN * kTinyDoubleExponentUpshiftFactor) {
+ // For conversion from binary->decimal scale away from zero,
+ // otherwise round toward. Needs to be done consistently in read/write.
+
+ Decimal128 scaledDec = Decimal128(scaledBin,
+ Decimal128::kRoundTo34Digits,
+ Decimal128::kRoundTowardPositive);
+ dec = scaledDec.multiply(kTinyDoubleExponentDownshiftFactorAsDecimal,
+ Decimal128::kRoundTowardZero);
+ } else {
+ double bin = scaledBin * kTinyDoubleExponentDownshiftFactor;
+ dec = Decimal128(
+ bin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive);
+ }
+
+ dec = readDecimalContinuation(reader, inverted, dec);
+ *stream << adjustDecimalExponent(typeBits, isNegative ? dec.negate() : dec);
+ break;
+ }
+ case 0x3: {
+ // Small double, 2**(-255) or more in magnitude. Common case.
+ auto dcm = static_cast<KeyString::DecimalContinuationMarker>(encoded & 3);
+ encoded >>= 2;
+ double bin;
+ memcpy(&bin, &encoded, sizeof(bin));
+ if (originalType == TypeBits::kDouble) {
+ invariant(dcm == KeyString::kDCMEqualToDouble);
+ *stream << (isNegative ? -bin : bin);
+ break;
+ }
+
+ // Deal with decimal cases
+ invariant(originalType == TypeBits::kDecimal);
+ Decimal128 dec;
+ switch (dcm) {
+ case KeyString::kDCMEqualToDoubleRoundedUpTo15Digits:
+ dec = Decimal128(bin,
+ Decimal128::kRoundTo15Digits,
+ Decimal128::kRoundTowardPositive);
+ break;
+ case KeyString::kDCMEqualToDouble:
+ dec = Decimal128(bin,
+ Decimal128::kRoundTo34Digits,
+ Decimal128::kRoundTowardPositive);
+ break;
+ case KeyString::kDCMHasContinuationLessThanDoubleRoundedUpTo15Digits:
+ case KeyString::kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits:
+ // Deal with decimal continuation
+ dec = Decimal128(bin,
+ Decimal128::kRoundTo34Digits,
+ Decimal128::kRoundTowardPositive);
+ dec = readDecimalContinuation(reader, inverted, dec);
+ }
+ *stream << adjustDecimalExponent(typeBits, isNegative ? dec.negate() : dec);
+ break;
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
break;
}
@@ -1018,7 +1523,7 @@ void toBsonValue(uint8_t ctype,
}
const bool haveFractionalPart = (encodedIntegerPart & 1);
- long long integerPart = encodedIntegerPart >> 1;
+ int64_t integerPart = encodedIntegerPart >> 1;
if (!haveFractionalPart) {
if (isNegative)
@@ -1032,25 +1537,30 @@ void toBsonValue(uint8_t ctype,
*stream << int(integerPart);
break;
case TypeBits::kLong:
- *stream << integerPart;
+ *stream << static_cast<long long>(integerPart);
+ break;
+ case TypeBits::kDecimal:
+ *stream << adjustDecimalExponent(typeBits, Decimal128(integerPart));
break;
- case TypeBits::kNegativeZero:
- invariant(false);
+ default:
+ MONGO_UNREACHABLE;
}
- } else {
- // Nothing else can have a fractional part.
- invariant(originalType == TypeBits::kDouble);
+ break;
+ }
+ // KeyString V0: anything fractional is a double
+ if (version == KeyString::Version::V0) {
+ invariant(originalType == TypeBits::kDouble);
const uint64_t exponent = (64 - countLeadingZeros64(integerPart)) - 1;
const size_t fractionalBits = (52 - exponent);
const size_t fractionalBytes = (fractionalBits + 7) / 8;
// build up the bits of a double here.
uint64_t doubleBits = integerPart << fractionalBits;
- doubleBits &= ~(1ull << 52); // clear implicit leading 1
+ doubleBits &= ~(1ULL << 52); // clear implicit leading 1
doubleBits |= (exponent + 1023 /*bias*/) << 52;
if (isNegative) {
- doubleBits |= (1ull << 63); // sign bit
+ doubleBits |= (1ULL << 63); // sign bit
}
for (size_t i = 0; i < fractionalBytes; i++) {
// fold in the fractional bytes
@@ -1061,14 +1571,104 @@ void toBsonValue(uint8_t ctype,
double number;
memcpy(&number, &doubleBits, sizeof(number));
*stream << number;
+ break;
+ }
+
+ // KeyString V1: all numeric values with fractions have at least 8 bytes.
+ // Start with integer part, and read until we have a full 8 bytes worth of data.
+ const size_t fracBytes = 8 - CType::numBytesForInt(ctype);
+ uint64_t encodedFraction = integerPart;
+
+ for (int fracBytesRemaining = fracBytes; fracBytesRemaining; fracBytesRemaining--)
+ encodedFraction = (encodedFraction << 8) | readType<uint8_t>(reader, inverted);
+
+ // Zero out the DCM and convert the whole binary fraction
+ double bin = static_cast<double>(encodedFraction & ~3ULL) * kInvPow256[fracBytes];
+ if (originalType == TypeBits::kDouble) {
+ *stream << (isNegative ? -bin : bin);
+ break;
}
+ // The two lsb's are the DCM, except for the 8-byte case, where it's already known
+ KeyString::DecimalContinuationMarker dcm = fracBytes
+ ? static_cast<KeyString::DecimalContinuationMarker>(encodedFraction & 3)
+ : KeyString::kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits;
+
+ // Deal with decimal cases
+ invariant(originalType == TypeBits::kDecimal);
+ Decimal128 dec;
+ switch (dcm) {
+ case KeyString::kDCMEqualToDoubleRoundedUpTo15Digits:
+ dec = Decimal128(
+ bin, Decimal128::kRoundTo15Digits, Decimal128::kRoundTowardPositive);
+ break;
+ case KeyString::kDCMEqualToDouble:
+ dec = Decimal128(
+ bin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive);
+ break;
+ default:
+ // Deal with decimal continuation
+ dec = integerPart > kMaxIntForDouble
+ ? Decimal128(integerPart)
+ : Decimal128(
+ bin, Decimal128::kRoundTo34Digits, Decimal128::kRoundTowardPositive);
+ dec = readDecimalContinuation(reader, inverted, dec);
+ }
+ *stream << adjustDecimalExponent(typeBits, isNegative ? dec.negate() : dec);
break;
}
default:
invariant(false);
}
}
+
+
+Decimal128 adjustDecimalExponent(TypeBits::Reader* typeBits, Decimal128 num) {
+ // The last 6 bits of the exponent are stored in the type bits. First figure out if the exponent
+ // of 'num' is too high or too low. Even for a non-zero number with only a single significant
+ // digit, there are only 34 possiblities while exponents with the given low 6 bits are spaced
+ // (1 << 6) == 64 apart. This is not quite enough to figure out whether to shift the expnent up
+ // or down when the difference is for example 32 in either direction. However, if the high part
+ // of the coefficient is zero, the coefficient can only be scaled down by up to 1E19 (increasing
+ // the exponent by 19), as 2**64 < 1E20. If scaling down to match the higher exponent isn't
+ // possible, we must be able to scale up. Scaling always must be exact and not change the value.
+ const uint32_t kMaxExpAdjust = 33;
+ const uint32_t kMaxExpIncrementForZeroHighCoefficient = 19;
+ dassert(!num.isZero());
+ const uint32_t origExp = num.getBiasedExponent();
+ const uint8_t storedBits = typeBits->readDecimalExponent();
+
+ uint32_t highExp = (origExp & ~KeyString::TypeBits::kStoredDecimalExponentMask) | storedBits;
+
+ // Start by determining an exponent that's not less than num's and matches the stored bits.
+ if (highExp < origExp)
+ highExp += (1U << KeyString::TypeBits::kStoredDecimalExponentBits);
+
+ // This must be the right exponent, as no scaling is required.
+ if (highExp == origExp)
+ return num;
+
+ // For increasing the exponent, quantize the existing number. This must be
+ // exact, as the value stays in the same cohort.
+ if (highExp <= origExp + kMaxExpAdjust &&
+ (num.getCoefficientHigh() != 0 ||
+ highExp <= origExp + kMaxExpIncrementForZeroHighCoefficient)) {
+ // Increase exponent and decrease (right shift) coefficient.
+ uint32_t flags = Decimal128::SignalingFlag::kNoFlag;
+ auto quantized = num.quantize(Decimal128(0, highExp, 0, 1), &flags);
+ invariant(flags == Decimal128::SignalingFlag::kNoFlag); // must be exact
+ num = quantized;
+ } else {
+ // Decrease exponent and increase (left shift) coefficient.
+ uint32_t lowExp = highExp - (1U << KeyString::TypeBits::kStoredDecimalExponentBits);
+ invariant(lowExp >= origExp - kMaxExpAdjust);
+ num = num.add(Decimal128(0, lowExp, 0, 0));
+ }
+ dassert((num.getBiasedExponent() & KeyString::TypeBits::kStoredDecimalExponentMask) ==
+ (highExp & KeyString::TypeBits::kStoredDecimalExponentMask));
+ return num;
+}
+
} // namespace
BSONObj KeyString::toBson(const char* buffer, size_t len, Ordering ord, const TypeBits& typeBits) {
@@ -1088,7 +1688,7 @@ BSONObj KeyString::toBson(const char* buffer, size_t len, Ordering ord, const Ty
if (ctype == kEnd)
break;
- toBsonValue(ctype, &reader, &typeBitsReader, invert, &(builder << ""));
+ toBsonValue(ctype, &reader, &typeBitsReader, invert, typeBits.version, &(builder << ""));
}
return builder.obj();
}
@@ -1201,6 +1801,52 @@ void KeyString::TypeBits::appendBit(uint8_t oneOrZero) {
_curBit++;
}
+void KeyString::TypeBits::appendZero(uint8_t zeroType) {
+ switch (zeroType) {
+ // 2-bit encodings
+ case kInt:
+ case kDouble:
+ case kLong:
+ appendBit(zeroType >> 1);
+ appendBit(zeroType & 1);
+ break;
+ case kNegativeDoubleZero:
+ if (version == Version::V0) {
+ appendBit(kV0NegativeDoubleZero >> 1);
+ appendBit(kV0NegativeDoubleZero & 1);
+ break;
+ }
+ zeroType = kV1NegativeDoubleZero;
+ // fallthrough for 5-bit encodings
+ case kDecimalZero0xxx:
+ case kDecimalZero1xxx:
+ case kDecimalZero2xxx:
+ case kDecimalZero3xxx:
+ case kDecimalZero4xxx:
+ case kDecimalZero5xxx:
+ // first two bits output are ones
+ dassert((zeroType >> 3) == 3);
+ for (int bitPos = 4; bitPos >= 0; bitPos--)
+ appendBit((zeroType >> bitPos) & 1);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+void KeyString::TypeBits::appendDecimalZero(uint32_t whichZero) {
+ invariant((whichZero >> 12) <= kDecimalZero5xxx - kDecimalZero0xxx);
+ appendZero((whichZero >> 12) + kDecimalZero0xxx);
+ for (int bitPos = 11; bitPos >= 0; bitPos--)
+ appendBit((whichZero >> bitPos) & 1);
+}
+
+void KeyString::TypeBits::appendDecimalExponent(uint8_t storedExponentBits) {
+ invariant(storedExponentBits < (1U << kStoredDecimalExponentBits));
+ for (int bitPos = kStoredDecimalExponentBits - 1; bitPos >= 0; bitPos--)
+ appendBit((storedExponentBits >> bitPos) & 1);
+}
+
uint8_t KeyString::TypeBits::Reader::readBit() {
if (_typeBits._isAllZeros)
return 0;
@@ -1214,4 +1860,32 @@ uint8_t KeyString::TypeBits::Reader::readBit() {
return (_typeBits._buf[byte] & (1 << offsetInByte)) ? 1 : 0;
}
+uint8_t KeyString::TypeBits::Reader::readZero() {
+ uint8_t res = readNumeric();
+
+ // For keyString v1, negative and decimal zeros require at least 3 more bits.
+ if (_typeBits.version != Version::V0 && res == kSpecialZeroPrefix) {
+ res = (res << 1) | readBit();
+ res = (res << 1) | readBit();
+ res = (res << 1) | readBit();
+ }
+ if (res == kV1NegativeDoubleZero || res == kV0NegativeDoubleZero)
+ res = kNegativeDoubleZero;
+ return res;
+}
+
+uint32_t KeyString::TypeBits::Reader::readDecimalZero(uint8_t zeroType) {
+ uint32_t whichZero = zeroType - TypeBits::kDecimalZero0xxx;
+ for (int bitPos = 11; bitPos >= 0; bitPos--)
+ whichZero = (whichZero << 1) | readBit();
+
+ return whichZero;
+}
+
+uint8_t KeyString::TypeBits::Reader::readDecimalExponent() {
+ uint8_t exponentBits = 0;
+ for (int bitPos = kStoredDecimalExponentBits - 1; bitPos >= 0; bitPos--)
+ exponentBits = (exponentBits << 1) | readBit();
+ return exponentBits;
+}
} // namespace mongo
diff --git a/src/mongo/db/storage/key_string.h b/src/mongo/db/storage/key_string.h
index 02302498b0d..5b3d256f0d4 100644
--- a/src/mongo/db/storage/key_string.h
+++ b/src/mongo/db/storage/key_string.h
@@ -30,18 +30,29 @@
#pragma once
+#include <limits>
+
+#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/bson/timestamp.h"
#include "mongo/bson/ordering.h"
+#include "mongo/bson/timestamp.h"
#include "mongo/db/record_id.h"
+#include "mongo/platform/decimal128.h"
namespace mongo {
class KeyString {
public:
/**
+ * Selects version of KeyString to use. V0 and V1 differ in their encoding of numeric values.
+ */
+ enum class Version : uint8_t { V0 = 0, V1 = 1 };
+ static StringData versionToString(Version version) {
+ return version == Version::V0 ? "V0" : "V1";
+ }
+
+ /**
* Encodes info needed to restore the original BSONTypes from a KeyString. They cannot be
* stored in place since we don't want them to affect the ordering (1 and 1.0 compare as
* equal).
@@ -51,8 +62,17 @@ public:
// Sufficient bytes to encode extra type information for any BSON key that fits in 1KB.
// The encoding format will need to change if we raise this limit.
static const uint8_t kMaxBytesNeeded = 127;
-
- TypeBits() {
+ static const uint32_t kMaxKeyBytes = 1024;
+ static const uint32_t kMaxTypeBitsPerDecimal = 17;
+ static const uint32_t kBytesForTypeAndEmptyKey = 2;
+ static const uint32_t kMaxDecimalsPerKey =
+ kMaxKeyBytes / (sizeof(Decimal128::Value) + kBytesForTypeAndEmptyKey);
+ static_assert(kMaxTypeBitsPerDecimal * kMaxDecimalsPerKey < kMaxBytesNeeded * 8UL,
+ "encoding needs change to contain all type bits for worst case key");
+ static const uint8_t kStoredDecimalExponentBits = 6;
+ static const uint32_t kStoredDecimalExponentMask = (1U << kStoredDecimalExponentBits) - 1;
+
+ explicit TypeBits(Version version) : version(version) {
reset();
}
@@ -61,8 +81,8 @@ public:
* BufReader in the format described on the getBuffer() method.
*/
void resetFromBuffer(BufReader* reader);
- static TypeBits fromBuffer(BufReader* reader) {
- TypeBits out;
+ static TypeBits fromBuffer(Version version, BufReader* reader) {
+ TypeBits out(version);
out.resetFromBuffer(reader);
return out;
}
@@ -124,9 +144,26 @@ public:
static const uint8_t kSymbol = 0x1;
static const uint8_t kInt = 0x0;
- static const uint8_t kDouble = 0x1;
- static const uint8_t kLong = 0x2;
- static const uint8_t kNegativeZero = 0x3; // decodes as a double
+ static const uint8_t kLong = 0x1;
+ static const uint8_t kDouble = 0x2;
+ static const uint8_t kDecimal = 0x3; // indicates 6 more bits of typeinfo follow.
+ static const uint8_t kSpecialZeroPrefix = 0x3; // kNumericZero case, 3 more bits follow.
+ static const uint8_t kNegativeDoubleZero = 0x3; // normalized -0.0 double, either V0 or V1.
+ static const uint8_t kV0NegativeDoubleZero = 0x3; // legacy encoding for V0
+
+ // The following describe the initial 5 type bits for kNegativeOrDecimalZero. These bits
+ // encode double -0 or a 3-bit prefix (range 0 to 5) of the 15-bit decimal zero type.
+ static const uint8_t kV1NegativeDoubleZero = 0x18; // 0b11000
+
+ static const uint8_t kUnusedEncoding = 0x19; // 0b11001
+
+ // There are 6 * (1<<12) == 2 * (kMaxBiasedExponent + 1) == 24576 decimal zeros.
+ static const uint8_t kDecimalZero0xxx = 0x1a; // 0b11010 12 more exponent bits follow
+ static const uint8_t kDecimalZero1xxx = 0x1b; // 0b11011
+ static const uint8_t kDecimalZero2xxx = 0x1c; // 0b11100
+ static const uint8_t kDecimalZero3xxx = 0x1d; // 0b11101
+ static const uint8_t kDecimalZero4xxx = 0x1e; // 0b11110
+ static const uint8_t kDecimalZero5xxx = 0x1f; // 0b11111
void reset() {
_curBit = 0;
@@ -143,21 +180,24 @@ public:
}
void appendNumberDouble() {
- appendBit(kDouble & 1);
appendBit(kDouble >> 1);
+ appendBit(kDouble & 1);
}
void appendNumberInt() {
- appendBit(kInt & 1);
appendBit(kInt >> 1);
+ appendBit(kInt & 1);
}
void appendNumberLong() {
- appendBit(kLong & 1);
appendBit(kLong >> 1);
+ appendBit(kLong & 1);
}
- void appendNegativeZero() {
- appendBit(kNegativeZero & 1);
- appendBit(kNegativeZero >> 1);
+ void appendNumberDecimal() {
+ appendBit(kDecimal >> 1);
+ appendBit(kDecimal & 1);
}
+ void appendZero(uint8_t zeroType);
+ void appendDecimalZero(uint32_t whichZero);
+ void appendDecimalExponent(uint8_t storedExponentBits);
class Reader {
public:
@@ -170,9 +210,17 @@ public:
return readBit();
}
uint8_t readNumeric() {
- uint8_t lowBit = readBit();
- return lowBit | (readBit() << 1);
+ uint8_t highBit = readBit();
+ return (highBit << 1) | readBit();
}
+ uint8_t readZero();
+
+ // Given a decimal zero type between kDecimalZero0xxx and kDecimal5xxx, read the
+ // remaining 12 bits and return which of the 24576 decimal zeros to produce.
+ uint32_t readDecimalZero(uint8_t zeroType);
+
+ // Reads the stored exponent bits of a non-zero decimal number.
+ uint8_t readDecimalExponent();
private:
uint8_t readBit();
@@ -181,6 +229,8 @@ public:
const TypeBits& _typeBits;
};
+ const Version version;
+
private:
/**
* size only includes data bytes, not the size byte itself.
@@ -211,17 +261,32 @@ public:
kExclusiveAfter,
};
- KeyString() {}
+ /**
+ * Encodes the kind of NumberDecimal that is stored.
+ */
+ enum DecimalContinuationMarker {
+ kDCMEqualToDouble = 0x0,
+ kDCMHasContinuationLessThanDoubleRoundedUpTo15Digits = 0x1,
+ kDCMEqualToDoubleRoundedUpTo15Digits = 0x2,
+ kDCMHasContinuationLargerThanDoubleRoundedUpTo15Digits = 0x3
+ };
+
+ explicit KeyString(Version version) : version(version), _typeBits(version) {}
- KeyString(const BSONObj& obj, Ordering ord, RecordId recordId) {
+ KeyString(Version version, const BSONObj& obj, Ordering ord, RecordId recordId)
+ : KeyString(version) {
resetToKey(obj, ord, recordId);
}
- KeyString(const BSONObj& obj, Ordering ord, Discriminator discriminator = kInclusive) {
+ KeyString(Version version,
+ const BSONObj& obj,
+ Ordering ord,
+ Discriminator discriminator = kInclusive)
+ : KeyString(version) {
resetToKey(obj, ord, discriminator);
}
- explicit KeyString(RecordId rid) {
+ KeyString(Version version, RecordId rid) : version(version), _typeBits(version) {
appendRecordId(rid);
}
@@ -278,6 +343,12 @@ public:
*/
std::string toString() const;
+ /**
+ * Version to use for conversion to/from KeyString. V1 has different encodings for numeric
+ * values.
+ */
+ const Version version;
+
private:
void _appendAllElementsForIndexing(const BSONObj& obj,
Ordering ord,
@@ -299,6 +370,7 @@ private:
void _appendNumberDouble(const double num, bool invert);
void _appendNumberLong(const long long num, bool invert);
void _appendNumberInt(const int num, bool invert);
+ void _appendNumberDecimal(const Decimal128 num, bool invert);
/**
* @param name - optional, can be NULL
@@ -309,11 +381,15 @@ private:
void _appendStringLike(StringData str, bool invert);
void _appendBson(const BSONObj& obj, bool invert);
- void _appendSmallDouble(double value, bool invert);
- void _appendLargeDouble(double value, bool invert);
+ void _appendSmallDouble(double value, DecimalContinuationMarker dcm, bool invert);
+ void _appendLargeDouble(double value, DecimalContinuationMarker dcm, bool invert);
void _appendInteger(const long long num, bool invert);
void _appendPreshiftedIntegerPortion(uint64_t value, bool isNegative, bool invert);
+ void _appendDoubleWithoutTypeBits(const double num, DecimalContinuationMarker dcm, bool invert);
+ void _appendHugeDecimalWithoutTypeBits(const Decimal128 dec, bool invert);
+ void _appendTinyDecimalWithoutTypeBits(const Decimal128 dec, const double bin, bool invert);
+
template <typename T>
void _append(const T& thing, bool invert);
void _appendBytes(const void* source, size_t bytes, bool invert);
diff --git a/src/mongo/db/storage/key_string_test.cpp b/src/mongo/db/storage/key_string_test.cpp
index 471d365f012..07c8f357058 100644
--- a/src/mongo/db/storage/key_string_test.cpp
+++ b/src/mongo/db/storage/key_string_test.cpp
@@ -30,15 +30,24 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
#include <cmath>
+#include <limits>
+#include <random>
+#include <typeinfo>
+#include <vector>
-#include "mongo/platform/basic.h"
+#include "mongo/base/owned_pointer_vector.h"
#include "mongo/config.h"
#include "mongo/db/storage/key_string.h"
+#include "mongo/platform/decimal128.h"
+#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/hex.h"
#include "mongo/util/log.h"
-#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/util/timer.h"
using std::string;
using namespace mongo;
@@ -51,68 +60,93 @@ Ordering ALL_ASCENDING = Ordering::make(BSONObj());
Ordering ONE_ASCENDING = Ordering::make(BSON("a" << 1));
Ordering ONE_DESCENDING = Ordering::make(BSON("a" << -1));
-TEST(KeyStringTest, Simple1) {
+class KeyStringTest : public mongo::unittest::Test {
+public:
+ void run() {
+ auto base = static_cast<mongo::unittest::Test*>(this);
+ try {
+ version = KeyString::Version::V0;
+ base->run();
+ version = KeyString::Version::V1;
+ base->run();
+ } catch (...) {
+ log() << "exception while testing KeyString version "
+ << mongo::KeyString::versionToString(version);
+ throw;
+ }
+ }
+
+protected:
+ KeyString::Version version;
+};
+
+
+TEST_F(KeyStringTest, Simple1) {
BSONObj a = BSON("" << 5);
BSONObj b = BSON("" << 6);
ASSERT_LESS_THAN(a, b);
- ASSERT_LESS_THAN(KeyString(a, ALL_ASCENDING, RecordId()),
- KeyString(b, ALL_ASCENDING, RecordId()));
+ ASSERT_LESS_THAN(KeyString(version, a, ALL_ASCENDING, RecordId()),
+ KeyString(version, b, ALL_ASCENDING, RecordId()));
}
-#define ROUNDTRIP_ORDER(x, order) \
+#define ROUNDTRIP_ORDER(version, x, order) \
do { \
const BSONObj _orig = x; \
- const KeyString _ks(_orig, order); \
+ const KeyString _ks(version, _orig, order); \
const BSONObj _converted = toBson(_ks, order); \
ASSERT_EQ(_converted, _orig); \
ASSERT(_converted.binaryEqual(_orig)); \
} while (0)
-#define ROUNDTRIP(x) \
- do { \
- ROUNDTRIP_ORDER(x, ALL_ASCENDING); \
- ROUNDTRIP_ORDER(x, ONE_DESCENDING); \
+#define ROUNDTRIP(version, x) \
+ do { \
+ ROUNDTRIP_ORDER(version, x, ALL_ASCENDING); \
+ ROUNDTRIP_ORDER(version, x, ONE_DESCENDING); \
} while (0)
-#define COMPARES_SAME(_x, _y) \
- do { \
- KeyString _xKS(_x, ONE_ASCENDING); \
- KeyString _yKS(_y, ONE_ASCENDING); \
- if (_x == _y) { \
- ASSERT_EQUALS(_xKS, _yKS); \
- } else if (_x < _y) { \
- ASSERT_LESS_THAN(_xKS, _yKS); \
- } else { \
- ASSERT_LESS_THAN(_yKS, _xKS); \
- } \
- \
- _xKS.resetToKey(_x, ONE_DESCENDING); \
- _yKS.resetToKey(_y, ONE_DESCENDING); \
- if (_x == _y) { \
- ASSERT_EQUALS(_xKS, _yKS); \
- } else if (_x < _y) { \
- ASSERT_GREATER_THAN(_xKS, _yKS); \
- } else { \
- ASSERT_GREATER_THAN(_yKS, _xKS); \
- } \
+#define COMPARES_SAME(_v, _x, _y) \
+ do { \
+ KeyString _xKS(_v, _x, ONE_ASCENDING); \
+ KeyString _yKS(_v, _y, ONE_ASCENDING); \
+ if (_x == _y) { \
+ ASSERT_EQUALS(_xKS, _yKS); \
+ } else if (_x < _y) { \
+ ASSERT_LESS_THAN(_xKS, _yKS); \
+ } else { \
+ ASSERT_LESS_THAN(_yKS, _xKS); \
+ } \
+ \
+ _xKS.resetToKey(_x, ONE_DESCENDING); \
+ _yKS.resetToKey(_y, ONE_DESCENDING); \
+ if (_x == _y) { \
+ ASSERT_EQUALS(_xKS, _yKS); \
+ } else if (_x < _y) { \
+ ASSERT_GREATER_THAN(_xKS, _yKS); \
+ } else { \
+ ASSERT_GREATER_THAN(_yKS, _xKS); \
+ } \
} while (0)
-TEST(KeyStringTest, ActualBytesDouble) {
+TEST_F(KeyStringTest, ActualBytesDouble) {
// just one test like this for utter sanity
BSONObj a = BSON("" << 5.5);
- KeyString ks(a, ALL_ASCENDING);
- log() << "size: " << ks.getSize() << " hex [" << toHex(ks.getBuffer(), ks.getSize()) << "]";
+ KeyString ks(version, a, ALL_ASCENDING);
+ log() << KeyString::versionToString(version) << " size: " << ks.getSize() << " hex ["
+ << toHex(ks.getBuffer(), ks.getSize()) << "]";
ASSERT_EQUALS(10U, ks.getSize());
- string hex =
- "2B" // kNumericPositive1ByteInt
- "0B" // (5 << 1) | 1
- "02000000000000" // fractional bytes of double
- "04"; // kEnd
+ string hex = version == KeyString::Version::V0 ? "2B" // kNumericPositive1ByteInt
+ "0B" // (5 << 1) | 1
+ "02000000000000" // fractional bytes of double
+ "04" // kEnd
+ : "2B" // kNumericPositive1ByteInt
+ "0B" // (5 << 1) | 1
+ "80000000000000" // fractional bytes
+ "04"; // kEnd
ASSERT_EQUALS(hex, toHex(ks.getBuffer(), ks.getSize()));
@@ -133,151 +167,222 @@ TEST(KeyStringTest, ActualBytesDouble) {
ASSERT_EQUALS(hexFlipped, toHex(ks.getBuffer(), ks.getSize()));
}
-TEST(KeyStringTest, AllTypesSimple) {
- ROUNDTRIP(BSON("" << 5.5));
- ROUNDTRIP(BSON(""
+TEST_F(KeyStringTest, AllTypesSimple) {
+ ROUNDTRIP(version, BSON("" << 5.5));
+ ROUNDTRIP(version,
+ BSON(""
<< "abc"));
- ROUNDTRIP(BSON("" << BSON("a" << 5)));
- ROUNDTRIP(BSON("" << BSON_ARRAY("a" << 5)));
- ROUNDTRIP(BSON("" << BSONBinData("abc", 3, bdtCustom)));
- ROUNDTRIP(BSON("" << BSONUndefined));
- ROUNDTRIP(BSON("" << OID("abcdefabcdefabcdefabcdef")));
- ROUNDTRIP(BSON("" << true));
- ROUNDTRIP(BSON("" << Date_t::fromMillisSinceEpoch(123123123)));
- ROUNDTRIP(BSON("" << BSONRegEx("asdf", "x")));
- ROUNDTRIP(BSON("" << BSONDBRef("db.c", OID("010203040506070809101112"))));
- ROUNDTRIP(BSON("" << BSONCode("abc_code")));
- ROUNDTRIP(BSON("" << BSONCodeWScope("def_code",
+ ROUNDTRIP(version, BSON("" << BSON("a" << 5)));
+ ROUNDTRIP(version, BSON("" << BSON_ARRAY("a" << 5)));
+ ROUNDTRIP(version, BSON("" << BSONBinData("abc", 3, bdtCustom)));
+ ROUNDTRIP(version, BSON("" << BSONUndefined));
+ ROUNDTRIP(version, BSON("" << OID("abcdefabcdefabcdefabcdef")));
+ ROUNDTRIP(version, BSON("" << true));
+ ROUNDTRIP(version, BSON("" << Date_t::fromMillisSinceEpoch(123123123)));
+ ROUNDTRIP(version, BSON("" << BSONRegEx("asdf", "x")));
+ ROUNDTRIP(version, BSON("" << BSONDBRef("db.c", OID("010203040506070809101112"))));
+ ROUNDTRIP(version, BSON("" << BSONCode("abc_code")));
+ ROUNDTRIP(version,
+ BSON("" << BSONCodeWScope("def_code",
BSON("x_scope"
<< "a"))));
- ROUNDTRIP(BSON("" << 5));
- ROUNDTRIP(BSON("" << Timestamp(123123, 123)));
- ROUNDTRIP(BSON("" << Timestamp(~0U, 3)));
- ROUNDTRIP(BSON("" << 1235123123123LL));
+ ROUNDTRIP(version, BSON("" << 5));
+ ROUNDTRIP(version, BSON("" << Timestamp(123123, 123)));
+ ROUNDTRIP(version, BSON("" << Timestamp(~0U, 3)));
+ ROUNDTRIP(version, BSON("" << 1235123123123LL));
}
-TEST(KeyStringTest, Array1) {
+TEST_F(KeyStringTest, Array1) {
BSONObj emptyArray = BSON("" << BSONArray());
ASSERT_EQUALS(Array, emptyArray.firstElement().type());
- ROUNDTRIP(emptyArray);
- ROUNDTRIP(BSON("" << BSON_ARRAY(emptyArray.firstElement())));
- ROUNDTRIP(BSON("" << BSON_ARRAY(1)));
- ROUNDTRIP(BSON("" << BSON_ARRAY(1 << 2)));
- ROUNDTRIP(BSON("" << BSON_ARRAY(1 << 2 << 3)));
+ ROUNDTRIP(version, emptyArray);
+ ROUNDTRIP(version, BSON("" << BSON_ARRAY(emptyArray.firstElement())));
+ ROUNDTRIP(version, BSON("" << BSON_ARRAY(1)));
+ ROUNDTRIP(version, BSON("" << BSON_ARRAY(1 << 2)));
+ ROUNDTRIP(version, BSON("" << BSON_ARRAY(1 << 2 << 3)));
{
- KeyString a(emptyArray, ALL_ASCENDING, RecordId::min());
- KeyString b(emptyArray, ALL_ASCENDING, RecordId(5));
+ KeyString a(version, emptyArray, ALL_ASCENDING, RecordId::min());
+ KeyString b(version, emptyArray, ALL_ASCENDING, RecordId(5));
ASSERT_LESS_THAN(a, b);
}
{
- KeyString a(emptyArray, ALL_ASCENDING, RecordId(0));
- KeyString b(emptyArray, ALL_ASCENDING, RecordId(5));
+ KeyString a(version, emptyArray, ALL_ASCENDING, RecordId(0));
+ KeyString b(version, emptyArray, ALL_ASCENDING, RecordId(5));
ASSERT_LESS_THAN(a, b);
}
}
-TEST(KeyStringTest, SubDoc1) {
- ROUNDTRIP(BSON("" << BSON("foo" << 2)));
- ROUNDTRIP(BSON("" << BSON("foo" << 2 << "bar"
+TEST_F(KeyStringTest, SubDoc1) {
+ ROUNDTRIP(version, BSON("" << BSON("foo" << 2)));
+ ROUNDTRIP(version,
+ BSON("" << BSON("foo" << 2 << "bar"
<< "asd")));
- ROUNDTRIP(BSON("" << BSON("foo" << BSON_ARRAY(2 << 4))));
+ ROUNDTRIP(version, BSON("" << BSON("foo" << BSON_ARRAY(2 << 4))));
}
-TEST(KeyStringTest, SubDoc2) {
+TEST_F(KeyStringTest, SubDoc2) {
BSONObj a = BSON("" << BSON("a"
<< "foo"));
BSONObj b = BSON("" << BSON("b" << 5.5));
BSONObj c = BSON("" << BSON("c" << BSON("x" << 5)));
- ROUNDTRIP(a);
- ROUNDTRIP(b);
- ROUNDTRIP(c);
+ ROUNDTRIP(version, a);
+ ROUNDTRIP(version, b);
+ ROUNDTRIP(version, c);
- COMPARES_SAME(a, b);
- COMPARES_SAME(a, c);
- COMPARES_SAME(b, c);
+ COMPARES_SAME(version, a, b);
+ COMPARES_SAME(version, a, c);
+ COMPARES_SAME(version, b, c);
}
-TEST(KeyStringTest, Compound1) {
- ROUNDTRIP(BSON("" << BSON("a" << 5) << "" << 1));
- ROUNDTRIP(BSON("" << BSON("" << 5) << "" << 1));
+TEST_F(KeyStringTest, Compound1) {
+ ROUNDTRIP(version, BSON("" << BSON("a" << 5) << "" << 1));
+ ROUNDTRIP(version, BSON("" << BSON("" << 5) << "" << 1));
}
-TEST(KeyStringTest, Undef1) {
- ROUNDTRIP(BSON("" << BSONUndefined));
+TEST_F(KeyStringTest, Undef1) {
+ ROUNDTRIP(version, BSON("" << BSONUndefined));
}
-TEST(KeyStringTest, NumberLong0) {
+TEST_F(KeyStringTest, NumberLong0) {
double d = (1ll << 52) - 1;
long long ll = static_cast<long long>(d);
double d2 = static_cast<double>(ll);
ASSERT_EQUALS(d, d2);
}
-TEST(KeyStringTest, NumbersNearInt32Max) {
+TEST_F(KeyStringTest, NumbersNearInt32Max) {
int64_t start = std::numeric_limits<int32_t>::max();
for (int64_t i = -1000; i < 1000; i++) {
long long toTest = start + i;
- ROUNDTRIP(BSON("" << toTest));
- ROUNDTRIP(BSON("" << static_cast<int>(toTest)));
- ROUNDTRIP(BSON("" << static_cast<double>(toTest)));
+ ROUNDTRIP(version, BSON("" << toTest));
+ ROUNDTRIP(version, BSON("" << static_cast<int>(toTest)));
+ ROUNDTRIP(version, BSON("" << static_cast<double>(toTest)));
+ }
+}
+
+TEST_F(KeyStringTest, DecimalNumbers) {
+ if (version == KeyString::Version::V0) {
+ log() << "not testing DecimalNumbers for KeyString V0";
+ return;
}
+
+ const auto V1 = KeyString::Version::V1;
+
+ // Zeros
+ ROUNDTRIP(V1, BSON("" << Decimal128("0")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("0.0")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-0")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("0E5000")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-0.0000E-6172")));
+
+ // Special numbers
+ ROUNDTRIP(V1, BSON("" << Decimal128("NaN")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("+Inf")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-Inf")));
+
+ // Decimal representations of whole double numbers
+ ROUNDTRIP(V1, BSON("" << Decimal128("1")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("2.0")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-2.0E1")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("1234.56E15")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("2.00000000000000000000000")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-9223372036854775808.00000000000000"))); // -2**63
+ ROUNDTRIP(V1, BSON("" << Decimal128("973555660975280180349468061728768E1"))); // 1.875 * 2**112
+
+ // Decimal representations of fractional double numbers
+ ROUNDTRIP(V1, BSON("" << Decimal128("1.25")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("3.141592653584666550159454345703125")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-127.50")));
+
+ // Decimal representations of whole int64 non-double numbers
+ ROUNDTRIP(V1, BSON("" << Decimal128("243290200817664E4"))); // 20!
+ ROUNDTRIP(V1, BSON("" << Decimal128("9007199254740993"))); // 2**53 + 1
+ ROUNDTRIP(V1, BSON("" << Decimal128(std::numeric_limits<int64_t>::max())));
+ ROUNDTRIP(V1, BSON("" << Decimal128(std::numeric_limits<int64_t>::min())));
+
+ // Decimals in int64_t range without decimal or integer representation
+ ROUNDTRIP(V1, BSON("" << Decimal128("1.23")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-1.1")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-12345.60")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("3.141592653589793238462643383279502")));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-3.141592653589793115997963468544185")));
+
+ // Decimal representations of small double numbers
+ ROUNDTRIP(V1, BSON("" << Decimal128("0.50")));
+ ROUNDTRIP(V1,
+ BSON("" << Decimal128("-0.3552713678800500929355621337890625E-14"))); // -2**(-48)
+ ROUNDTRIP(V1,
+ BSON("" << Decimal128("-0.000000000000001234567890123456789012345678901234E-99")));
+
+ // Decimal representations of small decimals not representable as double
+ ROUNDTRIP(V1, BSON("" << Decimal128("0.02")));
+
+ // Large decimals
+ ROUNDTRIP(V1, BSON("" << Decimal128("1234567890123456789012345678901234E6000")));
+ ROUNDTRIP(V1,
+ BSON("" << Decimal128("-19950631168.80758384883742162683585E3000"))); // -2**10000
+
+ // Tiny, tiny decimals
+ ROUNDTRIP(V1,
+ BSON("" << Decimal128("0.2512388057698744585180135042133610E-6020"))); // 2**(-10000)
+ ROUNDTRIP(V1, BSON("" << Decimal128("4.940656458412465441765687928682213E-324") << "" << 1));
+ ROUNDTRIP(V1, BSON("" << Decimal128("-0.8289046058458094980903836776809409E-316")));
}
-TEST(KeyStringTest, LotsOfNumbers1) {
+TEST_F(KeyStringTest, LotsOfNumbers1) {
for (int i = 0; i < 64; i++) {
int64_t x = 1LL << i;
- ROUNDTRIP(BSON("" << static_cast<long long>(x)));
- ROUNDTRIP(BSON("" << static_cast<int>(x)));
- ROUNDTRIP(BSON("" << static_cast<double>(x)));
- ROUNDTRIP(BSON("" << (static_cast<double>(x) + .1)));
- ROUNDTRIP(BSON("" << (static_cast<double>(x) - .1)));
+ ROUNDTRIP(version, BSON("" << static_cast<long long>(x)));
+ ROUNDTRIP(version, BSON("" << static_cast<int>(x)));
+ ROUNDTRIP(version, BSON("" << static_cast<double>(x)));
+ ROUNDTRIP(version, BSON("" << (static_cast<double>(x) + .1)));
+ ROUNDTRIP(version, BSON("" << (static_cast<double>(x) - .1)));
- ROUNDTRIP(BSON("" << (static_cast<long long>(x) + 1)));
- ROUNDTRIP(BSON("" << (static_cast<int>(x) + 1)));
- ROUNDTRIP(BSON("" << (static_cast<double>(x) + 1)));
- ROUNDTRIP(BSON("" << (static_cast<double>(x) + 1.1)));
+ ROUNDTRIP(version, BSON("" << (static_cast<long long>(x) + 1)));
+ ROUNDTRIP(version, BSON("" << (static_cast<int>(x) + 1)));
+ ROUNDTRIP(version, BSON("" << (static_cast<double>(x) + 1)));
+ ROUNDTRIP(version, BSON("" << (static_cast<double>(x) + 1.1)));
// Avoid negating signed integral minima
if (i < 63)
- ROUNDTRIP(BSON("" << -static_cast<long long>(x)));
+ ROUNDTRIP(version, BSON("" << -static_cast<long long>(x)));
if (i < 31)
- ROUNDTRIP(BSON("" << -static_cast<int>(x)));
-
- ROUNDTRIP(BSON("" << -static_cast<double>(x)));
- ROUNDTRIP(BSON("" << -(static_cast<double>(x) + .1)));
-
- ROUNDTRIP(BSON("" << -(static_cast<long long>(x) + 1)));
- ROUNDTRIP(BSON("" << -(static_cast<int>(x) + 1)));
- ROUNDTRIP(BSON("" << -(static_cast<double>(x) + 1)));
- ROUNDTRIP(BSON("" << -(static_cast<double>(x) + 1.1)));
+ ROUNDTRIP(version, BSON("" << -static_cast<int>(x)));
+ ROUNDTRIP(version, BSON("" << -static_cast<double>(x)));
+ ROUNDTRIP(version, BSON("" << -(static_cast<double>(x) + .1)));
+
+ ROUNDTRIP(version, BSON("" << -(static_cast<long long>(x) + 1)));
+ ROUNDTRIP(version, BSON("" << -(static_cast<int>(x) + 1)));
+ ROUNDTRIP(version, BSON("" << -(static_cast<double>(x) + 1)));
+ ROUNDTRIP(version, BSON("" << -(static_cast<double>(x) + 1.1)));
}
}
-TEST(KeyStringTest, LotsOfNumbers2) {
+TEST_F(KeyStringTest, LotsOfNumbers2) {
for (double i = -1100; i < 1100; i++) {
double x = pow(2, i);
- ROUNDTRIP(BSON("" << x));
+ ROUNDTRIP(version, BSON("" << x));
}
for (double i = -1100; i < 1100; i++) {
double x = pow(2.1, i);
- ROUNDTRIP(BSON("" << x));
+ ROUNDTRIP(version, BSON("" << x));
}
}
-TEST(KeyStringTest, RecordIdOrder1) {
+TEST_F(KeyStringTest, RecordIdOrder1) {
Ordering ordering = Ordering::make(BSON("a" << 1));
- KeyString a(BSON("" << 5), ordering, RecordId::min());
- KeyString b(BSON("" << 5), ordering, RecordId(2));
- KeyString c(BSON("" << 5), ordering, RecordId(3));
- KeyString d(BSON("" << 6), ordering, RecordId());
- KeyString e(BSON("" << 6), ordering, RecordId(1));
+ KeyString a(version, BSON("" << 5), ordering, RecordId::min());
+ KeyString b(version, BSON("" << 5), ordering, RecordId(2));
+ KeyString c(version, BSON("" << 5), ordering, RecordId(3));
+ KeyString d(version, BSON("" << 6), ordering, RecordId());
+ KeyString e(version, BSON("" << 6), ordering, RecordId(1));
ASSERT_LESS_THAN(a, b);
ASSERT_LESS_THAN(b, c);
@@ -285,13 +390,13 @@ TEST(KeyStringTest, RecordIdOrder1) {
ASSERT_LESS_THAN(d, e);
}
-TEST(KeyStringTest, RecordIdOrder2) {
+TEST_F(KeyStringTest, RecordIdOrder2) {
Ordering ordering = Ordering::make(BSON("a" << -1 << "b" << -1));
- KeyString a(BSON("" << 5 << "" << 6), ordering, RecordId::min());
- KeyString b(BSON("" << 5 << "" << 6), ordering, RecordId(5));
- KeyString c(BSON("" << 5 << "" << 5), ordering, RecordId(4));
- KeyString d(BSON("" << 3 << "" << 4), ordering, RecordId(3));
+ KeyString a(version, BSON("" << 5 << "" << 6), ordering, RecordId::min());
+ KeyString b(version, BSON("" << 5 << "" << 6), ordering, RecordId(5));
+ KeyString c(version, BSON("" << 5 << "" << 5), ordering, RecordId(4));
+ KeyString d(version, BSON("" << 3 << "" << 4), ordering, RecordId(3));
ASSERT_LESS_THAN(a, b);
ASSERT_LESS_THAN(b, c);
@@ -301,19 +406,19 @@ TEST(KeyStringTest, RecordIdOrder2) {
ASSERT_LESS_THAN(b, d);
}
-TEST(KeyStringTest, RecordIdOrder2Double) {
+TEST_F(KeyStringTest, RecordIdOrder2Double) {
Ordering ordering = Ordering::make(BSON("a" << -1 << "b" << -1));
- KeyString a(BSON("" << 5.0 << "" << 6.0), ordering, RecordId::min());
- KeyString b(BSON("" << 5.0 << "" << 6.0), ordering, RecordId(5));
- KeyString c(BSON("" << 3.0 << "" << 4.0), ordering, RecordId(3));
+ KeyString a(version, BSON("" << 5.0 << "" << 6.0), ordering, RecordId::min());
+ KeyString b(version, BSON("" << 5.0 << "" << 6.0), ordering, RecordId(5));
+ KeyString c(version, BSON("" << 3.0 << "" << 4.0), ordering, RecordId(3));
ASSERT_LESS_THAN(a, b);
ASSERT_LESS_THAN(b, c);
ASSERT_LESS_THAN(a, c);
}
-TEST(KeyStringTest, Timestamp) {
+TEST_F(KeyStringTest, Timestamp) {
BSONObj a = BSON("" << Timestamp(0, 0));
BSONObj b = BSON("" << Timestamp(1234, 1));
BSONObj c = BSON("" << Timestamp(1234, 2));
@@ -321,19 +426,19 @@ TEST(KeyStringTest, Timestamp) {
BSONObj e = BSON("" << Timestamp(~0U, 0));
{
- ROUNDTRIP(a);
- ROUNDTRIP(b);
- ROUNDTRIP(c);
+ ROUNDTRIP(version, a);
+ ROUNDTRIP(version, b);
+ ROUNDTRIP(version, c);
ASSERT_LESS_THAN(a, b);
ASSERT_LESS_THAN(b, c);
ASSERT_LESS_THAN(c, d);
- KeyString ka(a, ALL_ASCENDING);
- KeyString kb(b, ALL_ASCENDING);
- KeyString kc(c, ALL_ASCENDING);
- KeyString kd(d, ALL_ASCENDING);
- KeyString ke(e, ALL_ASCENDING);
+ KeyString ka(version, a, ALL_ASCENDING);
+ KeyString kb(version, b, ALL_ASCENDING);
+ KeyString kc(version, c, ALL_ASCENDING);
+ KeyString kd(version, d, ALL_ASCENDING);
+ KeyString ke(version, e, ALL_ASCENDING);
ASSERT(ka.compare(kb) < 0);
ASSERT(kb.compare(kc) < 0);
@@ -344,18 +449,18 @@ TEST(KeyStringTest, Timestamp) {
{
Ordering ALL_ASCENDING = Ordering::make(BSON("a" << -1));
- ROUNDTRIP(a);
- ROUNDTRIP(b);
- ROUNDTRIP(c);
+ ROUNDTRIP(version, a);
+ ROUNDTRIP(version, b);
+ ROUNDTRIP(version, c);
ASSERT(d.woCompare(c, ALL_ASCENDING) < 0);
ASSERT(c.woCompare(b, ALL_ASCENDING) < 0);
ASSERT(b.woCompare(a, ALL_ASCENDING) < 0);
- KeyString ka(a, ALL_ASCENDING);
- KeyString kb(b, ALL_ASCENDING);
- KeyString kc(c, ALL_ASCENDING);
- KeyString kd(d, ALL_ASCENDING);
+ KeyString ka(version, a, ALL_ASCENDING);
+ KeyString kb(version, b, ALL_ASCENDING);
+ KeyString kc(version, c, ALL_ASCENDING);
+ KeyString kd(version, d, ALL_ASCENDING);
ASSERT(ka.compare(kb) > 0);
ASSERT(kb.compare(kc) > 0);
@@ -363,33 +468,26 @@ TEST(KeyStringTest, Timestamp) {
}
}
-TEST(KeyStringTest, AllTypesRoundtrip) {
+TEST_F(KeyStringTest, AllTypesRoundtrip) {
for (int i = 1; i <= JSTypeMax; i++) {
- // TODO: Currently KeyString does not support NumberDecimal
- // SERVER-19703
- if (i == NumberDecimal)
- continue;
{
BSONObjBuilder b;
b.appendMinForType("", i);
BSONObj o = b.obj();
- ROUNDTRIP(o);
+ ROUNDTRIP(version, o);
}
{
BSONObjBuilder b;
b.appendMaxForType("", i);
BSONObj o = b.obj();
- ROUNDTRIP(o);
+ ROUNDTRIP(version, o);
}
}
}
-const std::vector<BSONObj>& getInterestingElements() {
+const std::vector<BSONObj>& getInterestingElements(KeyString::Version version) {
static std::vector<BSONObj> elements;
-
- if (!elements.empty()) {
- return elements;
- }
+ elements.clear();
// These are used to test strings that include NUL bytes.
const StringData ball("ball", StringData::LiteralTag());
@@ -501,11 +599,36 @@ const std::vector<BSONObj>& getInterestingElements() {
if (powerOfTwo <= 52) { // is dNum - 0.5 representable?
elements.push_back(BSON("" << (dNum - 0.5)));
elements.push_back(BSON("" << -(dNum - 0.5)));
+ elements.push_back(BSON("" << (dNum - 0.1)));
+ elements.push_back(BSON("" << -(dNum - 0.1)));
}
if (powerOfTwo <= 51) { // is dNum + 0.5 representable?
elements.push_back(BSON("" << (dNum + 0.5)));
elements.push_back(BSON("" << -(dNum + 0.5)));
+ elements.push_back(BSON("" << (dNum + 0.1)));
+ elements.push_back(BSON("" << -(dNum + -.1)));
+ }
+
+ if (version != KeyString::Version::V0) {
+ const Decimal128 dec(static_cast<int64_t>(lNum));
+ const Decimal128 one("1");
+ const Decimal128 half("0.5");
+ const Decimal128 tenth("0.1");
+ elements.push_back(BSON("" << dec));
+ elements.push_back(BSON("" << dec.add(one)));
+ elements.push_back(BSON("" << dec.subtract(one)));
+ elements.push_back(BSON("" << dec.negate()));
+ elements.push_back(BSON("" << dec.add(one).negate()));
+ elements.push_back(BSON("" << dec.subtract(one).negate()));
+ elements.push_back(BSON("" << dec.subtract(half)));
+ elements.push_back(BSON("" << dec.subtract(half).negate()));
+ elements.push_back(BSON("" << dec.add(half)));
+ elements.push_back(BSON("" << dec.add(half).negate()));
+ elements.push_back(BSON("" << dec.subtract(tenth)));
+ elements.push_back(BSON("" << dec.subtract(tenth).negate()));
+ elements.push_back(BSON("" << dec.add(tenth)));
+ elements.push_back(BSON("" << dec.add(tenth).negate()));
}
}
@@ -541,10 +664,39 @@ const std::vector<BSONObj>& getInterestingElements() {
elements.push_back(BSON("" << closestBelow));
}
+ if (version != KeyString::Version::V0) {
+ // Numbers that are hard to round to between binary and decimal.
+ elements.push_back(BSON("" << 0.1));
+ elements.push_back(BSON("" << Decimal128("0.100000000")));
+ // Decimals closest to the double representation of 0.1.
+ elements.push_back(BSON("" << Decimal128("0.1000000000000000055511151231257827")));
+ elements.push_back(BSON("" << Decimal128("0.1000000000000000055511151231257828")));
+
+ // Numbers close to numerical underflow/overflow for double.
+ elements.push_back(BSON("" << Decimal128("1.797693134862315708145274237317044E308")));
+ elements.push_back(BSON("" << Decimal128("1.797693134862315708145274237317043E308")));
+ elements.push_back(BSON("" << Decimal128("-1.797693134862315708145274237317044E308")));
+ elements.push_back(BSON("" << Decimal128("-1.797693134862315708145274237317043E308")));
+ elements.push_back(BSON("" << Decimal128("9.881312916824930883531375857364427")));
+ elements.push_back(BSON("" << Decimal128("9.881312916824930883531375857364428")));
+ elements.push_back(BSON("" << Decimal128("-9.881312916824930883531375857364427")));
+ elements.push_back(BSON("" << Decimal128("-9.881312916824930883531375857364428")));
+ elements.push_back(BSON("" << Decimal128("4.940656458412465441765687928682213E-324")));
+ elements.push_back(BSON("" << Decimal128("4.940656458412465441765687928682214E-324")));
+ elements.push_back(BSON("" << Decimal128("-4.940656458412465441765687928682214E-324")));
+ elements.push_back(BSON("" << Decimal128("-4.940656458412465441765687928682213E-324")));
+ }
+
+ // Tricky double precision number for binary/decimal conversion: very close to a decimal
+ if (version != KeyString::Version::V0)
+ elements.push_back(BSON("" << Decimal128("3743626360493413E-165")));
+ elements.push_back(BSON("" << 3743626360493413E-165));
+
return elements;
}
-void testPermutation(const std::vector<BSONObj>& elementsOrig,
+void testPermutation(KeyString::Version version,
+ const std::vector<BSONObj>& elementsOrig,
const std::vector<BSONObj>& orderings,
bool debug) {
// Since KeyStrings are compared using memcmp we can assume it provides a total ordering such
@@ -563,12 +715,12 @@ void testPermutation(const std::vector<BSONObj>& elementsOrig,
const BSONObj& o1 = elements[i];
if (debug)
log() << "\to1: " << o1;
- ROUNDTRIP_ORDER(o1, ordering);
+ ROUNDTRIP_ORDER(version, o1, ordering);
- KeyString k1(o1, ordering);
+ KeyString k1(version, o1, ordering);
- KeyString l1(BSON("l" << o1.firstElement()), ordering); // kLess
- KeyString g1(BSON("g" << o1.firstElement()), ordering); // kGreater
+ KeyString l1(version, BSON("l" << o1.firstElement()), ordering); // kLess
+ KeyString g1(version, BSON("g" << o1.firstElement()), ordering); // kGreater
ASSERT_LT(l1, k1);
ASSERT_GT(g1, k1);
@@ -576,9 +728,9 @@ void testPermutation(const std::vector<BSONObj>& elementsOrig,
const BSONObj& o2 = elements[i + 1];
if (debug)
log() << "\t\t o2: " << o2;
- KeyString k2(o2, ordering);
- KeyString g2(BSON("g" << o2.firstElement()), ordering);
- KeyString l2(BSON("l" << o2.firstElement()), ordering);
+ KeyString k2(version, o2, ordering);
+ KeyString g2(version, BSON("g" << o2.firstElement()), ordering);
+ KeyString l2(version, BSON("l" << o2.firstElement()), ordering);
int bsonCmp = o1.woCompare(o2, ordering);
invariant(bsonCmp <= 0); // We should be sorted...
@@ -613,29 +765,28 @@ void testPermutation(const std::vector<BSONObj>& elementsOrig,
}
}
-TEST(KeyStringTest, AllPermCompare) {
- const std::vector<BSONObj>& elements = getInterestingElements();
+TEST_F(KeyStringTest, AllPermCompare) {
+ const std::vector<BSONObj>& elements = getInterestingElements(version);
for (size_t i = 0; i < elements.size(); i++) {
const BSONObj& o = elements[i];
- ROUNDTRIP(o);
+ ROUNDTRIP(version, o);
}
std::vector<BSONObj> orderings;
orderings.push_back(BSON("a" << 1));
orderings.push_back(BSON("a" << -1));
- testPermutation(elements, orderings, false);
+ testPermutation(version, elements, orderings, false);
}
-TEST(KeyStringTest, AllPerm2Compare) {
-// This test can take over a minute without optimizations. Re-enable if you need to debug it.
+TEST_F(KeyStringTest, AllPerm2Compare) {
#if !defined(MONGO_CONFIG_OPTIMIZED_BUILD)
- log() << "\t\t\tskipping test on non-optimized build";
+ log() << "\t\t\tskipping permutation testing on non-optimized build";
return;
#endif
- const std::vector<BSONObj>& baseElements = getInterestingElements();
+ const std::vector<BSONObj>& baseElements = getInterestingElements(version);
std::vector<BSONObj> elements;
for (size_t i = 0; i < baseElements.size(); i++) {
@@ -648,11 +799,12 @@ TEST(KeyStringTest, AllPerm2Compare) {
}
}
- log() << "AllPrem2Compare size:" << elements.size();
+ log() << "AllPerm2Compare " << KeyString::versionToString(version)
+ << " size:" << elements.size();
for (size_t i = 0; i < elements.size(); i++) {
const BSONObj& o = elements[i];
- ROUNDTRIP(o);
+ ROUNDTRIP(version, o);
}
std::vector<BSONObj> orderings;
@@ -661,7 +813,7 @@ TEST(KeyStringTest, AllPerm2Compare) {
orderings.push_back(BSON("a" << 1 << "b" << -1));
orderings.push_back(BSON("a" << -1 << "b" << -1));
- testPermutation(elements, orderings, false);
+ testPermutation(version, elements, orderings, false);
}
#define COMPARE_HELPER(LHS, RHS) (((LHS) < (RHS)) ? -1 : (((LHS) == (RHS)) ? 0 : 1))
@@ -696,18 +848,18 @@ int compareNumbers(const BSONElement& lhs, const BSONElement& rhs) {
}
}
-TEST(KeyStringTest, NaNs) {
+TEST_F(KeyStringTest, NaNs) {
// TODO use hex floats to force distinct NaNs
const double nan1 = std::numeric_limits<double>::quiet_NaN();
const double nan2 = std::numeric_limits<double>::signaling_NaN();
// Since only output a single NaN, we can't use the normal ROUNDTRIP testing here.
- const KeyString ks1a(BSON("" << nan1), ONE_ASCENDING);
- const KeyString ks1d(BSON("" << nan1), ONE_DESCENDING);
+ const KeyString ks1a(version, BSON("" << nan1), ONE_ASCENDING);
+ const KeyString ks1d(version, BSON("" << nan1), ONE_DESCENDING);
- const KeyString ks2a(BSON("" << nan2), ONE_ASCENDING);
- const KeyString ks2d(BSON("" << nan2), ONE_DESCENDING);
+ const KeyString ks2a(version, BSON("" << nan2), ONE_ASCENDING);
+ const KeyString ks2d(version, BSON("" << nan2), ONE_DESCENDING);
ASSERT_EQ(ks1a, ks2a);
ASSERT_EQ(ks1d, ks2d);
@@ -717,7 +869,7 @@ TEST(KeyStringTest, NaNs) {
ASSERT(std::isnan(toBson(ks1d, ONE_DESCENDING)[""].Double()));
ASSERT(std::isnan(toBson(ks2d, ONE_DESCENDING)[""].Double()));
}
-TEST(KeyStringTest, NumberOrderLots) {
+TEST_F(KeyStringTest, NumberOrderLots) {
std::vector<BSONObj> numbers;
{
numbers.push_back(BSON("" << 0));
@@ -773,7 +925,7 @@ TEST(KeyStringTest, NumberOrderLots) {
OwnedPointerVector<KeyString> keyStrings;
for (size_t i = 0; i < numbers.size(); i++) {
- keyStrings.push_back(new KeyString(numbers[i], ordering));
+ keyStrings.push_back(new KeyString(version, numbers[i], ordering));
}
for (size_t i = 0; i < numbers.size(); i++) {
@@ -793,12 +945,12 @@ TEST(KeyStringTest, NumberOrderLots) {
}
}
-TEST(KeyStringTest, RecordIds) {
+TEST_F(KeyStringTest, RecordIds) {
for (int i = 0; i < 63; i++) {
const RecordId rid = RecordId(1ll << i);
{ // Test encoding / decoding of single RecordIds
- const KeyString ks(rid);
+ const KeyString ks(version, rid);
ASSERT_GTE(ks.getSize(), 2u);
ASSERT_LTE(ks.getSize(), 10u);
@@ -811,12 +963,12 @@ TEST(KeyStringTest, RecordIds) {
}
if (rid.isNormal()) {
- ASSERT_GT(ks, KeyString(RecordId()));
- ASSERT_GT(ks, KeyString(RecordId::min()));
- ASSERT_LT(ks, KeyString(RecordId::max()));
+ ASSERT_GT(ks, KeyString(version, RecordId()));
+ ASSERT_GT(ks, KeyString(version, RecordId::min()));
+ ASSERT_LT(ks, KeyString(version, RecordId::max()));
- ASSERT_GT(ks, KeyString(RecordId(rid.repr() - 1)));
- ASSERT_LT(ks, KeyString(RecordId(rid.repr() + 1)));
+ ASSERT_GT(ks, KeyString(version, RecordId(rid.repr() - 1)));
+ ASSERT_LT(ks, KeyString(version, RecordId(rid.repr() + 1)));
}
}
@@ -824,15 +976,15 @@ TEST(KeyStringTest, RecordIds) {
RecordId other = RecordId(1ll << j);
if (rid == other)
- ASSERT_EQ(KeyString(rid), KeyString(other));
+ ASSERT_EQ(KeyString(version, rid), KeyString(version, other));
if (rid < other)
- ASSERT_LT(KeyString(rid), KeyString(other));
+ ASSERT_LT(KeyString(version, rid), KeyString(version, other));
if (rid > other)
- ASSERT_GT(KeyString(rid), KeyString(other));
+ ASSERT_GT(KeyString(version, rid), KeyString(version, other));
{
// Test concatenating RecordIds like in a unique index.
- KeyString ks;
+ KeyString ks(version);
ks.appendRecordId(RecordId::max()); // uses all bytes
ks.appendRecordId(rid);
ks.appendRecordId(RecordId(0xDEADBEEF)); // uses some extra bytes
@@ -857,3 +1009,134 @@ TEST(KeyStringTest, RecordIds) {
}
}
}
+
+namespace {
+const uint64_t kMinPerfMicros = 10 * 1000;
+const uint64_t kMinPerfSamples = 10 * 1000;
+typedef std::vector<BSONObj> Numbers;
+
+/**
+ * Evaluates ROUNDTRIP on all items in Numbers a sufficient number of times to take at least
+ * kMinPerfMicros microseconds. Logs the elapsed time per ROUNDTRIP evaluation.
+ */
+void perfTest(KeyString::Version version, const Numbers& numbers) {
+ uint64_t micros = 0;
+ uint64_t iters;
+ // Ensure at least 16 iterations are done and at least 25 milliseconds is timed
+ for (iters = 16; iters < (1 << 30) && micros < kMinPerfMicros; iters *= 2) {
+ // Measure the number of loops
+ Timer t;
+
+ for (uint64_t i = 0; i < iters; i++)
+ for (auto item : numbers) {
+ // Assuming there are sufficient invariants in the to/from KeyString methods
+ // that calls will not be optimized away.
+ const KeyString ks(version, item, ALL_ASCENDING);
+ const BSONObj& converted = toBson(ks, ALL_ASCENDING);
+ invariant(converted.binaryEqual(item));
+ }
+
+ micros = t.micros();
+ }
+
+ auto minmax = std::minmax_element(numbers.begin(), numbers.end());
+
+ log() << 1E3 * micros / static_cast<double>(iters * numbers.size()) << " ns per "
+ << mongo::KeyString::versionToString(version) << " roundtrip"
+ << (kDebugBuild ? " (DEBUG BUILD!)" : "") << " min " << (*minmax.first)[""] << ", max"
+ << (*minmax.second)[""];
+}
+} // namespace
+
+TEST_F(KeyStringTest, CommonIntPerf) {
+ // Exponential distribution, so skewed towards smaller integers.
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::exponential_distribution<double> expReal(1e-3);
+
+ std::vector<BSONObj> numbers;
+ for (uint64_t x = 0; x < kMinPerfSamples; x++)
+ numbers.push_back(BSON("" << static_cast<int>(expReal(gen))));
+
+ perfTest(version, numbers);
+}
+
+TEST_F(KeyStringTest, UniformInt64Perf) {
+ std::vector<BSONObj> numbers;
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<long long> uniformInt64(std::numeric_limits<long long>::min(),
+ std::numeric_limits<long long>::max());
+
+ for (uint64_t x = 0; x < kMinPerfSamples; x++)
+ numbers.push_back(BSON("" << uniformInt64(gen)));
+
+ perfTest(version, numbers);
+}
+
+TEST_F(KeyStringTest, CommonDoublePerf) {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::exponential_distribution<double> expReal(1e-3);
+
+ std::vector<BSONObj> numbers;
+ for (uint64_t x = 0; x < kMinPerfSamples; x++)
+ numbers.push_back(BSON("" << expReal(gen)));
+
+ perfTest(version, numbers);
+}
+
+TEST_F(KeyStringTest, UniformDoublePerf) {
+ std::vector<BSONObj> numbers;
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<long long> uniformInt64(std::numeric_limits<long long>::min(),
+ std::numeric_limits<long long>::max());
+
+ for (uint64_t x = 0; x < kMinPerfSamples; x++) {
+ uint64_t u = uniformInt64(gen);
+ double d;
+ memcpy(&d, &u, sizeof(d));
+ if (std::isnormal(d))
+ numbers.push_back(BSON("" << d));
+ }
+ perfTest(version, numbers);
+}
+
+TEST_F(KeyStringTest, CommonDecimalPerf) {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::exponential_distribution<double> expReal(1e-3);
+
+ if (version == KeyString::Version::V0)
+ return;
+
+ std::vector<BSONObj> numbers;
+ for (uint64_t x = 0; x < kMinPerfSamples; x++)
+ numbers.push_back(
+ BSON("" << Decimal128(
+ expReal(gen), Decimal128::kRoundTo34Digits, Decimal128::kRoundTiesToAway)
+ .quantize(Decimal128("0.01", Decimal128::kRoundTiesToAway))));
+
+ perfTest(version, numbers);
+}
+
+TEST_F(KeyStringTest, UniformDecimalPerf) {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<long long> uniformInt64(std::numeric_limits<long long>::min(),
+ std::numeric_limits<long long>::max());
+
+ if (version == KeyString::Version::V0)
+ return;
+
+ std::vector<BSONObj> numbers;
+ for (uint64_t x = 0; x < kMinPerfSamples; x++) {
+ uint64_t hi = uniformInt64(gen);
+ uint64_t lo = uniformInt64(gen);
+ Decimal128 d(Decimal128::Value{lo, hi});
+ if (!d.isZero() && !d.isNaN() && !d.isInfinite())
+ numbers.push_back(BSON("" << d));
+ }
+ perfTest(version, numbers);
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
index 2320672fe19..283eca55364 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -82,9 +82,10 @@ static const int TempKeyMaxSize = 1024; // this goes away with SERVER-3372
static const WiredTigerItem emptyItem(NULL, 0);
+static const int kKeyStringV1Version = 7;
static const int kMinimumIndexVersion = 6;
-static const int kCurrentIndexVersion = 6; // New indexes use this by default.
-static const int kMaximumIndexVersion = 6;
+static const int kCurrentIndexVersion = kKeyStringV1Version; // New indexes use this by default.
+static const int kMaximumIndexVersion = kKeyStringV1Version;
static_assert(kCurrentIndexVersion >= kMinimumIndexVersion,
"kCurrentIndexVersion >= kMinimumIndexVersion");
static_assert(kCurrentIndexVersion <= kMaximumIndexVersion,
@@ -197,7 +198,7 @@ StatusWith<std::string> WiredTigerIndex::generateCreateString(const std::string&
// Index metadata
ss << ",app_metadata=("
- << "formatVersion=" << kCurrentIndexVersion << ','
+ << "formatVersion=" << (enableBSON1_1 ? kKeyStringV1Version : kCurrentIndexVersion) << ','
<< "infoObj=" << desc.infoObj().jsonString() << "),";
LOG(3) << "index create string: " << ss.ss.str();
@@ -218,6 +219,8 @@ WiredTigerIndex::WiredTigerIndex(OperationContext* ctx,
const std::string& uri,
const IndexDescriptor* desc)
: _ordering(Ordering::make(desc->keyPattern())),
+ _keyStringVersion(desc->version() < kKeyStringV1Version ? KeyString::Version::V0
+ : KeyString::Version::V1),
_uri(uri),
_tableId(WiredTigerSession::genTableId()),
_collectionNamespace(desc->parentNS()),
@@ -412,7 +415,7 @@ long long WiredTigerIndex::getSpaceUsedBytes(OperationContext* txn) const {
bool WiredTigerIndex::isDup(WT_CURSOR* c, const BSONObj& key, const RecordId& id) {
invariant(unique());
// First check whether the key exists.
- KeyString data(key, _ordering);
+ KeyString data(keyStringVersion(), key, _ordering);
WiredTigerItem item(data.getBuffer(), data.getSize());
c->set_key(c, item.Get());
int ret = WT_OP_CHECK(c->search(c));
@@ -430,7 +433,7 @@ bool WiredTigerIndex::isDup(WT_CURSOR* c, const BSONObj& key, const RecordId& id
if (KeyString::decodeRecordId(&br) == id)
return false;
- KeyString::TypeBits::fromBuffer(&br); // Just calling this to advance reader.
+ KeyString::TypeBits::fromBuffer(keyStringVersion(), &br); // Just advance the reader.
}
return true;
}
@@ -511,7 +514,7 @@ public:
return s;
}
- KeyString data(key, _idx->_ordering, id);
+ KeyString data(_idx->keyStringVersion(), key, _idx->_ordering, id);
// Can't use WiredTigerCursor since we aren't using the cache.
WiredTigerItem item(data.getBuffer(), data.getSize());
@@ -550,7 +553,10 @@ private:
class WiredTigerIndex::UniqueBulkBuilder : public BulkBuilder {
public:
UniqueBulkBuilder(WiredTigerIndex* idx, OperationContext* txn, bool dupsAllowed)
- : BulkBuilder(idx, txn), _idx(idx), _dupsAllowed(dupsAllowed) {}
+ : BulkBuilder(idx, txn),
+ _idx(idx),
+ _dupsAllowed(dupsAllowed),
+ _keyString(idx->keyStringVersion()) {}
Status addKey(const BSONObj& newKey, const RecordId& id) {
{
@@ -598,7 +604,7 @@ private:
void doInsert() {
invariant(!_records.empty());
- KeyString value;
+ KeyString value(_idx->keyStringVersion());
for (size_t i = 0; i < _records.size(); i++) {
value.appendRecordId(_records[i].first);
// When there is only one record, we can omit AllZeros TypeBits. Otherwise they need
@@ -634,7 +640,12 @@ namespace {
class WiredTigerIndexCursorBase : public SortedDataInterface::Cursor {
public:
WiredTigerIndexCursorBase(const WiredTigerIndex& idx, OperationContext* txn, bool forward)
- : _txn(txn), _idx(idx), _forward(forward) {
+ : _txn(txn),
+ _idx(idx),
+ _forward(forward),
+ _key(idx.keyStringVersion()),
+ _typeBits(idx.keyStringVersion()),
+ _query(idx.keyStringVersion()) {
_cursor.emplace(_idx.uri(), _idx.tableId(), false, _txn);
}
boost::optional<IndexKeyEntry> next(RequestedInfo parts) override {
@@ -660,7 +671,7 @@ public:
// end after the key if inclusive and before if exclusive.
const auto discriminator =
_forward == inclusive ? KeyString::kExclusiveAfter : KeyString::kExclusiveBefore;
- _endPosition = stdx::make_unique<KeyString>();
+ _endPosition = stdx::make_unique<KeyString>(_idx.keyStringVersion());
_endPosition->resetToKey(stripFieldNames(key), _idx.ordering(), discriminator);
}
@@ -988,10 +999,10 @@ Status WiredTigerIndexUnique::_insert(WT_CURSOR* c,
const BSONObj& key,
const RecordId& id,
bool dupsAllowed) {
- const KeyString data(key, _ordering);
+ const KeyString data(keyStringVersion(), key, _ordering);
WiredTigerItem keyItem(data.getBuffer(), data.getSize());
- KeyString value(id);
+ KeyString value(keyStringVersion(), id);
if (!data.getTypeBits().isAllZeros())
value.appendTypeBits(data.getTypeBits());
@@ -1031,7 +1042,7 @@ Status WiredTigerIndexUnique::_insert(WT_CURSOR* c,
// Copy from old to new value
value.appendRecordId(idInIndex);
- value.appendTypeBits(KeyString::TypeBits::fromBuffer(&br));
+ value.appendTypeBits(KeyString::TypeBits::fromBuffer(keyStringVersion(), &br));
}
if (!dupsAllowed)
@@ -1052,7 +1063,7 @@ void WiredTigerIndexUnique::_unindex(WT_CURSOR* c,
const BSONObj& key,
const RecordId& id,
bool dupsAllowed) {
- KeyString data(key, _ordering);
+ KeyString data(keyStringVersion(), key, _ordering);
WiredTigerItem keyItem(data.getBuffer(), data.getSize());
c->set_key(c, keyItem.Get());
@@ -1091,7 +1102,7 @@ void WiredTigerIndexUnique::_unindex(WT_CURSOR* c,
BufReader br(old.data, old.size);
while (br.remaining()) {
RecordId idInIndex = KeyString::decodeRecordId(&br);
- KeyString::TypeBits typeBits = KeyString::TypeBits::fromBuffer(&br);
+ KeyString::TypeBits typeBits = KeyString::TypeBits::fromBuffer(keyStringVersion(), &br);
if (id == idInIndex) {
if (records.empty() && !br.remaining()) {
@@ -1114,7 +1125,7 @@ void WiredTigerIndexUnique::_unindex(WT_CURSOR* c,
}
// Put other ids for this key back in the index.
- KeyString newValue;
+ KeyString newValue(keyStringVersion());
invariant(!records.empty());
for (size_t i = 0; i < records.size(); i++) {
newValue.appendRecordId(records[i].first);
@@ -1157,7 +1168,7 @@ Status WiredTigerIndexStandard::_insert(WT_CURSOR* c,
TRACE_INDEX << " key: " << keyBson << " id: " << id;
- KeyString key(keyBson, _ordering, id);
+ KeyString key(keyStringVersion(), keyBson, _ordering, id);
WiredTigerItem keyItem(key.getBuffer(), key.getSize());
WiredTigerItem valueItem = key.getTypeBits().isAllZeros()
@@ -1181,7 +1192,7 @@ void WiredTigerIndexStandard::_unindex(WT_CURSOR* c,
const RecordId& id,
bool dupsAllowed) {
invariant(dupsAllowed);
- KeyString data(key, _ordering, id);
+ KeyString data(keyStringVersion(), key, _ordering, id);
WiredTigerItem item(data.getBuffer(), data.getSize());
c->set_key(c, item.Get());
int ret = WT_OP_CHECK(c->remove(c));
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.h b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h
index cb4be8794c7..f989f0b1ce5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h
@@ -31,6 +31,7 @@
#include <wiredtiger.h>
#include "mongo/base/status_with.h"
+#include "mongo/db/storage/key_string.h"
#include "mongo/db/storage/index_entry_comparison.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
@@ -121,6 +122,10 @@ public:
return _ordering;
}
+ KeyString::Version keyStringVersion() const {
+ return _keyStringVersion;
+ }
+
virtual bool unique() const = 0;
Status dupKeyError(const BSONObj& key);
@@ -141,6 +146,7 @@ protected:
class UniqueBulkBuilder;
const Ordering _ordering;
+ const KeyString::Version _keyStringVersion;
std::string _uri;
uint64_t _tableId;
std::string _collectionNamespace;
diff --git a/src/mongo/platform/decimal128.cpp b/src/mongo/platform/decimal128.cpp
index ddaa381b367..990b18e1569 100644
--- a/src/mongo/platform/decimal128.cpp
+++ b/src/mongo/platform/decimal128.cpp
@@ -223,7 +223,7 @@ Decimal128::Decimal128(double doubleValue,
// Estimate doubleValue's base10 exponent from its base2 exponent by
// multiplying by an approximation of log10(2).
// Since 10^(x*log10(2)) == 2^x, this initial guess gets us very close.
- int base10Exp = (base2Exp * 301) / 1000;
+ int base10Exp = (base2Exp * 30103) / (100 * 1000);
// Although both 1000 and .001 have a base 10 exponent of magnitude 3, they have
// a different number of leading/trailing zeros. Adjust base10Exp to compensate.
@@ -244,6 +244,7 @@ Decimal128::Decimal128(double doubleValue,
}
// The decimal must have exactly 15 digits of precision
+ invariant(getCoefficientHigh() == 0);
invariant(_value.low64 >= 100000000000000ull && _value.low64 <= 999999999999999ull);
}
@@ -566,11 +567,6 @@ Decimal128 Decimal128::quantize(const Decimal128& reference,
return result;
}
-Decimal128 Decimal128::normalize() const {
- // Normalize by adding 0E-6176 which forces a decimal to maximum precision (34 digits)
- return add(kLargestNegativeExponentZero);
-}
-
bool Decimal128::isEqual(const Decimal128& other) const {
std::uint32_t throwAwayFlag = 0;
BID_UINT128 current = decimal128ToLibraryType(_value);
diff --git a/src/mongo/platform/decimal128.h b/src/mongo/platform/decimal128.h
index 22dadb5c7c3..9481a30ccd1 100644
--- a/src/mongo/platform/decimal128.h
+++ b/src/mongo/platform/decimal128.h
@@ -360,7 +360,10 @@ public:
* decimals with the same representation (5000000000000000000000000000000000E-33).
* Hashing equal decimals to equal hashes becomes possible with such normalization.
*/
- Decimal128 normalize() const;
+ Decimal128 normalize() const {
+ // Normalize by adding 0E-6176 which forces a decimal to maximum precision (34 digits)
+ return add(kLargestNegativeExponentZero);
+ }
/**
* This set of comparison operations takes a single Decimal128 and returns a boolean