diff options
author | dgrogan@chromium.org <dgrogan@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529> | 2011-05-21 02:17:43 +0000 |
---|---|---|
committer | dgrogan@chromium.org <dgrogan@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529> | 2011-05-21 02:17:43 +0000 |
commit | da7990950787257cb312ca562ce5977749afc3e9 (patch) | |
tree | 91fe98f6e14e74c794392b22105a47a58499edff /db/log_reader.cc | |
parent | 3c111335a760d8d82414b91a54f740df09dd4f8f (diff) | |
download | leveldb-da7990950787257cb312ca562ce5977749afc3e9.tar.gz |
sync with upstream @ 21409451
Check the NEWS file for details of what changed.
git-svn-id: https://leveldb.googlecode.com/svn/trunk@28 62dab493-f737-651d-591e-8d6aee1b9529
Diffstat (limited to 'db/log_reader.cc')
-rw-r--r-- | db/log_reader.cc | 116 |
1 files changed, 97 insertions, 19 deletions
diff --git a/db/log_reader.cc b/db/log_reader.cc index 75e1d28..8721071 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -4,7 +4,6 @@ #include "db/log_reader.h" -#include <stdint.h> #include "leveldb/env.h" #include "util/coding.h" #include "util/crc32c.h" @@ -15,46 +14,104 @@ namespace log { Reader::Reporter::~Reporter() { } -Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum) +Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum, + uint64_t initial_offset) : file_(file), reporter_(reporter), checksum_(checksum), backing_store_(new char[kBlockSize]), buffer_(), - eof_(false) { + eof_(false), + last_record_offset_(0), + end_of_buffer_offset_(0), + initial_offset_(initial_offset) { } Reader::~Reader() { delete[] backing_store_; } +bool Reader::SkipToInitialBlock() { + size_t offset_in_block = initial_offset_ % kBlockSize; + uint64_t block_start_location = initial_offset_ - offset_in_block; + + // Don't search a block if we'd be in the trailer + if (offset_in_block > kBlockSize - 6) { + offset_in_block = 0; + block_start_location += kBlockSize; + } + + end_of_buffer_offset_ = block_start_location; + + // Skip to start of first block that can contain the initial record + if (block_start_location > 0) { + Status skip_status = file_->Skip(block_start_location); + if (!skip_status.ok()) { + ReportDrop(block_start_location, skip_status); + return false; + } + } + + return true; +} + bool Reader::ReadRecord(Slice* record, std::string* scratch) { + if (last_record_offset_ < initial_offset_) { + if (!SkipToInitialBlock()) { + return false; + } + } + scratch->clear(); record->clear(); bool in_fragmented_record = false; + // Record offset of the logical record that we're reading + // 0 is a dummy value to make compilers happy + uint64_t prospective_record_offset = 0; Slice fragment; while (true) { + uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); switch (ReadPhysicalRecord(&fragment)) { case kFullType: if (in_fragmented_record) { - ReportDrop(scratch->size(), "partial record without end"); + // Handle bug in earlier versions of log::Writer where + // it could emit an empty kFirstType record at the tail end + // of a block followed by a kFullType or kFirstType record + // at the beginning of the next block. + if (scratch->empty()) { + in_fragmented_record = false; + } else { + ReportCorruption(scratch->size(), "partial record without end(1)"); + } } + prospective_record_offset = physical_record_offset; scratch->clear(); *record = fragment; + last_record_offset_ = prospective_record_offset; return true; case kFirstType: if (in_fragmented_record) { - ReportDrop(scratch->size(), "partial record without end"); + // Handle bug in earlier versions of log::Writer where + // it could emit an empty kFirstType record at the tail end + // of a block followed by a kFullType or kFirstType record + // at the beginning of the next block. + if (scratch->empty()) { + in_fragmented_record = false; + } else { + ReportCorruption(scratch->size(), "partial record without end(2)"); + } } + prospective_record_offset = physical_record_offset; scratch->assign(fragment.data(), fragment.size()); in_fragmented_record = true; break; case kMiddleType: if (!in_fragmented_record) { - ReportDrop(fragment.size(), "missing start of fragmented record"); + ReportCorruption(fragment.size(), + "missing start of fragmented record(1)"); } else { scratch->append(fragment.data(), fragment.size()); } @@ -62,31 +119,33 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { case kLastType: if (!in_fragmented_record) { - ReportDrop(fragment.size(), "missing start of fragmented record"); + ReportCorruption(fragment.size(), + "missing start of fragmented record(2)"); } else { scratch->append(fragment.data(), fragment.size()); *record = Slice(*scratch); + last_record_offset_ = prospective_record_offset; return true; } break; case kEof: if (in_fragmented_record) { - ReportDrop(scratch->size(), "partial record without end"); + ReportCorruption(scratch->size(), "partial record without end(3)"); scratch->clear(); } return false; case kBadRecord: if (in_fragmented_record) { - ReportDrop(scratch->size(), "error in middle of record"); + ReportCorruption(scratch->size(), "error in middle of record"); in_fragmented_record = false; scratch->clear(); } break; default: - ReportDrop( + ReportCorruption( (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), "unknown record type"); in_fragmented_record = false; @@ -97,9 +156,18 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { return false; } -void Reader::ReportDrop(size_t bytes, const char* reason) { - if (reporter_ != NULL) { - reporter_->Corruption(bytes, Status::Corruption(reason)); +uint64_t Reader::LastRecordOffset() { + return last_record_offset_; +} + +void Reader::ReportCorruption(size_t bytes, const char* reason) { + ReportDrop(bytes, Status::Corruption(reason)); +} + +void Reader::ReportDrop(size_t bytes, const Status& reason) { + if (reporter_ != NULL && + end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) { + reporter_->Corruption(bytes, reason); } } @@ -110,11 +178,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + end_of_buffer_offset_ += buffer_.size(); if (!status.ok()) { - if (reporter_ != NULL) { - reporter_->Corruption(kBlockSize, status); - } buffer_.clear(); + ReportDrop(kBlockSize, status); eof_ = true; return kEof; } else if (buffer_.size() < kBlockSize) { @@ -125,8 +192,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // End of file return kEof; } else { - ReportDrop(buffer_.size(), "truncated record at end of file"); + size_t drop_size = buffer_.size(); buffer_.clear(); + ReportCorruption(drop_size, "truncated record at end of file"); return kEof; } } @@ -138,8 +206,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { const unsigned int type = header[6]; const uint32_t length = a | (b << 8); if (kHeaderSize + length > buffer_.size()) { - ReportDrop(buffer_.size(), "bad record length"); + size_t drop_size = buffer_.size(); buffer_.clear(); + ReportCorruption(drop_size, "bad record length"); return kBadRecord; } @@ -160,13 +229,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // been corrupted and if we trust it, we could find some // fragment of a real log record that just happens to look // like a valid log record. - ReportDrop(buffer_.size(), "checksum mismatch"); + size_t drop_size = buffer_.size(); buffer_.clear(); + ReportCorruption(drop_size, "checksum mismatch"); return kBadRecord; } } buffer_.remove_prefix(kHeaderSize + length); + + // Skip physical record that started before initial_offset_ + if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < + initial_offset_) { + result->clear(); + return kBadRecord; + } + *result = Slice(header + kHeaderSize, length); return type; } |