summaryrefslogtreecommitdiff
path: root/db/db_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r--db/db_impl.cc64
1 files changed, 46 insertions, 18 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 3b9e04e..baf9299 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -126,6 +126,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
log_(NULL),
bg_compaction_scheduled_(false),
compacting_(false) {
+ mem_->Ref();
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
@@ -152,8 +153,8 @@ DBImpl::~DBImpl() {
}
delete versions_;
- delete mem_;
- delete imm_;
+ if (mem_ != NULL) mem_->Unref();
+ if (imm_ != NULL) imm_->Unref();
delete log_;
delete logfile_;
delete table_cache_;
@@ -344,7 +345,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
- log::Reader reader(file, &reporter, true/*checksum*/);
+ log::Reader reader(file, &reporter, true/*checksum*/,
+ 0/*initial_offset*/);
Log(env_, options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
@@ -364,6 +366,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
if (mem == NULL) {
mem = new MemTable(internal_comparator_);
+ mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
@@ -384,7 +387,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// file-systems cause the DB::Open() to fail.
break;
}
- delete mem;
+ mem->Unref();
mem = NULL;
}
}
@@ -395,7 +398,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// file-systems cause the DB::Open() to fail.
}
- delete mem;
+ if (mem != NULL) mem->Unref();
delete file;
return status;
}
@@ -443,11 +446,12 @@ Status DBImpl::CompactMemTable() {
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
- s = versions_->LogAndApply(&edit, imm_);
+ s = versions_->LogAndApply(&edit);
}
if (s.ok()) {
// Commit to the new state
+ imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
@@ -556,7 +560,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
- status = versions_->LogAndApply(c->edit(), NULL);
+ status = versions_->LogAndApply(c->edit());
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
@@ -697,7 +701,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
compact->outputs.clear();
- Status s = versions_->LogAndApply(compact->compaction->edit(), NULL);
+ Status s = versions_->LogAndApply(compact->compaction->edit());
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
@@ -754,9 +758,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
Slice key = input->key();
- InternalKey tmp_internal_key;
- tmp_internal_key.DecodeFrom(key);
- if (compact->compaction->ShouldStopBefore(tmp_internal_key) &&
+ if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
@@ -867,6 +869,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
compacting_ = false;
compacting_cv_.SignalAll();
+ VersionSet::LevelSummaryStorage tmp;
+ Log(env_, options_.info_log,
+ "compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
@@ -925,10 +930,11 @@ Status DBImpl::Get(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
- SequenceNumber sequence =
- (options.snapshot ? options.snapshot->number_ : latest_snapshot);
- return NewDBIterator(&dbname_, env_,
- user_comparator(), internal_iter, sequence);
+ return NewDBIterator(
+ &dbname_, env_, user_comparator(), internal_iter,
+ (options.snapshot != NULL
+ ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
+ : latest_snapshot));
}
void DBImpl::Unref(void* arg1, void* arg2) {
@@ -945,7 +951,7 @@ const Snapshot* DBImpl::GetSnapshot() {
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
MutexLock l(&mutex_);
- snapshots_.Delete(s);
+ snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
// Convenience methods
@@ -985,12 +991,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
+ bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
+ } else if (
+ allow_delay &&
+ versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
+ // We are getting close to hitting a hard limit on the number of
+ // L0 files. Rather than delaying a single write by several
+ // seconds when we hit the hard limit, start delaying each
+ // individual write by 1ms to reduce latency variance. Also,
+ // this delay hands over some CPU to the compaction thread in
+ // case it is sharing the same core as the writer.
+ mutex_.Unlock();
+ env_->SleepForMicroseconds(1000);
+ allow_delay = false; // Do not delay a single write more than once
+ mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
@@ -999,6 +1019,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
compacting_cv_.Wait();
+ } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
+ // There are too many level-0 files.
+ compacting_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
@@ -1011,7 +1034,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
VersionEdit edit;
edit.SetPrevLogNumber(versions_->LogNumber());
edit.SetLogNumber(new_log_number);
- s = versions_->LogAndApply(&edit, NULL);
+ s = versions_->LogAndApply(&edit);
if (!s.ok()) {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
@@ -1024,6 +1047,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
+ mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
@@ -1141,10 +1165,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->log_ = new log::Writer(lfile);
- s = impl->versions_->LogAndApply(&edit, NULL);
+ s = impl->versions_->LogAndApply(&edit);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
+ impl->MaybeScheduleCompaction();
}
}
impl->mutex_.Unlock();
@@ -1156,6 +1181,9 @@ Status DB::Open(const Options& options, const std::string& dbname,
return s;
}
+Snapshot::~Snapshot() {
+}
+
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;
std::vector<std::string> filenames;