Diffstat (limited to 'db/db_impl.cc')
-rw-r--r--  db/db_impl.cc | 143
1 file changed, 107 insertions(+), 36 deletions(-)
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 7b268ea..dde3711 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -35,6 +35,17 @@
namespace leveldb {
+// Information kept for every waiting writer
+struct DBImpl::Writer {
+ Status status;
+ WriteBatch* batch;
+ bool sync;
+ bool done;
+ port::CondVar cv;
+
+ explicit Writer(port::Mutex* mu) : cv(mu) { }
+};
+
struct DBImpl::CompactionState {
Compaction* const compaction;
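The per-writer condition variable above is the heart of the new scheme: each blocked writer sleeps on its own cv and wakes either when a front writer has completed the batch on its behalf or when it reaches the front of the queue itself. A minimal standalone sketch of that handoff pattern, using std::mutex and std::condition_variable in place of port::Mutex and port::CondVar (Waiter and DoWrite are illustrative names, not part of this patch):

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    struct Waiter {
      bool done;
      std::condition_variable cv;
      Waiter() : done(false) { }
    };

    static std::mutex mu;
    static std::deque<Waiter*> queue;

    void DoWrite(Waiter* w) {
      std::unique_lock<std::mutex> l(mu);
      queue.push_back(w);
      // Sleep until a front writer finishes our work, or we become the front.
      while (!w->done && w != queue.front()) {
        w->cv.wait(l);
      }
      if (w->done) return;               // another thread did our write
      // ... front writer: perform the (possibly grouped) write here ...
      queue.pop_front();
      if (!queue.empty()) {
        queue.front()->cv.notify_one();  // wake the new head of the queue
      }
    }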
@@ -113,8 +124,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL),
logfile_number_(0),
log_(NULL),
- logger_(NULL),
- logger_cv_(&mutex_),
+ tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
mem_->Ref();
@@ -144,6 +154,7 @@ DBImpl::~DBImpl() {
delete versions_;
if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref();
+ delete tmp_batch_;
delete log_;
delete logfile_;
delete table_cache_;
@@ -554,13 +565,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin, const Slice* end) {
}
Status DBImpl::TEST_CompactMemTable() {
- MutexLock l(&mutex_);
- LoggerId self;
- AcquireLoggingResponsibility(&self);
- Status s = MakeRoomForWrite(true /* force compaction */);
- ReleaseLoggingResponsibility(&self);
+ // NULL batch means just wait for earlier writes to be done
+ Status s = Write(WriteOptions(), NULL);
if (s.ok()) {
// Wait until the compaction completes
+ MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
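The NULL-batch convention used above doubles as a write barrier: by the time Write(WriteOptions(), NULL) returns, every write queued ahead of it has completed (successfully or not), and MakeRoomForWrite(true) has forced the memtable switch. A hedged sketch of the idiom from inside DBImpl:

    // No batch to apply: enqueue a no-op writer, drain everything queued
    // before us, and force a memtable compaction to be scheduled.
    Status s = Write(WriteOptions(), NULL);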
@@ -1094,38 +1103,35 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
-// There is at most one thread that is the current logger. This call
-// waits until preceding logger(s) have finished and becomes the
-// current logger.
-void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
- while (logger_ != NULL) {
- logger_cv_.Wait();
- }
- logger_ = self;
-}
+Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
+ Writer w(&mutex_);
+ w.batch = my_batch;
+ w.sync = options.sync;
+ w.done = false;
-void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
- assert(logger_ == self);
- logger_ = NULL;
- logger_cv_.SignalAll();
-}
-
-Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
- Status status;
MutexLock l(&mutex_);
- LoggerId self;
- AcquireLoggingResponsibility(&self);
- status = MakeRoomForWrite(false); // May temporarily release lock and wait
+ writers_.push_back(&w);
+ while (!w.done && &w != writers_.front()) {
+ w.cv.Wait();
+ }
+ if (w.done) {
+ return w.status;
+ }
+
+ // May temporarily unlock and wait.
+ Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
- if (status.ok()) {
+ Writer* last_writer = &w;
+ if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
+ WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
- // Add to log and apply to memtable. We can release the lock during
- // this phase since the "logger_" flag protects against concurrent
- // loggers and concurrent writes into mem_.
+ // Add to log and apply to memtable. We can release the lock
+ // during this phase since &w is currently responsible for logging
+ // and protects against concurrent loggers and concurrent writes
+ // into mem_.
{
- assert(logger_ == &self);
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
if (status.ok() && options.sync) {
@@ -1135,20 +1141,85 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
- assert(logger_ == &self);
}
+ if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
- ReleaseLoggingResponsibility(&self);
+
+ while (true) {
+ Writer* ready = writers_.front();
+ writers_.pop_front();
+ if (ready != &w) {
+ ready->status = status;
+ ready->done = true;
+ ready->cv.Signal();
+ }
+ if (ready == last_writer) break;
+ }
+
+ // Notify new head of write queue
+ if (!writers_.empty()) {
+ writers_.front()->cv.Signal();
+ }
+
return status;
}
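The net effect of the rewritten Write() is group commit: when several threads stack up in the queue, the front writer logs one combined record (and pays one fsync, if syncing), then marks its followers done before they wake. An illustrative multi-threaded harness, assuming an open leveldb::DB* named db (none of this is part of the patch):

    #include <cstdio>
    #include <thread>
    #include <vector>
    #include "leveldb/db.h"

    // Each thread issues synchronous Puts; writers that pile up behind a
    // front writer are committed together instead of one fsync per Put.
    void PutMany(leveldb::DB* db, int tid) {
      leveldb::WriteOptions opt;
      opt.sync = true;
      for (int i = 0; i < 1000; i++) {
        char key[32];
        std::snprintf(key, sizeof(key), "t%d-key%d", tid, i);
        db->Put(opt, key, "value");
      }
    }

    // Usage (error handling elided):
    //   std::vector<std::thread> threads;
    //   for (int t = 0; t < 4; t++) threads.emplace_back(PutMany, db, t);
    //   for (size_t i = 0; i < threads.size(); i++) threads[i].join();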
+// REQUIRES: Writer list must be non-empty
+// REQUIRES: First writer must have a non-NULL batch
+WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
+ assert(!writers_.empty());
+ Writer* first = writers_.front();
+ WriteBatch* result = first->batch;
+ assert(result != NULL);
+
+ size_t size = WriteBatchInternal::ByteSize(first->batch);
+
+ // Allow the group to grow up to a maximum size, but if the
+ // original write is small, limit the growth so we do not slow
+ // down the small write too much.
+ size_t max_size = 1 << 20;
+ if (size <= (128<<10)) {
+ max_size = size + (128<<10);
+ }
+
+ *last_writer = first;
+ std::deque<Writer*>::iterator iter = writers_.begin();
+ ++iter; // Advance past "first"
+ for (; iter != writers_.end(); ++iter) {
+ Writer* w = *iter;
+ if (w->sync && !first->sync) {
+ // Do not include a sync write into a batch handled by a non-sync write.
+ break;
+ }
+
+ if (w->batch != NULL) {
+ size += WriteBatchInternal::ByteSize(w->batch);
+ if (size > max_size) {
+ // Do not make batch too big
+ break;
+ }
+
+ // Append to *result
+ if (result == first->batch) {
+ // Switch to temporary batch instead of disturbing caller's batch
+ result = tmp_batch_;
+ assert(WriteBatchInternal::Count(result) == 0);
+ WriteBatchInternal::Append(result, first->batch);
+ }
+ WriteBatchInternal::Append(result, w->batch);
+ }
+ *last_writer = w;
+ }
+ return result;
+}
+
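To make the size heuristic above concrete: 128<<10 is 128 KiB and 1<<20 is 1 MiB, so a small leading write only ever waits for about 128 KiB of followers, while a large one can grow the group up to the 1 MiB ceiling. A worked restatement of the computation (GroupCap is an illustrative name, not in the patch):

    #include <cstddef>

    // Worked examples, in bytes:
    //   leader =   1 KiB (1024)   -> cap = 1024 + 131072   = 132096  (~129 KiB)
    //   leader = 100 KiB (102400) -> cap = 102400 + 131072 = 233472  (~228 KiB)
    //   leader = 600 KiB (614400) -> cap = 1 << 20         = 1048576 (1 MiB)
    size_t GroupCap(size_t leader_size) {
      size_t max_size = 1 << 20;            // absolute ceiling: 1 MiB
      if (leader_size <= (128 << 10)) {     // small leader: bound its extra wait
        max_size = leader_size + (128 << 10);
      }
      return max_size;
    }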
// REQUIRES: mutex_ is held
-// REQUIRES: this thread is the current logger
+// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
- assert(logger_ != NULL);
+ assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {