diff options
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r-- | db/db_impl.cc | 98 |
1 files changed, 68 insertions, 30 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc index 0ca6386..56182a0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -454,13 +454,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); - if (base != NULL && !base->OverlapInLevel(0, min_user_key, max_user_key)) { - // Push the new sstable to a higher level if possible to reduce - // expensive manifest file ops. - while (level < config::kMaxMemCompactLevel && - !base->OverlapInLevel(level + 1, min_user_key, max_user_key)) { - level++; - } + if (base != NULL) { + level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); @@ -506,25 +501,55 @@ Status DBImpl::CompactMemTable() { return s; } -void DBImpl::TEST_CompactRange( - int level, - const std::string& begin, - const std::string& end) { +void DBImpl::CompactRange(const Slice* begin, const Slice* end) { + int max_level_with_files = 1; + { + MutexLock l(&mutex_); + Version* base = versions_->current(); + for (int level = 1; level < config::kNumLevels; level++) { + if (base->OverlapInLevel(level, begin, end)) { + max_level_with_files = level; + } + } + } + TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap + for (int level = 0; level < max_level_with_files; level++) { + TEST_CompactRange(level, begin, end); + } +} + +void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { assert(level >= 0); assert(level + 1 < config::kNumLevels); - MutexLock l(&mutex_); - while (manual_compaction_ != NULL) { - bg_cv_.Wait(); - } + InternalKey begin_storage, end_storage; + ManualCompaction manual; manual.level = level; - manual.begin = begin; - manual.end = end; - manual_compaction_ = &manual; - MaybeScheduleCompaction(); - while (manual_compaction_ == &manual) { - bg_cv_.Wait(); + manual.done = false; + if (begin == NULL) { + manual.begin = NULL; + } else { + begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); + manual.begin = &begin_storage; + } + if (end == NULL) { + manual.end = NULL; + } else { + end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); + manual.end = &end_storage; + } + + MutexLock l(&mutex_); + while (!manual.done) { + while (manual_compaction_ != NULL) { + bg_cv_.Wait(); + } + manual_compaction_ = &manual; + MaybeScheduleCompaction(); + while (manual_compaction_ == &manual) { + bg_cv_.Wait(); + } } } @@ -590,12 +615,20 @@ void DBImpl::BackgroundCompaction() { Compaction* c; bool is_manual = (manual_compaction_ != NULL); + InternalKey manual_end; if (is_manual) { - const ManualCompaction* m = manual_compaction_; - c = versions_->CompactRange( + ManualCompaction* m = manual_compaction_; + c = versions_->CompactRange(m->level, m->begin, m->end); + m->done = (c == NULL); + if (c != NULL) { + manual_end = c->input(0, c->num_input_files(0) - 1)->largest; + } + Log(options_.info_log, + "Manual compaction at level-%d from %s .. %s; will stop at %s\n", m->level, - InternalKey(m->begin, kMaxSequenceNumber, kValueTypeForSeek), - InternalKey(m->end, 0, static_cast<ValueType>(0))); + (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)"), + (m->done ? "(end)" : manual_end.DebugString().c_str())); } else { c = versions_->PickCompaction(); } @@ -638,7 +671,13 @@ void DBImpl::BackgroundCompaction() { } if (is_manual) { - // Mark it as done + ManualCompaction* m = manual_compaction_; + if (!m->done) { + // We only compacted part of the requested range. Update *m + // to the range that is left to be compacted. + m->tmp_storage = manual_end; + m->begin = &m->tmp_storage; + } manual_compaction_ = NULL; } } @@ -1109,10 +1148,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { versions_->SetLastSequence(last_sequence); } - if (options.post_write_snapshot != NULL) { - *options.post_write_snapshot = - status.ok() ? snapshots_.New(last_sequence) : NULL; - } ReleaseLoggingResponsibility(&self); return status; } @@ -1225,6 +1260,9 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { } } return true; + } else if (in == "sstables") { + *value = versions_->current()->DebugString(); + return true; } return false; |