summaryrefslogtreecommitdiff
path: root/db/db_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r--db/db_impl.cc179
1 files changed, 74 insertions, 105 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index caef2b1..bff2d62 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -75,7 +75,7 @@ struct DBImpl::CompactionState {
uint64_t total_bytes;
- Output* current_output() { return &outputs[outputs.size()-1]; }
+ Output* current_output() { return &outputs[outputs.size() - 1]; }
explicit CompactionState(Compaction* c)
: compaction(c),
@@ -98,10 +98,10 @@ Options SanitizeOptions(const std::string& dbname,
Options result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
- ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
- ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
- ClipToRange(&result.max_file_size, 1<<20, 1<<30);
- ClipToRange(&result.block_size, 1<<10, 4<<20);
+ ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
+ ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
+ ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
+ ClipToRange(&result.block_size, 1 << 10, 4 << 20);
if (result.info_log == nullptr) {
// Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist
@@ -268,8 +268,7 @@ void DBImpl::DeleteObsoleteFiles() {
if (type == kTableFile) {
table_cache_->Evict(number);
}
- Log(options_.info_log, "Delete type=%d #%lld\n",
- static_cast<int>(type),
+ Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
@@ -277,7 +276,7 @@ void DBImpl::DeleteObsoleteFiles() {
}
}
-Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
+Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
mutex_.AssertHeld();
// Ignore error from CreateDir since the creation of the DB is
@@ -302,8 +301,8 @@ Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
}
} else {
if (options_.error_if_exists) {
- return Status::InvalidArgument(
- dbname_, "exists (error_if_exists is true)");
+ return Status::InvalidArgument(dbname_,
+ "exists (error_if_exists is true)");
}
}
@@ -378,8 +377,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
Status* status; // null if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
- (this->status == nullptr ? "(ignoring error) " : ""),
- fname, static_cast<int>(bytes), s.ToString().c_str());
+ (this->status == nullptr ? "(ignoring error) " : ""), fname,
+ static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s;
}
};
@@ -405,10 +404,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
- log::Reader reader(file, &reporter, true/*checksum*/,
- 0/*initial_offset*/);
+ log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
- (unsigned long long) log_number);
+ (unsigned long long)log_number);
// Read all the records and add to a memtable
std::string scratch;
@@ -416,11 +414,10 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
WriteBatch batch;
int compactions = 0;
MemTable* mem = nullptr;
- while (reader.ReadRecord(&record, &scratch) &&
- status.ok()) {
+ while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
- reporter.Corruption(
- record.size(), Status::Corruption("log record too small"));
+ reporter.Corruption(record.size(),
+ Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
@@ -434,9 +431,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
if (!status.ok()) {
break;
}
- const SequenceNumber last_seq =
- WriteBatchInternal::Sequence(&batch) +
- WriteBatchInternal::Count(&batch) - 1;
+ const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
+ WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
@@ -500,7 +496,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(options_.info_log, "Level-0 table #%llu: started",
- (unsigned long long) meta.number);
+ (unsigned long long)meta.number);
Status s;
{
@@ -510,13 +506,11 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
}
Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
- (unsigned long long) meta.number,
- (unsigned long long) meta.file_size,
+ (unsigned long long)meta.number, (unsigned long long)meta.file_size,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);
-
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
@@ -526,8 +520,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
- edit->AddFile(level, meta.number, meta.file_size,
- meta.smallest, meta.largest);
+ edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
+ meta.largest);
}
CompactionStats stats;
@@ -658,8 +652,7 @@ void DBImpl::MaybeScheduleCompaction() {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
- } else if (imm_ == nullptr &&
- manual_compaction_ == nullptr &&
+ } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
@@ -711,8 +704,7 @@ void DBImpl::BackgroundCompaction() {
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
- m->level,
- (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+ m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
@@ -727,19 +719,17 @@ void DBImpl::BackgroundCompaction() {
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
- c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
- f->smallest, f->largest);
+ c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
+ f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
- static_cast<unsigned long long>(f->number),
- c->level() + 1,
+ static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
- status.ToString().c_str(),
- versions_->LevelSummary(&tmp));
+ status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
@@ -757,8 +747,7 @@ void DBImpl::BackgroundCompaction() {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
- Log(options_.info_log,
- "Compaction error: %s", status.ToString().c_str());
+ Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
}
if (is_manual) {
@@ -853,31 +842,25 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
- Iterator* iter = table_cache_->NewIterator(ReadOptions(),
- output_number,
- current_bytes);
+ Iterator* iter =
+ table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
s = iter->status();
delete iter;
if (s.ok()) {
- Log(options_.info_log,
- "Generated table #%llu@%d: %lld keys, %lld bytes",
- (unsigned long long) output_number,
- compact->compaction->level(),
- (unsigned long long) current_entries,
- (unsigned long long) current_bytes);
+ Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
+ (unsigned long long)output_number, compact->compaction->level(),
+ (unsigned long long)current_entries,
+ (unsigned long long)current_bytes);
}
}
return s;
}
-
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
- Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
- compact->compaction->num_input_files(0),
- compact->compaction->level(),
- compact->compaction->num_input_files(1),
- compact->compaction->level() + 1,
+ Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
+ compact->compaction->num_input_files(0), compact->compaction->level(),
+ compact->compaction->num_input_files(1), compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));
// Add compaction outputs
@@ -885,9 +868,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
- compact->compaction->edit()->AddFile(
- level + 1,
- out.number, out.file_size, out.smallest, out.largest);
+ compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
+ out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@@ -896,9 +878,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
- Log(options_.info_log, "Compacting %d@%d + %d@%d files",
- compact->compaction->num_input_files(0),
- compact->compaction->level(),
+ Log(options_.info_log, "Compacting %d@%d + %d@%d files",
+ compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);
@@ -921,7 +902,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
- for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) {
+ for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire);) {
// Prioritize immutable compaction work
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
@@ -953,8 +934,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
- user_comparator()->Compare(ikey.user_key,
- Slice(current_user_key)) != 0) {
+ user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
+ 0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
@@ -963,7 +944,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
- drop = true; // (A)
+ drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
@@ -1049,8 +1030,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
- Log(options_.info_log,
- "compacted to: %s", versions_->LevelSummary(&tmp));
+ Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
@@ -1063,7 +1043,7 @@ struct IterState {
MemTable* const imm GUARDED_BY(mu);
IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
- : mu(mutex), version(version), mem(mem), imm(imm) { }
+ : mu(mutex), version(version), mem(mem), imm(imm) {}
};
static void CleanupIteratorState(void* arg1, void* arg2) {
@@ -1116,8 +1096,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
-Status DBImpl::Get(const ReadOptions& options,
- const Slice& key,
+Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
@@ -1168,12 +1147,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
- return NewDBIterator(
- this, user_comparator(), iter,
- (options.snapshot != nullptr
- ? static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number()
- : latest_snapshot),
- seed);
+ return NewDBIterator(this, user_comparator(), iter,
+ (options.snapshot != nullptr
+ ? static_cast<const SnapshotImpl*>(options.snapshot)
+ ->sequence_number()
+ : latest_snapshot),
+ seed);
}
void DBImpl::RecordReadSample(Slice key) {
@@ -1202,9 +1181,9 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
-Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
+Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
- w.batch = my_batch;
+ w.batch = updates;
w.sync = options.sync;
w.done = false;
@@ -1218,10 +1197,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
// May temporarily unlock and wait.
- Status status = MakeRoomForWrite(my_batch == nullptr);
+ Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
- if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
+ if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
@@ -1290,8 +1269,8 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
- if (size <= (128<<10)) {
- max_size = size + (128<<10);
+ if (size <= (128 << 10)) {
+ max_size = size + (128 << 10);
}
*last_writer = first;
@@ -1337,9 +1316,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// Yield previous error
s = bg_error_;
break;
- } else if (
- allow_delay &&
- versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
+ } else if (allow_delay && versions_->NumLevelFiles(0) >=
+ config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
@@ -1383,7 +1361,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
- force = false; // Do not force another compaction if have room
+ force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
@@ -1417,21 +1395,16 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
- "--------------------------------------------------\n"
- );
+ "--------------------------------------------------\n");
value->append(buf);
for (int level = 0; level < config::kNumLevels; level++) {
int files = versions_->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
- snprintf(
- buf, sizeof(buf),
- "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
- level,
- files,
- versions_->NumLevelBytes(level) / 1048576.0,
- stats_[level].micros / 1e6,
- stats_[level].bytes_read / 1048576.0,
- stats_[level].bytes_written / 1048576.0);
+ snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
+ files, versions_->NumLevelBytes(level) / 1048576.0,
+ stats_[level].micros / 1e6,
+ stats_[level].bytes_read / 1048576.0,
+ stats_[level].bytes_written / 1048576.0);
value->append(buf);
}
}
@@ -1457,9 +1430,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
return false;
}
-void DBImpl::GetApproximateSizes(
- const Range* range, int n,
- uint64_t* sizes) {
+void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// TODO(opt): better implementation
Version* v;
{
@@ -1497,10 +1468,9 @@ Status DB::Delete(const WriteOptions& opt, const Slice& key) {
return Write(opt, &batch);
}
-DB::~DB() { }
+DB::~DB() {}
-Status DB::Open(const Options& options, const std::string& dbname,
- DB** dbptr) {
+Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
@@ -1543,8 +1513,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
return s;
}
-Snapshot::~Snapshot() {
-}
+Snapshot::~Snapshot() {}
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;