-rw-r--r--  db/db_impl.cc            | 68
-rw-r--r--  db/db_impl.h             | 41
-rw-r--r--  db/memtable.h            |  1
-rw-r--r--  db/repair.cc             |  2
-rw-r--r--  db/table_cache.cc        |  6
-rw-r--r--  db/table_cache.h         |  4
-rw-r--r--  helpers/memenv/memenv.cc | 18
-rw-r--r--  util/cache.cc            | 16
8 files changed, 85 insertions, 71 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index a9044c2..8484e46 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -4,12 +4,14 @@
#include "db/db_impl.h"
+#include <stdint.h>
+#include <stdio.h>
+
#include <algorithm>
#include <set>
#include <string>
-#include <stdint.h>
-#include <stdio.h>
#include <vector>
+
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@@ -82,7 +84,7 @@ struct DBImpl::CompactionState {
};
// Fix user-supplied options to be reasonable
-template <class T,class V>
+template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
@@ -114,6 +116,11 @@ Options SanitizeOptions(const std::string& dbname,
return result;
}
+static int TableCacheSize(const Options& sanitized_options) {
+ // Reserve ten files or so for other uses and give the rest to TableCache.
+ return sanitized_options.max_open_files - kNumNonTableCacheFiles;
+}
+
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env),
internal_comparator_(raw_options.comparator),
@@ -123,9 +130,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
+ table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(NULL),
shutting_down_(NULL),
- bg_cv_(&mutex_),
+ background_work_finished_signal_(&mutex_),
mem_(NULL),
imm_(NULL),
logfile_(NULL),
@@ -133,24 +141,19 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
log_(NULL),
seed_(0),
tmp_batch_(new WriteBatch),
- bg_compaction_scheduled_(false),
- manual_compaction_(NULL) {
+ background_compaction_scheduled_(false),
+ manual_compaction_(NULL),
+ versions_(new VersionSet(dbname_, &options_, table_cache_,
+ &internal_comparator_)) {
has_imm_.Release_Store(NULL);
-
- // Reserve ten files or so for other uses and give the rest to TableCache.
- const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
- table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
-
- versions_ = new VersionSet(dbname_, &options_, table_cache_,
- &internal_comparator_);
}
DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
- while (bg_compaction_scheduled_) {
- bg_cv_.Wait();
+ while (background_compaction_scheduled_) {
+ background_work_finished_signal_.Wait();
}
mutex_.Unlock();
@@ -216,6 +219,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
void DBImpl::DeleteObsoleteFiles() {
+ mutex_.AssertHeld();
+
if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
@@ -227,7 +232,7 @@ void DBImpl::DeleteObsoleteFiles() {
versions_->AddLiveFiles(&live);
std::vector<std::string> filenames;
- env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
+ env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
@@ -263,7 +268,7 @@ void DBImpl::DeleteObsoleteFiles() {
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n",
- int(type),
+ static_cast<int>(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
@@ -575,13 +580,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
}
- TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
+ TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
-void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
+void DBImpl::TEST_CompactRange(int level, const Slice* begin,
+ const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
@@ -609,7 +615,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
manual_compaction_ = &manual;
MaybeScheduleCompaction();
} else { // Running either my compaction or another compaction.
- bg_cv_.Wait();
+ background_work_finished_signal_.Wait();
}
}
if (manual_compaction_ == &manual) {
@@ -625,7 +631,7 @@ Status DBImpl::TEST_CompactMemTable() {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
- bg_cv_.Wait();
+ background_work_finished_signal_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
@@ -638,13 +644,13 @@ void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld();
if (bg_error_.ok()) {
bg_error_ = s;
- bg_cv_.SignalAll();
+ background_work_finished_signal_.SignalAll();
}
}
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
- if (bg_compaction_scheduled_) {
+ if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
@@ -655,7 +661,7 @@ void DBImpl::MaybeScheduleCompaction() {
!versions_->NeedsCompaction()) {
// No work to be done
} else {
- bg_compaction_scheduled_ = true;
+ background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
@@ -666,7 +672,7 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
- assert(bg_compaction_scheduled_);
+ assert(background_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
@@ -675,12 +681,12 @@ void DBImpl::BackgroundCall() {
BackgroundCompaction();
}
- bg_compaction_scheduled_ = false;
+ background_compaction_scheduled_ = false;
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
- bg_cv_.SignalAll();
+ background_work_finished_signal_.SignalAll();
}
void DBImpl::BackgroundCompaction() {
@@ -920,7 +926,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable();
- bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
+ // Wake up MakeRoomForWrite() if necessary.
+ background_work_finished_signal_.SignalAll();
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
@@ -1267,6 +1274,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
+ mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
@@ -1346,11 +1354,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
- bg_cv_.Wait();
+ background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
- bg_cv_.Wait();
+ background_work_finished_signal_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
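Note how table_cache_ and versions_ move from assignments in the constructor body into the member-initializer list, with the sizing arithmetic factored into the TableCacheSize() helper; that reordering is what lets the header (next file) declare both members `* const`. A minimal sketch of the pattern, using illustrative names rather than LevelDB's:

```cpp
struct Resource {
  explicit Resource(int size) : size(size) {}
  int size;
};

// Before: assigned in the constructor body, so the member cannot be const.
class Before {
 public:
  explicit Before(int size) { resource_ = new Resource(size); }
  ~Before() { delete resource_; }  // Copying disabled in real code.

 private:
  Resource* resource_;
};

// After: initialized in the member-initializer list, so `* const` documents
// (and enforces) that the pointer never changes after construction.
class After {
 public:
  explicit After(int size) : resource_(new Resource(size)) {}
  ~After() { delete resource_; }  // Copying disabled in real code.

 private:
  Resource* const resource_;
};
```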
diff --git a/db/db_impl.h b/db/db_impl.h
index 3861b86..6344112 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -84,7 +84,7 @@ class DBImpl : public DB {
void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries.
- void DeleteObsoleteFiles();
+ void DeleteObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
@@ -100,14 +100,15 @@ class DBImpl : public DB {
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
- WriteBatch* BuildBatchGroup(Writer** last_writer);
+ WriteBatch* BuildBatchGroup(Writer** last_writer)
+ EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
- void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
@@ -123,12 +124,12 @@ class DBImpl : public DB {
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
- bool owns_info_log_;
- bool owns_cache_;
+ const bool owns_info_log_;
+ const bool owns_cache_;
const std::string dbname_;
// table_cache_ provides its own synchronization
- TableCache* table_cache_;
+ TableCache* const table_cache_;
// Lock over the persistent DB state. Non-NULL iff successfully acquired.
FileLock* db_lock_;
@@ -136,27 +137,27 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
- port::CondVar bg_cv_; // Signalled when background work finishes
+ port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
- MemTable* imm_; // Memtable being compacted
- port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
+ MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
+ port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_;
- uint64_t logfile_number_;
+ uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
- uint32_t seed_; // For sampling.
+ uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
- std::deque<Writer*> writers_;
- WriteBatch* tmp_batch_;
+ std::deque<Writer*> writers_ GUARDED_BY(mutex_);
+ WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
- SnapshotList snapshots_;
+ SnapshotList snapshots_ GUARDED_BY(mutex_);
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
- std::set<uint64_t> pending_outputs_;
+ std::set<uint64_t> pending_outputs_ GUARDED_BY(mutex_);
// Has a background compaction been scheduled or is running?
- bool bg_compaction_scheduled_;
+ bool background_compaction_scheduled_ GUARDED_BY(mutex_);
// Information for a manual compaction
struct ManualCompaction {
@@ -166,12 +167,12 @@ class DBImpl : public DB {
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
- ManualCompaction* manual_compaction_;
+ ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
- VersionSet* versions_;
+ VersionSet* const versions_;
// Have we encountered a background error in paranoid mode?
- Status bg_error_;
+ Status bg_error_ GUARDED_BY(mutex_);
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
@@ -188,7 +189,7 @@ class DBImpl : public DB {
this->bytes_written += c.bytes_written;
}
};
- CompactionStats stats_[config::kNumLevels];
+ CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
// No copying allowed
DBImpl(const DBImpl&);
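Most of the header changes above replace "protected by mutex_" comments with GUARDED_BY and EXCLUSIVE_LOCKS_REQUIRED annotations, turning the locking discipline into something Clang's -Wthread-safety analysis can verify at compile time. Below is a minimal, self-contained sketch of how such macros are commonly defined and used; it is an assumption-laden stand-in, not LevelDB's actual port/thread_annotations.h, and the Mutex wrapper is a stand-in for port::Mutex.

```cpp
// Compile with: clang++ -std=c++11 -Wthread-safety -c demo.cc
#include <mutex>

#if defined(__clang__)
#define THREAD_ANNOTATION(x) __attribute__((x))
#else
#define THREAD_ANNOTATION(x)  // No-op where the analysis is unavailable.
#endif

#define GUARDED_BY(m) THREAD_ANNOTATION(guarded_by(m))
#define EXCLUSIVE_LOCKS_REQUIRED(...) \
  THREAD_ANNOTATION(exclusive_locks_required(__VA_ARGS__))

// The analysis tracks lock state through an annotated mutex type.
class THREAD_ANNOTATION(lockable) Mutex {
 public:
  void Lock() THREAD_ANNOTATION(exclusive_lock_function()) { mu_.lock(); }
  void Unlock() THREAD_ANNOTATION(unlock_function()) { mu_.unlock(); }

 private:
  std::mutex mu_;
};

class Counter {
 public:
  void Increment() {
    mu_.Lock();
    IncrementLocked();  // OK: mu_ is held on this path.
    mu_.Unlock();
  }

  // Uncommenting this method triggers:
  //   warning: calling function 'IncrementLocked' requires holding
  //   mutex 'mu_' exclusively [-Wthread-safety-analysis]
  // void Broken() { IncrementLocked(); }

 private:
  // Callers must hold mu_, as with DBImpl::DeleteObsoleteFiles() above.
  void IncrementLocked() EXCLUSIVE_LOCKS_REQUIRED(mu_) { ++value_; }

  Mutex mu_;
  int value_ GUARDED_BY(mu_) = 0;
};
```

The payoff is that touching a field like imm_ or writers_ without mutex_ held becomes a compile-time warning instead of a data race found in review.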
diff --git a/db/memtable.h b/db/memtable.h
index 9f41567..f2a6736 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -14,7 +14,6 @@
namespace leveldb {
class InternalKeyComparator;
-class Mutex;
class MemTableIterator;
class MemTable {
diff --git a/db/repair.cc b/db/repair.cc
index 4cd4bb0..c10da82 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -54,7 +54,7 @@ class Repairer {
owns_cache_(options_.block_cache != options.block_cache),
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
- table_cache_ = new TableCache(dbname_, &options_, 10);
+ table_cache_ = new TableCache(dbname_, options_, 10);
}
~Repairer() {
diff --git a/db/table_cache.cc b/db/table_cache.cc
index e3d82cd..6cf005b 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -30,9 +30,9 @@ static void UnrefEntry(void* arg1, void* arg2) {
}
TableCache::TableCache(const std::string& dbname,
- const Options* options,
+ const Options& options,
int entries)
- : env_(options->env),
+ : env_(options.env),
dbname_(dbname),
options_(options),
cache_(NewLRUCache(entries)) {
@@ -61,7 +61,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
}
}
if (s.ok()) {
- s = Table::Open(*options_, file, file_size, &table);
+ s = Table::Open(options_, file, file_size, &table);
}
if (!s.ok()) {
diff --git a/db/table_cache.h b/db/table_cache.h
index 8cf4aaf..e9191dc 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -20,7 +20,7 @@ class Env;
class TableCache {
public:
- TableCache(const std::string& dbname, const Options* options, int entries);
+ TableCache(const std::string& dbname, const Options& options, int entries);
~TableCache();
// Return an iterator for the specified file number (the corresponding
@@ -50,7 +50,7 @@ class TableCache {
private:
Env* const env_;
const std::string dbname_;
- const Options* options_;
+ const Options& options_;
Cache* cache_;
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);
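TableCache's constructor now takes Options by const reference rather than const pointer: a reference cannot be null and signals a non-owning borrow that must outlive the cache. A simplified sketch of the new shape (the Options struct here is a stub, not leveldb::Options):

```cpp
#include <string>

struct Options {  // Stub standing in for leveldb::Options.
  int max_open_files = 1000;
};

class TableCache {
 public:
  // A reference parameter cannot be null, so no defensive checks are
  // needed; the caller guarantees `options` outlives this cache.
  TableCache(const std::string& dbname, const Options& options)
      : dbname_(dbname), options_(options) {}

  int Capacity() const { return options_.max_open_files - 10; }

 private:
  const std::string dbname_;
  const Options& options_;  // Stored as a reference, as in the diff.
};

int main() {
  Options options;
  TableCache cache("/tmp/demo-db", options);  // Previously: &options.
  return cache.Capacity() > 0 ? 0 : 1;
}
```

It also drops the `->` indirection at each use site, as in the table_cache.cc hunk above where `Table::Open(*options_, ...)` becomes `Table::Open(options_, ...)`.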
diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc
index 9a98884..ee7abd4 100644
--- a/helpers/memenv/memenv.cc
+++ b/helpers/memenv/memenv.cc
@@ -4,14 +4,17 @@
#include "helpers/memenv/memenv.h"
+#include <string.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "port/port.h"
+#include "port/thread_annotations.h"
#include "util/mutexlock.h"
-#include <map>
-#include <string.h>
-#include <string>
-#include <vector>
namespace leveldb {
@@ -135,7 +138,7 @@ class FileState {
void operator=(const FileState&);
port::Mutex refs_mutex_;
- int refs_; // Protected by refs_mutex_;
+ int refs_ GUARDED_BY(refs_mutex_);
// The following fields are not protected by any mutex. They are only mutable
// while the file is being written, and concurrent access is not allowed
@@ -312,7 +315,8 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
- void DeleteFileInternal(const std::string& fname) {
+ void DeleteFileInternal(const std::string& fname)
+ EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
if (file_map_.find(fname) == file_map_.end()) {
return;
}
@@ -386,7 +390,7 @@ class InMemoryEnv : public EnvWrapper {
// Map from filenames to FileState objects, representing a simple file system.
typedef std::map<std::string, FileState*> FileSystem;
port::Mutex mutex_;
- FileSystem file_map_; // Protected by mutex_.
+ FileSystem file_map_ GUARDED_BY(mutex_);
};
} // namespace
diff --git a/util/cache.cc b/util/cache.cc
index bd914ae..10b7103 100644
--- a/util/cache.cc
+++ b/util/cache.cc
@@ -8,6 +8,7 @@
#include "leveldb/cache.h"
#include "port/port.h"
+#include "port/thread_annotations.h"
#include "util/hash.h"
#include "util/mutexlock.h"
@@ -174,25 +175,25 @@ class LRUCache {
void LRU_Append(LRUHandle* list, LRUHandle* e);
void Ref(LRUHandle* e);
void Unref(LRUHandle* e);
- bool FinishErase(LRUHandle* e);
+ bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
mutable port::Mutex mutex_;
- size_t usage_;
+ size_t usage_ GUARDED_BY(mutex_);
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
// Entries have refs==1 and in_cache==true.
- LRUHandle lru_;
+ LRUHandle lru_ GUARDED_BY(mutex_);
// Dummy head of in-use list.
// Entries are in use by clients, and have refs >= 2 and in_cache==true.
- LRUHandle in_use_;
+ LRUHandle in_use_ GUARDED_BY(mutex_);
- HandleTable table_;
+ HandleTable table_ GUARDED_BY(mutex_);
};
LRUCache::LRUCache()
@@ -227,11 +228,12 @@ void LRUCache::Ref(LRUHandle* e) {
void LRUCache::Unref(LRUHandle* e) {
assert(e->refs > 0);
e->refs--;
- if (e->refs == 0) { // Deallocate.
+ if (e->refs == 0) { // Deallocate.
assert(!e->in_cache);
(*e->deleter)(e->key(), e->value);
free(e);
- } else if (e->in_cache && e->refs == 1) { // No longer in use; move to lru_ list.
+ } else if (e->in_cache && e->refs == 1) {
+ // No longer in use; move to lru_ list.
LRU_Remove(e);
LRU_Append(&lru_, e);
}