diff options
Diffstat (limited to 'db')
-rw-r--r-- | db/db_impl.cc | 68 | ||||
-rw-r--r-- | db/db_impl.h | 41 | ||||
-rw-r--r-- | db/memtable.h | 1 | ||||
-rw-r--r-- | db/repair.cc | 2 | ||||
-rw-r--r-- | db/table_cache.cc | 6 | ||||
-rw-r--r-- | db/table_cache.h | 4 |
6 files changed, 65 insertions, 57 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc index a9044c2..8484e46 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,12 +4,14 @@ #include "db/db_impl.h" +#include <stdint.h> +#include <stdio.h> + #include <algorithm> #include <set> #include <string> -#include <stdint.h> -#include <stdio.h> #include <vector> + #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -82,7 +84,7 @@ struct DBImpl::CompactionState { }; // Fix user-supplied options to be reasonable -template <class T,class V> +template <class T, class V> static void ClipToRange(T* ptr, V minvalue, V maxvalue) { if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue; @@ -114,6 +116,11 @@ Options SanitizeOptions(const std::string& dbname, return result; } +static int TableCacheSize(const Options& sanitized_options) { + // Reserve ten files or so for other uses and give the rest to TableCache. + return sanitized_options.max_open_files - kNumNonTableCacheFiles; +} + DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) : env_(raw_options.env), internal_comparator_(raw_options.comparator), @@ -123,9 +130,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) owns_info_log_(options_.info_log != raw_options.info_log), owns_cache_(options_.block_cache != raw_options.block_cache), dbname_(dbname), + table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), db_lock_(NULL), shutting_down_(NULL), - bg_cv_(&mutex_), + background_work_finished_signal_(&mutex_), mem_(NULL), imm_(NULL), logfile_(NULL), @@ -133,24 +141,19 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) log_(NULL), seed_(0), tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), - manual_compaction_(NULL) { + background_compaction_scheduled_(false), + manual_compaction_(NULL), + versions_(new VersionSet(dbname_, &options_, table_cache_, + &internal_comparator_)) { has_imm_.Release_Store(NULL); - - // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; - table_cache_ = new TableCache(dbname_, &options_, table_cache_size); - - versions_ = new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_); } DBImpl::~DBImpl() { // Wait for background work to finish mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-NULL value is ok - while (bg_compaction_scheduled_) { - bg_cv_.Wait(); + while (background_compaction_scheduled_) { + background_work_finished_signal_.Wait(); } mutex_.Unlock(); @@ -216,6 +219,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + mutex_.AssertHeld(); + if (!bg_error_.ok()) { // After a background error, we don't know whether a new version may // or may not have been committed, so we cannot safely garbage collect. @@ -227,7 +232,7 @@ void DBImpl::DeleteObsoleteFiles() { versions_->AddLiveFiles(&live); std::vector<std::string> filenames; - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose uint64_t number; FileType type; for (size_t i = 0; i < filenames.size(); i++) { @@ -263,7 +268,7 @@ void DBImpl::DeleteObsoleteFiles() { table_cache_->Evict(number); } Log(options_.info_log, "Delete type=%d #%lld\n", - int(type), + static_cast<int>(type), static_cast<unsigned long long>(number)); env_->DeleteFile(dbname_ + "/" + filenames[i]); } @@ -575,13 +580,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { } } } - TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap + TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); } } -void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { +void DBImpl::TEST_CompactRange(int level, const Slice* begin, + const Slice* end) { assert(level >= 0); assert(level + 1 < config::kNumLevels); @@ -609,7 +615,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { manual_compaction_ = &manual; MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } } if (manual_compaction_ == &manual) { @@ -625,7 +631,7 @@ Status DBImpl::TEST_CompactMemTable() { // Wait until the compaction completes MutexLock l(&mutex_); while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } if (imm_ != NULL) { s = bg_error_; @@ -638,13 +644,13 @@ void DBImpl::RecordBackgroundError(const Status& s) { mutex_.AssertHeld(); if (bg_error_.ok()) { bg_error_ = s; - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } } void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_) { + if (background_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions @@ -655,7 +661,7 @@ void DBImpl::MaybeScheduleCompaction() { !versions_->NeedsCompaction()) { // No work to be done } else { - bg_compaction_scheduled_ = true; + background_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); } } @@ -666,7 +672,7 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); + assert(background_compaction_scheduled_); if (shutting_down_.Acquire_Load()) { // No more background work when shutting down. } else if (!bg_error_.ok()) { @@ -675,12 +681,12 @@ void DBImpl::BackgroundCall() { BackgroundCompaction(); } - bg_compaction_scheduled_ = false; + background_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } void DBImpl::BackgroundCompaction() { @@ -920,7 +926,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { mutex_.Lock(); if (imm_ != NULL) { CompactMemTable(); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + // Wake up MakeRoomForWrite() if necessary. + background_work_finished_signal_.SignalAll(); } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start); @@ -1267,6 +1274,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-NULL batch WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { + mutex_.AssertHeld(); assert(!writers_.empty()); Writer* first = writers_.front(); WriteBatch* result = first->batch; @@ -1346,11 +1354,11 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. Log(options_.info_log, "Current memtable full; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. Log(options_.info_log, "Too many L0 files; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); diff --git a/db/db_impl.h b/db/db_impl.h index 3861b86..6344112 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -84,7 +84,7 @@ class DBImpl : public DB { void MaybeIgnoreError(Status* s) const; // Delete any unneeded files and stale in-memory entries. - void DeleteObsoleteFiles(); + void DeleteObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. @@ -100,14 +100,15 @@ class DBImpl : public DB { Status MakeRoomForWrite(bool force /* compact even if there is room? */) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - WriteBatch* BuildBatchGroup(Writer** last_writer); + WriteBatch* BuildBatchGroup(Writer** last_writer) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); void RecordBackgroundError(const Status& s); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); void BackgroundCall(); - void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -123,12 +124,12 @@ class DBImpl : public DB { const InternalKeyComparator internal_comparator_; const InternalFilterPolicy internal_filter_policy_; const Options options_; // options_.comparator == &internal_comparator_ - bool owns_info_log_; - bool owns_cache_; + const bool owns_info_log_; + const bool owns_cache_; const std::string dbname_; // table_cache_ provides its own synchronization - TableCache* table_cache_; + TableCache* const table_cache_; // Lock over the persistent DB state. Non-NULL iff successfully acquired. FileLock* db_lock_; @@ -136,27 +137,27 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; - port::CondVar bg_cv_; // Signalled when background work finishes + port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); MemTable* mem_; - MemTable* imm_; // Memtable being compacted - port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ + MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted + port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ WritableFile* logfile_; - uint64_t logfile_number_; + uint64_t logfile_number_ GUARDED_BY(mutex_); log::Writer* log_; - uint32_t seed_; // For sampling. + uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. - std::deque<Writer*> writers_; - WriteBatch* tmp_batch_; + std::deque<Writer*> writers_ GUARDED_BY(mutex_); + WriteBatch* tmp_batch_ GUARDED_BY(mutex_); - SnapshotList snapshots_; + SnapshotList snapshots_ GUARDED_BY(mutex_); // Set of table files to protect from deletion because they are // part of ongoing compactions. - std::set<uint64_t> pending_outputs_; + std::set<uint64_t> pending_outputs_ GUARDED_BY(mutex_); // Has a background compaction been scheduled or is running? - bool bg_compaction_scheduled_; + bool background_compaction_scheduled_ GUARDED_BY(mutex_); // Information for a manual compaction struct ManualCompaction { @@ -166,12 +167,12 @@ class DBImpl : public DB { const InternalKey* end; // NULL means end of key range InternalKey tmp_storage; // Used to keep track of compaction progress }; - ManualCompaction* manual_compaction_; + ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); - VersionSet* versions_; + VersionSet* const versions_; // Have we encountered a background error in paranoid mode? - Status bg_error_; + Status bg_error_ GUARDED_BY(mutex_); // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". @@ -188,7 +189,7 @@ class DBImpl : public DB { this->bytes_written += c.bytes_written; } }; - CompactionStats stats_[config::kNumLevels]; + CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); // No copying allowed DBImpl(const DBImpl&); diff --git a/db/memtable.h b/db/memtable.h index 9f41567..f2a6736 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -14,7 +14,6 @@ namespace leveldb { class InternalKeyComparator; -class Mutex; class MemTableIterator; class MemTable { diff --git a/db/repair.cc b/db/repair.cc index 4cd4bb0..c10da82 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -54,7 +54,7 @@ class Repairer { owns_cache_(options_.block_cache != options.block_cache), next_file_number_(1) { // TableCache can be small since we expect each table to be opened once. - table_cache_ = new TableCache(dbname_, &options_, 10); + table_cache_ = new TableCache(dbname_, options_, 10); } ~Repairer() { diff --git a/db/table_cache.cc b/db/table_cache.cc index e3d82cd..6cf005b 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -30,9 +30,9 @@ static void UnrefEntry(void* arg1, void* arg2) { } TableCache::TableCache(const std::string& dbname, - const Options* options, + const Options& options, int entries) - : env_(options->env), + : env_(options.env), dbname_(dbname), options_(options), cache_(NewLRUCache(entries)) { @@ -61,7 +61,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, } } if (s.ok()) { - s = Table::Open(*options_, file, file_size, &table); + s = Table::Open(options_, file, file_size, &table); } if (!s.ok()) { diff --git a/db/table_cache.h b/db/table_cache.h index 8cf4aaf..e9191dc 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -20,7 +20,7 @@ class Env; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, int entries); + TableCache(const std::string& dbname, const Options& options, int entries); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -50,7 +50,7 @@ class TableCache { private: Env* const env_; const std::string dbname_; - const Options* options_; + const Options& options_; Cache* cache_; Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**); |