diff options
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r-- | db/db_impl.cc | 64 |
1 files changed, 46 insertions, 18 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc index 3b9e04e..baf9299 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -126,6 +126,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) log_(NULL), bg_compaction_scheduled_(false), compacting_(false) { + mem_->Ref(); has_imm_.Release_Store(NULL); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -152,8 +153,8 @@ DBImpl::~DBImpl() { } delete versions_; - delete mem_; - delete imm_; + if (mem_ != NULL) mem_->Unref(); + if (imm_ != NULL) imm_->Unref(); delete log_; delete logfile_; delete table_cache_; @@ -344,7 +345,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). - log::Reader reader(file, &reporter, true/*checksum*/); + log::Reader reader(file, &reporter, true/*checksum*/, + 0/*initial_offset*/); Log(env_, options_.info_log, "Recovering log #%llu", (unsigned long long) log_number); @@ -364,6 +366,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, if (mem == NULL) { mem = new MemTable(internal_comparator_); + mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem); MaybeIgnoreError(&status); @@ -384,7 +387,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // file-systems cause the DB::Open() to fail. break; } - delete mem; + mem->Unref(); mem = NULL; } } @@ -395,7 +398,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // file-systems cause the DB::Open() to fail. } - delete mem; + if (mem != NULL) mem->Unref(); delete file; return status; } @@ -443,11 +446,12 @@ Status DBImpl::CompactMemTable() { // Replace immutable memtable with the generated Table if (s.ok()) { edit.SetPrevLogNumber(0); - s = versions_->LogAndApply(&edit, imm_); + s = versions_->LogAndApply(&edit); } if (s.ok()) { // Commit to the new state + imm_->Unref(); imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); @@ -556,7 +560,7 @@ void DBImpl::BackgroundCompaction() { c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); - status = versions_->LogAndApply(c->edit(), NULL); + status = versions_->LogAndApply(c->edit()); Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n", static_cast<unsigned long long>(f->number), c->level() + 1, @@ -697,7 +701,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } compact->outputs.clear(); - Status s = versions_->LogAndApply(compact->compaction->edit(), NULL); + Status s = versions_->LogAndApply(compact->compaction->edit()); if (s.ok()) { compact->compaction->ReleaseInputs(); DeleteObsoleteFiles(); @@ -754,9 +758,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } Slice key = input->key(); - InternalKey tmp_internal_key; - tmp_internal_key.DecodeFrom(key); - if (compact->compaction->ShouldStopBefore(tmp_internal_key) && + if (compact->compaction->ShouldStopBefore(key) && compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { @@ -867,6 +869,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } compacting_ = false; compacting_cv_.SignalAll(); + VersionSet::LevelSummaryStorage tmp; + Log(env_, options_.info_log, + "compacted to: %s", versions_->LevelSummary(&tmp)); return status; } @@ -925,10 +930,11 @@ Status DBImpl::Get(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); - SequenceNumber sequence = - (options.snapshot ? options.snapshot->number_ : latest_snapshot); - return NewDBIterator(&dbname_, env_, - user_comparator(), internal_iter, sequence); + return NewDBIterator( + &dbname_, env_, user_comparator(), internal_iter, + (options.snapshot != NULL + ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ + : latest_snapshot)); } void DBImpl::Unref(void* arg1, void* arg2) { @@ -945,7 +951,7 @@ const Snapshot* DBImpl::GetSnapshot() { void DBImpl::ReleaseSnapshot(const Snapshot* s) { MutexLock l(&mutex_); - snapshots_.Delete(s); + snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s)); } // Convenience methods @@ -985,12 +991,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); + bool allow_delay = !force; Status s; while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; + } else if ( + allow_delay && + versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { + // We are getting close to hitting a hard limit on the number of + // L0 files. Rather than delaying a single write by several + // seconds when we hit the hard limit, start delaying each + // individual write by 1ms to reduce latency variance. Also, + // this delay hands over some CPU to the compaction thread in + // case it is sharing the same core as the writer. + mutex_.Unlock(); + env_->SleepForMicroseconds(1000); + allow_delay = false; // Do not delay a single write more than once + mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // There is room in current memtable @@ -999,6 +1019,9 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. compacting_cv_.Wait(); + } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { + // There are too many level-0 files. + compacting_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); @@ -1011,7 +1034,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { VersionEdit edit; edit.SetPrevLogNumber(versions_->LogNumber()); edit.SetLogNumber(new_log_number); - s = versions_->LogAndApply(&edit, NULL); + s = versions_->LogAndApply(&edit); if (!s.ok()) { delete lfile; env_->DeleteFile(LogFileName(dbname_, new_log_number)); @@ -1024,6 +1047,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { imm_ = mem_; has_imm_.Release_Store(imm_); mem_ = new MemTable(internal_comparator_); + mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); } @@ -1141,10 +1165,11 @@ Status DB::Open(const Options& options, const std::string& dbname, edit.SetLogNumber(new_log_number); impl->logfile_ = lfile; impl->log_ = new log::Writer(lfile); - s = impl->versions_->LogAndApply(&edit, NULL); + s = impl->versions_->LogAndApply(&edit); } if (s.ok()) { impl->DeleteObsoleteFiles(); + impl->MaybeScheduleCompaction(); } } impl->mutex_.Unlock(); @@ -1156,6 +1181,9 @@ Status DB::Open(const Options& options, const std::string& dbname, return s; } +Snapshot::~Snapshot() { +} + Status DestroyDB(const std::string& dbname, const Options& options) { Env* env = options.env; std::vector<std::string> filenames; |