summary | refs | log | tree | commit | diff
path: root/db/db_impl.cc
diff options
context:
space:
mode:
author: gabor@google.com <gabor@google.com@62dab493-f737-651d-591e-8d6aee1b9529> 2011-09-01 19:08:02 +0000
committer: gabor@google.com <gabor@google.com@62dab493-f737-651d-591e-8d6aee1b9529> 2011-09-01 19:08:02 +0000
commit: 72630236513e7384cb0a2e8fffcae232135a5adc (patch)
tree: b6afaaf0c59ce4d17d52e236bb73907fcd58070c /db/db_impl.cc
parent: e3584f9c28833ec0530b39540ffd406ee41dbc3a (diff)
download: leveldb-72630236513e7384cb0a2e8fffcae232135a5adc.tar.gz
Bugfixes: for Get(), don't hold mutex while writing log.
- Fix bug in Get: when it triggers a compaction, it could sometimes
  mark the compaction with the wrong level (if there was a gap in
  the set of levels examined for the Get).

- Do not hold mutex while writing to the log file or to the
  MANIFEST file.

  Added a new benchmark that runs a writer thread concurrently with
  reader threads.

  Percentiles
  ------------------------------
  micros/op:  avg  median    99   99.9   99.99  99.999     max
  ------------------------------------------------------
  before:      42      38   110    225   32000   42000   48000
  after:       24      20    55     65     130    1100    7000

- Fixed race in optimized Get. It should have been using the
  pinned memtables, not the current memtables.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@50 62dab493-f737-651d-591e-8d6aee1b9529
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r-- db/db_impl.cc 69
1 files changed, 55 insertions, 14 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index c4c6a61..0ca6386 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -113,6 +113,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL),
logfile_number_(0),
log_(NULL),
+ logger_(NULL),
+ logger_cv_(&mutex_),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
mem_->Ref();
@@ -308,6 +310,11 @@ Status DBImpl::Recover(VersionEdit* edit) {
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence);
+
+ // The previous incarnation may not have written any MANIFEST
+ // records after allocating this log number. So we manually
+ // update the file number allocation counter in VersionSet.
+ versions_->MarkFileNumberUsed(logs[i]);
}
if (s.ok()) {
@@ -485,7 +492,7 @@ Status DBImpl::CompactMemTable() {
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
- s = versions_->LogAndApply(&edit);
+ s = versions_->LogAndApply(&edit, &mutex_);
}
if (s.ok()) {
@@ -523,7 +530,10 @@ void DBImpl::TEST_CompactRange(
Status DBImpl::TEST_CompactMemTable() {
MutexLock l(&mutex_);
+ LoggerId self;
+ AcquireLoggingResponsibility(&self);
Status s = MakeRoomForWrite(true /* force compaction */);
+ ReleaseLoggingResponsibility(&self);
if (s.ok()) {
// Wait until the compaction completes
while (imm_ != NULL && bg_error_.ok()) {
@@ -600,7 +610,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
- status = versions_->LogAndApply(c->edit());
+ status = versions_->LogAndApply(c->edit(), &mutex_);
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
@@ -748,7 +758,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
compact->outputs.clear();
- Status s = versions_->LogAndApply(compact->compaction->edit());
+ Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
@@ -1004,9 +1014,9 @@ Status DBImpl::Get(const ReadOptions& options,
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
- if (mem_->Get(lkey, value, &s)) {
+ if (mem->Get(lkey, value, &s)) {
// Done
- } else if (imm_ != NULL && imm_->Get(lkey, value, &s)) {
+ } else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
@@ -1053,34 +1063,65 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
+// There is at most one thread that is the current logger. This call
+// waits until preceding logger(s) have finished and becomes the
+// current logger.
+void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
+ while (logger_ != NULL) {
+ logger_cv_.Wait();
+ }
+ logger_ = self;
+}
+
+void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
+ assert(logger_ == self);
+ logger_ = NULL;
+ logger_cv_.SignalAll();
+}
+
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status;
MutexLock l(&mutex_);
+ LoggerId self;
+ AcquireLoggingResponsibility(&self);
status = MakeRoomForWrite(false); // May temporarily release lock and wait
uint64_t last_sequence = versions_->LastSequence();
if (status.ok()) {
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
- versions_->SetLastSequence(last_sequence);
- // Add to log and apply to memtable
- status = log_->AddRecord(WriteBatchInternal::Contents(updates));
- if (status.ok() && options.sync) {
- status = logfile_->Sync();
- }
- if (status.ok()) {
- status = WriteBatchInternal::InsertInto(updates, mem_);
+ // Add to log and apply to memtable. We can release the lock during
+ // this phase since the "logger_" flag protects against concurrent
+ // loggers and concurrent writes into mem_.
+ {
+ assert(logger_ == &self);
+ mutex_.Unlock();
+ status = log_->AddRecord(WriteBatchInternal::Contents(updates));
+ if (status.ok() && options.sync) {
+ status = logfile_->Sync();
+ }
+ if (status.ok()) {
+ status = WriteBatchInternal::InsertInto(updates, mem_);
+ }
+ mutex_.Lock();
+ assert(logger_ == &self);
}
+
+ versions_->SetLastSequence(last_sequence);
}
if (options.post_write_snapshot != NULL) {
*options.post_write_snapshot =
status.ok() ? snapshots_.New(last_sequence) : NULL;
}
+ ReleaseLoggingResponsibility(&self);
return status;
}
+// REQUIRES: mutex_ is held
+// REQUIRES: this thread is the current logger
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
+ assert(logger_ != NULL);
bool allow_delay = !force;
Status s;
while (true) {
@@ -1249,7 +1290,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
- s = impl->versions_->LogAndApply(&edit);
+ s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();