diff options
author | Louis Williams <louis.williams@mongodb.com> | 2021-04-09 14:05:18 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-09 19:03:34 +0000 |
commit | da7c4d8cdc446b50826455fbd20c08b55b31e897 (patch) | |
tree | d396ee39a185e53a6afb2da8c3f386a54232e261 /src/mongo/db/storage/key_string.cpp | |
parent | 61d796c8dd5b25521e273fcb2d88cd91e37688d6 (diff) | |
download | mongo-da7c4d8cdc446b50826455fbd20c08b55b31e897.tar.gz |
SERVER-55779 Clustered collections should store RecordIds as KeyString
Diffstat (limited to 'src/mongo/db/storage/key_string.cpp')
-rw-r--r-- | src/mongo/db/storage/key_string.cpp | 86 |
1 files changed, 71 insertions, 15 deletions
diff --git a/src/mongo/db/storage/key_string.cpp b/src/mongo/db/storage/key_string.cpp index 89c4561b46b..f5cb04f3b20 100644 --- a/src/mongo/db/storage/key_string.cpp +++ b/src/mongo/db/storage/key_string.cpp @@ -236,6 +236,9 @@ const uint8_t kEnd = 0x4; // the encoding of NUL bytes in strings as "\x00\xff". const uint8_t kLess = 1; const uint8_t kGreater = 254; + +// The maximum length of a RecordId binary string that may be appended to a KeyString. +const int8_t kMaxRecordIdStrLen = 127; } // namespace // some utility functions @@ -275,7 +278,7 @@ StringData readCString(BufReader* reader) { */ StringData readCStringWithNuls(BufReader* reader, std::string* scratch) { const StringData initial = readCString(reader); - if (reader->peek<unsigned char>() != 0xFF) + if (!reader->remaining() || reader->peek<unsigned char>() != 0xFF) return initial; // Don't alloc or copy for simple case with no NUL bytes. scratch->append(initial.rawData(), initial.size()); @@ -406,6 +409,13 @@ void BuilderBase<BufferT>::appendSetAsArray(const BSONElementSet& set, const Str } template <class BufferT> +void BuilderBase<BufferT>::appendOID(OID oid) { + _verifyAppendingState(); + _appendOID(oid, _shouldInvertOnAppend()); + _elemCount++; +} + +template <class BufferT> void BuilderBase<BufferT>::appendDiscriminator(const Discriminator discriminator) { // The discriminator forces this KeyString to compare Less/Greater than any KeyString with // the same prefix of keys. As an example, this can be used to land on the first key in the @@ -474,7 +484,7 @@ void BuilderBase<BufferT>::_appendRecordIdLong(int64_t val) { if (raw < 0) { // Note: we encode RecordId::minLong() and RecordId() the same which is ok, as they // are never stored so they will never be compared to each other. - invariant(raw == RecordId::minLong().asLong()); + invariant(raw == RecordId::minLong().getLong()); raw = 0; } const uint64_t value = static_cast<uint64_t>(raw); @@ -506,10 +516,20 @@ void BuilderBase<BufferT>::_appendRecordIdLong(int64_t val) { template <class BufferT> void BuilderBase<BufferT>::_appendRecordIdStr(const char* str, int size) { - // Only 12 byte strings can be encoded as RecordIds. - invariant(size == RecordId::kSmallStrSize); + // This encoding for RecordId binary strings stores the size at the end. This means that a + // RecordId may only be appended at the end of a KeyString. That is, it cannot be appended in + // the middle of a KeyString and also be binary-comparable. + + // The current maximum string length is 127. The high bit is reserved for future usage. + invariant(size <= kMaxRecordIdStrLen); + invariant(size > 0); + const bool invert = false; + + // String is encoded with a single byte for the size at the end. _appendBytes(str, size, invert); + auto encodedSize = static_cast<uint8_t>(size); + _append(encodedSize, invert); } template <class BufferT> @@ -2488,19 +2508,20 @@ RecordId decodeRecordIdLong(BufReader* reader) { } RecordId decodeRecordIdStrAtEnd(const void* bufferRaw, size_t bufSize) { + invariant(bufSize > 0); const uint8_t* buffer = static_cast<const uint8_t*>(bufferRaw); - // We currently require all RecordId strings to be 12 bytes. - const int ridSize = RecordId::kSmallStrSize; - invariant(bufSize >= ridSize); - const uint8_t* firstBytePtr = (buffer + bufSize - ridSize); - BufReader reader(firstBytePtr, ridSize); - return decodeRecordIdStr(&reader); -} -RecordId decodeRecordIdStr(BufReader* reader) { - // We currently require all RecordId strings to be 12 bytes. - const int size = RecordId::kSmallStrSize; - return RecordId(static_cast<const char*>(reader->skip(size)), size); + // The current encoding for strings supports strings up to 128 bytes. The high bit is reserved + // for future usage. + uint8_t len = buffer[bufSize - 1]; + keyStringAssert(5577900, + fmt::format("Cannot decode record id string longer than {} bytes; size is {}", + kMaxRecordIdStrLen, + len), + len <= kMaxRecordIdStrLen); + invariant(bufSize > len); + const uint8_t* firstBytePtr = (buffer + bufSize - len - 1); + return RecordId(reinterpret_cast<const char*>(firstBytePtr), len); } int compare(const char* leftBuf, const char* rightBuf, size_t leftSize, size_t rightSize) { @@ -2551,6 +2572,41 @@ bool readSBEValue(BufReader* reader, return true; } +void appendSingleFieldToBSONAs( + const char* buf, int len, StringData fieldName, BSONObjBuilder* builder, Version version) { + const bool inverted = false; + + BufReader reader(buf, len); + invariant(reader.remaining()); + uint8_t ctype = readType<uint8_t>(&reader, inverted); + invariant(ctype != kEnd && ctype > kLess && ctype < kGreater); + + const uint32_t depth = 1; // This function only gets called for a top-level KeyString::Value. + // Callers discard their TypeBits. + TypeBits typeBits(version); + TypeBits::Reader typeBitsReader(typeBits); + + BSONObjBuilderValueStream& stream = *builder << fieldName; + toBsonValue(ctype, &reader, &typeBitsReader, inverted, version, &stream, depth); +} + +void appendToBSONArray(const char* buf, int len, BSONArrayBuilder* builder, Version version) { + const bool inverted = false; + + BufReader reader(buf, len); + invariant(reader.remaining()); + uint8_t ctype = readType<uint8_t>(&reader, inverted); + invariant(ctype != kEnd && ctype > kLess && ctype < kGreater); + + // This function only gets called for a top-level KeyString::Value. + const uint32_t depth = 1; + // All users of this currently discard type bits. + TypeBits typeBits(version); + TypeBits::Reader typeBitsReader(typeBits); + + toBsonValue(ctype, &reader, &typeBitsReader, inverted, version, builder, depth); +} + void Value::serializeWithoutRecordId(BufBuilder& buf) const { dassert(decodeRecordIdLongAtEnd(_buffer.get(), _ksSize).isValid()); |