From 60bd8015f21fdb63d5409b1191f8ea9d8f1a1b87 Mon Sep 17 00:00:00 2001 From: "gabor@google.com" Date: Thu, 21 Jul 2011 02:40:18 +0000 Subject: Speed up Snappy uncompression, new Logger interface. - Removed one copy of an uncompressed block contents by changing the signature of Snappy_Uncompress() so it uncompresses into a flat array instead of a std::string. Speeds up readrandom ~10%. - Instead of a combination of Env/WritableFile, we now have a Logger interface that can be easily overridden by applications that want to supply their own logging. - Separated out the gcc and Sun Studio parts of atomic_pointer.h so we can use 'asm', 'volatile' keywords for Sun Studio. git-svn-id: https://leveldb.googlecode.com/svn/trunk@39 62dab493-f737-651d-591e-8d6aee1b9529 --- db/db_bench.cc | 7 ++++--- db/db_impl.cc | 45 +++++++++++++++++---------------------- db/repair.cc | 20 ++++++++++---------- db/version_set.cc | 4 ++-- 4 files changed, 33 insertions(+), 43 deletions(-) (limited to 'db') diff --git a/db/db_bench.cc b/db/db_bench.cc index 53b8c53..7b4e41a 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -472,13 +472,14 @@ class Benchmark { std::string compressed; bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); int64_t bytes = 0; - std::string uncompressed; + char* uncompressed = new char[input.size()]; while (ok && bytes < 1024 * 1048576) { // Compress 1G ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), - &uncompressed); - bytes += uncompressed.size(); + uncompressed); + bytes += input.size(); FinishedSingleOp(); } + delete[] uncompressed; if (!ok) { message_ = "(snappy failure)"; diff --git a/db/db_impl.cc b/db/db_impl.cc index 48056da..5a0648e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -68,16 +68,6 @@ struct DBImpl::CompactionState { } }; -namespace { -class NullWritableFile : public WritableFile { - public: - virtual Status Append(const Slice& data) { return Status::OK(); } - virtual Status Close() { return Status::OK(); 
} - virtual Status Flush() { return Status::OK(); } - virtual Status Sync() { return Status::OK(); } -}; -} - // Fix user-supplied options to be reasonable template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { @@ -96,11 +86,10 @@ Options SanitizeOptions(const std::string& dbname, // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); - Status s = src.env->NewWritableFile(InfoLogFileName(dbname), - &result.info_log); + Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); if (!s.ok()) { // No place suitable for logging - result.info_log = new NullWritableFile; + result.info_log = NULL; } } if (result.block_cache == NULL) { @@ -201,7 +190,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const { if (s->ok() || options_.paranoid_checks) { // No change needed } else { - Log(env_, options_.info_log, "Ignoring error %s", s->ToString().c_str()); + Log(options_.info_log, "Ignoring error %s", s->ToString().c_str()); *s = Status::OK(); } } @@ -247,7 +236,7 @@ void DBImpl::DeleteObsoleteFiles() { if (type == kTableFile) { table_cache_->Evict(number); } - Log(env_, options_.info_log, "Delete type=%d #%lld\n", + Log(options_.info_log, "Delete type=%d #%lld\n", int(type), static_cast(number)); env_->DeleteFile(dbname_ + "/" + filenames[i]); @@ -336,11 +325,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence) { struct LogReporter : public log::Reader::Reporter { Env* env; - WritableFile* info_log; + Logger* info_log; const char* fname; Status* status; // NULL if options_.paranoid_checks==false virtual void Corruption(size_t bytes, const Status& s) { - Log(env, info_log, "%s%s: dropping %d bytes; %s", + Log(info_log, "%s%s: dropping %d bytes; %s", (this->status == NULL ? 
"(ignoring error) " : ""), fname, static_cast(bytes), s.ToString().c_str()); if (this->status != NULL && this->status->ok()) *this->status = s; @@ -370,7 +359,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // large sequence numbers). log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); - Log(env_, options_.info_log, "Recovering log #%llu", + Log(options_.info_log, "Recovering log #%llu", (unsigned long long) log_number); // Read all the records and add to a memtable @@ -434,7 +423,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, meta.number = versions_->NewFileNumber(); pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); - Log(env_, options_.info_log, "Level-0 table #%llu: started", + Log(options_.info_log, "Level-0 table #%llu: started", (unsigned long long) meta.number); Status s; @@ -444,7 +433,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, mutex_.Lock(); } - Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s", + Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", (unsigned long long) meta.number, (unsigned long long) meta.file_size, s.ToString().c_str()); @@ -613,7 +602,7 @@ void DBImpl::BackgroundCompaction() { f->smallest, f->largest); status = versions_->LogAndApply(c->edit()); VersionSet::LevelSummaryStorage tmp; - Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", + Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, static_cast(f->file_size), @@ -631,7 +620,7 @@ void DBImpl::BackgroundCompaction() { } else if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else { - Log(env_, options_.info_log, + Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); if (options_.paranoid_checks && bg_error_.ok()) { bg_error_ = status; @@ -727,7 +716,7 @@ Status 
DBImpl::FinishCompactionOutputFile(CompactionState* compact, s = iter->status(); delete iter; if (s.ok()) { - Log(env_, options_.info_log, + Log(options_.info_log, "Generated table #%llu: %lld keys, %lld bytes", (unsigned long long) output_number, (unsigned long long) current_entries, @@ -740,7 +729,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Status DBImpl::InstallCompactionResults(CompactionState* compact) { mutex_.AssertHeld(); - Log(env_, options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", + Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), @@ -776,7 +765,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files", + Log(options_.info_log, "Compacting %d@%d + %d@%d files", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), @@ -859,7 +848,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { last_sequence_for_key = ikey.sequence; } #if 0 - Log(env_, options_.info_log, + Log(options_.info_log, " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " "%d smallest_snapshot: %d", ikey.user_key.ToString().c_str(), @@ -925,7 +914,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { status = InstallCompactionResults(compact); } VersionSet::LevelSummaryStorage tmp; - Log(env_, options_.info_log, + Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); return status; } @@ -1112,7 +1101,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. 
- Log(env_, options_.info_log, "waiting...\n"); + Log(options_.info_log, "waiting...\n"); bg_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old diff --git a/db/repair.cc b/db/repair.cc index 2e3f506..5bcdb56 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -78,7 +78,7 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { bytes += tables_[i].meta.file_size; } - Log(env_, options_.info_log, + Log(options_.info_log, "**** Repaired leveldb %s; " "recovered %d files; %llu bytes. " "Some data may have been lost. " @@ -149,7 +149,7 @@ class Repairer { std::string logname = LogFileName(dbname_, logs_[i]); Status status = ConvertLogToTable(logs_[i]); if (!status.ok()) { - Log(env_, options_.info_log, "Log #%llu: ignoring conversion error: %s", + Log(options_.info_log, "Log #%llu: ignoring conversion error: %s", (unsigned long long) logs_[i], status.ToString().c_str()); } @@ -160,11 +160,11 @@ class Repairer { Status ConvertLogToTable(uint64_t log) { struct LogReporter : public log::Reader::Reporter { Env* env; - WritableFile* info_log; + Logger* info_log; uint64_t lognum; virtual void Corruption(size_t bytes, const Status& s) { // We print error messages for corruption, but continue repairing. 
- Log(env, info_log, "Log #%llu: dropping %d bytes; %s", + Log(info_log, "Log #%llu: dropping %d bytes; %s", (unsigned long long) lognum, static_cast(bytes), s.ToString().c_str()); @@ -209,7 +209,7 @@ class Repairer { if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { - Log(env_, options_.info_log, "Log #%llu: ignoring %s", + Log(options_.info_log, "Log #%llu: ignoring %s", (unsigned long long) log, status.ToString().c_str()); status = Status::OK(); // Keep going with rest of file @@ -231,7 +231,7 @@ class Repairer { table_numbers_.push_back(meta.number); } } - Log(env_, options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", + Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", (unsigned long long) log, counter, (unsigned long long) meta.number, @@ -247,7 +247,7 @@ class Repairer { Status status = ScanTable(&t); if (!status.ok()) { std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(env_, options_.info_log, "Table #%llu: ignoring %s", + Log(options_.info_log, "Table #%llu: ignoring %s", (unsigned long long) table_numbers_[i], status.ToString().c_str()); ArchiveFile(fname); @@ -270,7 +270,7 @@ class Repairer { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); if (!ParseInternalKey(key, &parsed)) { - Log(env_, options_.info_log, "Table #%llu: unparsable key %s", + Log(options_.info_log, "Table #%llu: unparsable key %s", (unsigned long long) t->meta.number, EscapeString(key).c_str()); continue; @@ -291,7 +291,7 @@ class Repairer { } delete iter; } - Log(env_, options_.info_log, "Table #%llu: %d entries %s", + Log(options_.info_log, "Table #%llu: %d entries %s", (unsigned long long) t->meta.number, counter, status.ToString().c_str()); @@ -373,7 +373,7 @@ class Repairer { new_file.append("/"); new_file.append((slash == NULL) ? 
fname.c_str() : slash + 1); Status s = env_->RenameFile(fname, new_file); - Log(env_, options_.info_log, "Archiving %s: %s\n", + Log(options_.info_log, "Archiving %s: %s\n", fname.c_str(), s.ToString().c_str()); } }; diff --git a/db/version_set.cc b/db/version_set.cc index 62bd6dd..5040b72 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1124,7 +1124,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) { std::vector expanded1; GetOverlappingInputs(level+1, new_start, new_limit, &expanded1); if (expanded1.size() == c->inputs_[1].size()) { - Log(env_, options_->info_log, + Log(options_->info_log, "Expanding@%d %d+%d to %d+%d\n", level, int(c->inputs_[0].size()), @@ -1147,7 +1147,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) { } if (false) { - Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'", + Log(options_->info_log, "Compacting %d '%s' .. '%s'", level, EscapeString(smallest.Encode()).c_str(), EscapeString(largest.Encode()).c_str()); -- cgit v1.2.1