diff options
Diffstat (limited to 'db/version_set.cc')
-rw-r--r-- | db/version_set.cc | 309 |
1 files changed, 171 insertions, 138 deletions
diff --git a/db/version_set.cc b/db/version_set.cc index c439f49..f64ac8d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -57,17 +57,22 @@ std::string IntSetToString(const std::set<uint64_t>& s) { Version::~Version() { assert(refs_ == 0); + + // Remove from linked list + prev_->next_ = next_; + next_->prev_ = prev_; + + // Drop references to files for (int level = 0; level < config::kNumLevels; level++) { for (size_t i = 0; i < files_[level].size(); i++) { FileMetaData* f = files_[level][i]; - assert(f->refs >= 0); + assert(f->refs > 0); f->refs--; if (f->refs <= 0) { delete f; } } } - delete cleanup_mem_; } // An internal iterator. For a given version/level pair, yields @@ -77,9 +82,9 @@ Version::~Version() { // encoded using EncodeFixed64. class Version::LevelFileNumIterator : public Iterator { public: - LevelFileNumIterator(const Version* version, + LevelFileNumIterator(const InternalKeyComparator& icmp, const std::vector<FileMetaData*>* flist) - : icmp_(version->vset_->icmp_.user_comparator()), + : icmp_(icmp), flist_(flist), index_(flist->size()) { // Marks as invalid } @@ -157,7 +162,7 @@ static Iterator* GetFileIterator(void* arg, Iterator* Version::NewConcatenatingIterator(const ReadOptions& options, int level) const { return NewTwoLevelIterator( - new LevelFileNumIterator(this, &files_[level]), + new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator, vset_->table_cache_, options); } @@ -185,11 +190,11 @@ void Version::Ref() { } void Version::Unref() { + assert(this != &vset_->dummy_versions_); assert(refs_ >= 1); --refs_; if (refs_ == 0) { - vset_->MaybeDeleteOldVersions(); - // TODO: try to delete obsolete files + delete this; } } @@ -222,37 +227,58 @@ std::string Version::DebugString() const { // Versions that contain full copies of the intermediate state. class VersionSet::Builder { private: - typedef std::map<uint64_t, FileMetaData*> FileMap; + // Helper to sort by v->files_[file_number].smallest + struct BySmallestKey { + const InternalKeyComparator* internal_comparator; + + bool operator()(FileMetaData* f1, FileMetaData* f2) const { + int r = internal_comparator->Compare(f1->smallest, f2->smallest); + if (r != 0) { + return (r < 0); + } else { + // Break ties by file number + return (f1->number < f2->number); + } + } + }; + + typedef std::set<FileMetaData*, BySmallestKey> FileSet; + struct LevelState { + std::set<uint64_t> deleted_files; + FileSet* added_files; + }; + VersionSet* vset_; - FileMap files_[config::kNumLevels]; + Version* base_; + LevelState levels_[config::kNumLevels]; public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) - : vset_(vset) { + : vset_(vset), + base_(base) { + base_->Ref(); + BySmallestKey cmp; + cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { - const std::vector<FileMetaData*>& files = base->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - FileMetaData* f = files[i]; - f->refs++; - files_[level].insert(std::make_pair(f->number, f)); - } + levels_[level].added_files = new FileSet(cmp); } } ~Builder() { for (int level = 0; level < config::kNumLevels; level++) { - const FileMap& fmap = files_[level]; - for (FileMap::const_iterator iter = fmap.begin(); - iter != fmap.end(); - ++iter) { - FileMetaData* f = iter->second; + std::vector<FileMetaData*> to_unref(levels_[level].added_files->begin(), + levels_[level].added_files->end()); + delete levels_[level].added_files; + for (int i = 0; i < to_unref.size(); i++) { + FileMetaData* f = to_unref[i]; f->refs--; if (f->refs <= 0) { delete f; } } } + base_->Unref(); } // Apply all of the edits in *edit to the current state. @@ -271,16 +297,7 @@ class VersionSet::Builder { ++iter) { const int level = iter->first; const uint64_t number = iter->second; - FileMap::iterator fiter = files_[level].find(number); - assert(fiter != files_[level].end()); // Sanity check for debug mode - if (fiter != files_[level].end()) { - FileMetaData* f = fiter->second; - f->refs--; - if (f->refs <= 0) { - delete f; - } - files_[level].erase(fiter); - } + levels_[level].deleted_files.insert(number); } // Add new files @@ -288,22 +305,66 @@ class VersionSet::Builder { const int level = edit->new_files_[i].first; FileMetaData* f = new FileMetaData(edit->new_files_[i].second); f->refs = 1; - assert(files_[level].count(f->number) == 0); - files_[level].insert(std::make_pair(f->number, f)); + levels_[level].deleted_files.erase(f->number); + levels_[level].added_files->insert(f); } } // Save the current state in *v. void SaveTo(Version* v) { + BySmallestKey cmp; + cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { - const FileMap& fmap = files_[level]; - for (FileMap::const_iterator iter = fmap.begin(); - iter != fmap.end(); - ++iter) { - FileMetaData* f = iter->second; - f->refs++; - v->files_[level].push_back(f); + // Merge the set of added files with the set of pre-existing files. + // Drop any deleted files. Store the result in *v. + const std::vector<FileMetaData*>& base_files = base_->files_[level]; + std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin(); + std::vector<FileMetaData*>::const_iterator base_end = base_files.end(); + const FileSet* added = levels_[level].added_files; + v->files_[level].reserve(base_files.size() + added->size()); + for (FileSet::const_iterator added_iter = added->begin(); + added_iter != added->end(); + ++added_iter) { + // Add all smaller files listed in base_ + for (std::vector<FileMetaData*>::const_iterator bpos + = std::upper_bound(base_iter, base_end, *added_iter, cmp); + base_iter != bpos; + ++base_iter) { + MaybeAddFile(v, level, *base_iter); + } + + MaybeAddFile(v, level, *added_iter); + } + + // Add remaining base files + for (; base_iter != base_end; ++base_iter) { + MaybeAddFile(v, level, *base_iter); } + +#ifndef NDEBUG + // Make sure there is no overlap in levels > 0 + if (level > 0) { + for (int i = 1; i < v->files_[level].size(); i++) { + const InternalKey& prev_end = v->files_[level][i-1]->largest; + const InternalKey& this_begin = v->files_[level][i]->smallest; + if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { + fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", + EscapeString(prev_end.Encode()).c_str(), + EscapeString(this_begin.Encode()).c_str()); + abort(); + } + } + } +#endif + } + } + + void MaybeAddFile(Version* v, int level, FileMetaData* f) { + if (levels_[level].deleted_files.count(f->number) > 0) { + // File is deleted: do nothing + } else { + f->refs++; + v->files_[level].push_back(f); } } }; @@ -324,22 +385,36 @@ VersionSet::VersionSet(const std::string& dbname, prev_log_number_(0), descriptor_file_(NULL), descriptor_log_(NULL), - current_(new Version(this)), - oldest_(current_) { + dummy_versions_(this), + current_(NULL) { + AppendVersion(new Version(this)); } VersionSet::~VersionSet() { - for (Version* v = oldest_; v != NULL; ) { - Version* next = v->next_; - assert(v->refs_ == 0); - delete v; - v = next; - } + current_->Unref(); + assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty delete descriptor_log_; delete descriptor_file_; } -Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { +void VersionSet::AppendVersion(Version* v) { + // Make "v" current + assert(v->refs_ == 0); + assert(v != current_); + if (current_ != NULL) { + current_->Unref(); + } + current_ = v; + v->Ref(); + + // Append to linked list + v->prev_ = dummy_versions_.prev_; + v->next_ = &dummy_versions_; + v->prev_->next_ = v; + v->next_->prev_ = v; +} + +Status VersionSet::LogAndApply(VersionEdit* edit) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); assert(edit->log_number_ < next_file_number_); @@ -360,22 +435,20 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { builder.Apply(edit); builder.SaveTo(v); } - - std::string new_manifest_file; - Status s = Finalize(v); + Finalize(v); // Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. - if (s.ok()) { - if (descriptor_log_ == NULL) { - assert(descriptor_file_ == NULL); - new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); - edit->SetNextFile(next_file_number_); - s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); - if (s.ok()) { - descriptor_log_ = new log::Writer(descriptor_file_); - s = WriteSnapshot(descriptor_log_); - } + std::string new_manifest_file; + Status s; + if (descriptor_log_ == NULL) { + assert(descriptor_file_ == NULL); + new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); + edit->SetNextFile(next_file_number_); + s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); + if (s.ok()) { + descriptor_log_ = new log::Writer(descriptor_file_); + s = WriteSnapshot(descriptor_log_); } } @@ -397,12 +470,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { // Install the new version if (s.ok()) { - assert(current_->next_ == NULL); - assert(current_->cleanup_mem_ == NULL); - current_->cleanup_mem_ = cleanup_mem; - v->next_ = NULL; - current_->next_ = v; - current_ = v; + AppendVersion(v); log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; } else { @@ -458,7 +526,7 @@ Status VersionSet::Recover() { { LogReporter reporter; reporter.status = &s; - log::Reader reader(file, &reporter, true/*checksum*/); + log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -518,20 +586,14 @@ Status VersionSet::Recover() { if (s.ok()) { Version* v = new Version(this); builder.SaveTo(v); - s = Finalize(v); - if (!s.ok()) { - delete v; - } else { - // Install recovered version - v->next_ = NULL; - current_->next_ = v; - current_ = v; - manifest_file_number_ = next_file; - next_file_number_ = next_file + 1; - last_sequence_ = last_sequence; - log_number_ = log_number; - prev_log_number_ = prev_log_number; - } + // Install recovered version + Finalize(v); + AppendVersion(v); + manifest_file_number_ = next_file; + next_file_number_ = next_file + 1; + last_sequence_ = last_sequence; + log_number_ = log_number; + prev_log_number_ = prev_log_number; } return s; @@ -545,15 +607,12 @@ static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) { return sum; } -Status VersionSet::Finalize(Version* v) { +void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; double best_score = -1; - Status s; - for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) { - s = SortLevel(v, level); - + for (int level = 0; level < config::kNumLevels-1; level++) { double score; if (level == 0) { // We treat level-0 specially by bounding the number of files @@ -567,7 +626,8 @@ Status VersionSet::Finalize(Version* v) { // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). - score = v->files_[level].size() / 4.0; + score = v->files_[level].size() / + static_cast<double>(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); @@ -582,7 +642,6 @@ Status VersionSet::Finalize(Version* v) { v->compaction_level_ = best_level; v->compaction_score_ = best_score; - return s; } Status VersionSet::WriteSnapshot(log::Writer* log) { @@ -615,44 +674,27 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { return log->AddRecord(record); } -// Helper to sort by tables_[file_number].smallest -struct VersionSet::BySmallestKey { - const InternalKeyComparator* internal_comparator; - - bool operator()(FileMetaData* f1, FileMetaData* f2) const { - return internal_comparator->Compare(f1->smallest, f2->smallest) < 0; - } -}; - -Status VersionSet::SortLevel(Version* v, uint64_t level) { - Status result; - BySmallestKey cmp; - cmp.internal_comparator = &icmp_; - std::sort(v->files_[level].begin(), v->files_[level].end(), cmp); - - if (result.ok() && level > 0) { - // There should be no overlap - for (size_t i = 1; i < v->files_[level].size(); i++) { - const InternalKey& prev_end = v->files_[level][i-1]->largest; - const InternalKey& this_begin = v->files_[level][i]->smallest; - if (icmp_.Compare(prev_end, this_begin) >= 0) { - result = Status::Corruption( - "overlapping ranges in same level", - (EscapeString(prev_end.Encode()) + " vs. " + - EscapeString(this_begin.Encode()))); - break; - } - } - } - return result; -} - int VersionSet::NumLevelFiles(int level) const { assert(level >= 0); assert(level < config::kNumLevels); return current_->files_[level].size(); } +const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { + // Update code if kNumLevels changes + assert(config::kNumLevels == 7); + snprintf(scratch->buffer, sizeof(scratch->buffer), + "files[ %d %d %d %d %d %d %d ]", + int(current_->files_[0].size()), + int(current_->files_[1].size()), + int(current_->files_[2].size()), + int(current_->files_[3].size()), + int(current_->files_[4].size()), + int(current_->files_[5].size()), + int(current_->files_[6].size())); + return scratch->buffer; +} + uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; for (int level = 0; level < config::kNumLevels; level++) { @@ -685,19 +727,10 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { return result; } -void VersionSet::MaybeDeleteOldVersions() { - // Note: it is important to delete versions in order since a newer - // version with zero refs may be holding a pointer to a memtable - // that is used by somebody who has a ref on an older version. - while (oldest_ != current_ && oldest_->refs_ == 0) { - Version* next = oldest_->next_; - delete oldest_; - oldest_ = next; - } -} - void VersionSet::AddLiveFiles(std::set<uint64_t>* live) { - for (Version* v = oldest_; v != NULL; v = v->next_) { + for (Version* v = dummy_versions_.next_; + v != &dummy_versions_; + v = v->next_) { for (int level = 0; level < config::kNumLevels; level++) { const std::vector<FileMetaData*>& files = v->files_[level]; for (size_t i = 0; i < files.size(); i++) { @@ -809,8 +842,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( - new Version::LevelFileNumIterator( - c->input_version_, &c->inputs_[which]), + new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), &GetFileIterator, table_cache_, options); } } @@ -996,11 +1028,12 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) { return true; } -bool Compaction::ShouldStopBefore(const InternalKey& key) { +bool Compaction::ShouldStopBefore(const Slice& internal_key) { // Scan to find earliest grandparent file that contains key. const InternalKeyComparator* icmp = &input_version_->vset_->icmp_; while (grandparent_index_ < grandparents_.size() && - icmp->Compare(key, grandparents_[grandparent_index_]->largest) > 0) { + icmp->Compare(internal_key, + grandparents_[grandparent_index_]->largest.Encode()) > 0) { if (seen_key_) { overlapped_bytes_ += grandparents_[grandparent_index_]->file_size; } |