From 72630236513e7384cb0a2e8fffcae232135a5adc Mon Sep 17 00:00:00 2001
From: "gabor@google.com"
Date: Thu, 1 Sep 2011 19:08:02 +0000
Subject: Bugfixes: for Get(), don't hold mutex while writing log.

  - Fix bug in Get: when it triggers a compaction, it could sometimes
    mark the compaction with the wrong level (if there was a gap
    in the set of levels examined for the Get).

  - Do not hold mutex while writing to the log file or to the
    MANIFEST file.

    Added a new benchmark that runs a writer thread concurrently with
    reader threads.

    Percentiles
    ------------------------------------------------------
    micros/op: avg  median 99   99.9  99.99  99.999 max
    ------------------------------------------------------
    before:    42   38     110  225   32000  42000  48000
    after:     24   20     55   65    130    1100   7000

  - Fixed race in optimized Get. It should have been using the
    pinned memtables, not the current memtables.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@50 62dab493-f737-651d-591e-8d6aee1b9529
---
 db/db_impl.cc | 69 +++++++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 55 insertions(+), 14 deletions(-)

(limited to 'db/db_impl.cc')

diff --git a/db/db_impl.cc b/db/db_impl.cc index c4c6a61..0ca6386 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -113,6 +113,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_(NULL), logfile_number_(0), log_(NULL), + logger_(NULL), + logger_cv_(&mutex_), bg_compaction_scheduled_(false), manual_compaction_(NULL) { mem_->Ref(); @@ -308,6 +310,11 @@ Status DBImpl::Recover(VersionEdit* edit) { std::sort(logs.begin(), logs.end()); for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], edit, &max_sequence); + + // The previous incarnation may not have written any MANIFEST + // records after allocating this log number. So we manually + // update the file number allocation counter in VersionSet.
+ versions_->MarkFileNumberUsed(logs[i]); } if (s.ok()) { @@ -485,7 +492,7 @@ Status DBImpl::CompactMemTable() { if (s.ok()) { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed - s = versions_->LogAndApply(&edit); + s = versions_->LogAndApply(&edit, &mutex_); } if (s.ok()) { @@ -523,7 +530,10 @@ void DBImpl::TEST_CompactRange( Status DBImpl::TEST_CompactMemTable() { MutexLock l(&mutex_); + LoggerId self; + AcquireLoggingResponsibility(&self); Status s = MakeRoomForWrite(true /* force compaction */); + ReleaseLoggingResponsibility(&self); if (s.ok()) { // Wait until the compaction completes while (imm_ != NULL && bg_error_.ok()) { @@ -600,7 +610,7 @@ void DBImpl::BackgroundCompaction() { c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); - status = versions_->LogAndApply(c->edit()); + status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), @@ -748,7 +758,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } compact->outputs.clear(); - Status s = versions_->LogAndApply(compact->compaction->edit()); + Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_); if (s.ok()) { compact->compaction->ReleaseInputs(); DeleteObsoleteFiles(); @@ -1004,9 +1014,9 @@ Status DBImpl::Get(const ReadOptions& options, mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot); - if (mem_->Get(lkey, value, &s)) { + if (mem->Get(lkey, value, &s)) { // Done - } else if (imm_ != NULL && imm_->Get(lkey, value, &s)) { + } else if (imm != NULL && imm->Get(lkey, value, &s)) { // Done } else { s = current->Get(options, lkey, value, &stats); @@ -1053,34 +1063,65 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } +// There is at most one thread that is the current logger. This call +// waits until preceding logger(s) have finished and becomes the +// current logger. +void DBImpl::AcquireLoggingResponsibility(LoggerId* self) { + while (logger_ != NULL) { + logger_cv_.Wait(); + } + logger_ = self; +} + +void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) { + assert(logger_ == self); + logger_ = NULL; + logger_cv_.SignalAll(); +} + Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status status; MutexLock l(&mutex_); + LoggerId self; + AcquireLoggingResponsibility(&self); status = MakeRoomForWrite(false); // May temporarily release lock and wait uint64_t last_sequence = versions_->LastSequence(); if (status.ok()) { WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates); - versions_->SetLastSequence(last_sequence); - // Add to log and apply to memtable - status = log_->AddRecord(WriteBatchInternal::Contents(updates)); - if (status.ok() && options.sync) { - status = logfile_->Sync(); - } - if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_); + // Add to log and apply to memtable. We can release the lock during + // this phase since the "logger_" flag protects against concurrent + // loggers and concurrent writes into mem_. 
+ { + assert(logger_ == &self); + mutex_.Unlock(); + status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + if (status.ok() && options.sync) { + status = logfile_->Sync(); + } + if (status.ok()) { + status = WriteBatchInternal::InsertInto(updates, mem_); + } + mutex_.Lock(); + assert(logger_ == &self); } + + versions_->SetLastSequence(last_sequence); } if (options.post_write_snapshot != NULL) { *options.post_write_snapshot = status.ok() ? snapshots_.New(last_sequence) : NULL; } + ReleaseLoggingResponsibility(&self); return status; } +// REQUIRES: mutex_ is held +// REQUIRES: this thread is the current logger Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); + assert(logger_ != NULL); bool allow_delay = !force; Status s; while (true) { @@ -1249,7 +1290,7 @@ Status DB::Open(const Options& options, const std::string& dbname, impl->logfile_ = lfile; impl->logfile_number_ = new_log_number; impl->log_ = new log::Writer(lfile); - s = impl->versions_->LogAndApply(&edit); + s = impl->versions_->LogAndApply(&edit, &impl->mutex_); } if (s.ok()) { impl->DeleteObsoleteFiles(); -- cgit v1.2.1