summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordgrogan@chromium.org <dgrogan@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529>2011-05-28 00:53:58 +0000
committerdgrogan@chromium.org <dgrogan@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529>2011-05-28 00:53:58 +0000
commit740d8b3d009462eed28a94ea516d667e735b0dfc (patch)
tree83959c625c77d893b34d4306e478a5ba38804dc5
parentda7990950787257cb312ca562ce5977749afc3e9 (diff)
downloadleveldb-740d8b3d009462eed28a94ea516d667e735b0dfc.tar.gz
Update from upstream @21551990
* Patch LevelDB to build for OSX and iOS * Fix race condition in memtable iterator deletion. * Other small fixes. git-svn-id: https://leveldb.googlecode.com/svn/trunk@29 62dab493-f737-651d-591e-8d6aee1b9529
-rw-r--r--Makefile45
-rw-r--r--db/db_bench.cc20
-rw-r--r--db/db_impl.cc36
-rw-r--r--db/db_impl.h4
-rw-r--r--db/db_test.cc95
-rw-r--r--db/log_test.cc2
-rw-r--r--db/memtable.cc33
-rw-r--r--port/port.h2
-rw-r--r--port/port_osx.cc50
-rw-r--r--port/port_osx.h125
-rw-r--r--table/iterator.cc1
-rw-r--r--table/iterator_wrapper.h11
-rw-r--r--util/cache.cc2
13 files changed, 381 insertions, 45 deletions
diff --git a/Makefile b/Makefile
index 43ac23d..5eadd72 100644
--- a/Makefile
+++ b/Makefile
@@ -8,7 +8,21 @@ CC = g++
#OPT = -O2 -DNDEBUG
OPT = -g2
-CFLAGS = -c -DLEVELDB_PLATFORM_POSIX -I. -I./include -std=c++0x $(OPT)
+UNAME := $(shell uname)
+
+ifeq ($(UNAME), Darwin)
+# To build for iOS, set PLATFORM=IOS.
+ifndef PLATFORM
+PLATFORM=OSX
+endif # PLATFORM
+PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_OSX
+PORT_MODULE = port_osx.o
+else # UNAME
+PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -std=c++0x
+PORT_MODULE = port_posix.o
+endif # UNAME
+
+CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT)
LDFLAGS=-lpthread
@@ -26,7 +40,7 @@ LIBOBJECTS = \
./db/version_edit.o \
./db/version_set.o \
./db/write_batch.o \
- ./port/port_posix.o \
+ ./port/$(PORT_MODULE) \
./table/block.o \
./table/block_builder.o \
./table/format.o \
@@ -69,13 +83,25 @@ TESTS = \
PROGRAMS = db_bench $(TESTS)
-all: $(PROGRAMS)
+LIBRARY = libleveldb.a
+
+ifeq ($(PLATFORM), IOS)
+# Only XCode can build executable applications for iOS.
+all: $(LIBRARY)
+else
+all: $(PROGRAMS) $(LIBRARY)
+endif
check: $(TESTS)
for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done
clean:
- rm -f $(PROGRAMS) */*.o
+ -rm -f $(PROGRAMS) $(LIBRARY) */*.o ios-x86/*/*.o ios-arm/*/*.o
+ -rmdir -p ios-x86/* ios-arm/*
+
+$(LIBRARY): $(LIBOBJECTS)
+ rm -f $@
+ $(AR) -rs $@ $(LIBOBJECTS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CC) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@
@@ -122,8 +148,19 @@ version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CC) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
+ifeq ($(PLATFORM), IOS)
+# For iOS, create universal object files to be used on both the simulator and
+# a device.
+.cc.o:
+ mkdir -p ios-x86/$(dir $@)
+ $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator4.3.sdk -arch i686 $< -o ios-x86/$@
+ mkdir -p ios-arm/$(dir $@)
+ $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS4.3.sdk -arch armv6 -arch armv7 $< -o ios-arm/$@
+ lipo ios-x86/$@ ios-arm/$@ -create -output $@
+else
.cc.o:
$(CC) $(CFLAGS) $< -o $@
+endif
# TODO(gabor): dependencies for .o files
# TODO(gabor): Build library
diff --git a/db/db_bench.cc b/db/db_bench.cc
index b5fd679..b24179d 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -29,6 +29,7 @@
// readrandom -- read N times in random order
// readhot -- read N times in random order from 1% section of DB
// crc32c -- repeated crc32c of 4K of data
+// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
@@ -50,6 +51,7 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
+ "acquireload,"
;
// Number of key/values to place in database
@@ -382,6 +384,8 @@ class Benchmark {
Compact();
} else if (name == Slice("crc32c")) {
Crc32c(4096, "(4K per op)");
+ } else if (name == Slice("acquireload")) {
+ AcquireLoad();
} else if (name == Slice("snappycomp")) {
SnappyCompress();
} else if (name == Slice("snappyuncomp")) {
@@ -420,6 +424,22 @@ class Benchmark {
message_ = label;
}
+ void AcquireLoad() {
+ int dummy;
+ port::AtomicPointer ap(&dummy);
+ int count = 0;
+ void *ptr = NULL;
+ message_ = "(each op is 1000 loads)";
+ while (count < 100000) {
+ for (int i = 0; i < 1000; i++) {
+ ptr = ap.Acquire_Load();
+ }
+ count++;
+ FinishedSingleOp();
+ }
+ if (ptr == NULL) exit(1); // Disable unused variable warning.
+ }
+
void SnappyCompress() {
Slice input = gen_.Generate(Options().block_size);
int64_t bytes = 0;
diff --git a/db/db_impl.cc b/db/db_impl.cc
index baf9299..9b139ce 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -875,22 +875,49 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
return status;
}
+namespace {
+struct IterState {
+ port::Mutex* mu;
+ Version* version;
+ MemTable* mem;
+ MemTable* imm;
+};
+
+static void CleanupIteratorState(void* arg1, void* arg2) {
+ IterState* state = reinterpret_cast<IterState*>(arg1);
+ state->mu->Lock();
+ state->mem->Unref();
+ if (state->imm != NULL) state->imm->Unref();
+ state->version->Unref();
+ state->mu->Unlock();
+ delete state;
+}
+}
+
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
+ IterState* cleanup = new IterState;
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
+ mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
+ imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
- internal_iter->RegisterCleanup(&DBImpl::Unref, this, versions_->current());
+
+ cleanup->mu = &mutex_;
+ cleanup->mem = mem_;
+ cleanup->imm = imm_;
+ cleanup->version = versions_->current();
+ internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
mutex_.Unlock();
return internal_iter;
@@ -937,13 +964,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
: latest_snapshot));
}
-void DBImpl::Unref(void* arg1, void* arg2) {
- DBImpl* impl = reinterpret_cast<DBImpl*>(arg1);
- Version* v = reinterpret_cast<Version*>(arg2);
- MutexLock l(&impl->mutex_);
- v->Unref();
-}
-
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
diff --git a/db/db_impl.h b/db/db_impl.h
index 7699d8c..c23ae00 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -77,10 +77,6 @@ class DBImpl : public DB {
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
- // Called when an iterator over a particular version of the
- // descriptor goes away.
- static void Unref(void* arg1, void* arg2);
-
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable();
diff --git a/db/db_test.cc b/db/db_test.cc
index 06565b2..42e70cf 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -801,6 +801,101 @@ TEST(DBTest, DBOpen_Options) {
db = NULL;
}
+// Multi-threaded test:
+namespace {
+
+static const int kNumThreads = 4;
+static const int kTestSeconds = 10;
+static const int kNumKeys = 1000;
+
+struct MTState {
+ DBTest* test;
+ port::AtomicPointer stop;
+ port::AtomicPointer counter[kNumThreads];
+ port::AtomicPointer thread_done[kNumThreads];
+};
+
+struct MTThread {
+ MTState* state;
+ int id;
+};
+
+static void MTThreadBody(void* arg) {
+ MTThread* t = reinterpret_cast<MTThread*>(arg);
+ DB* db = t->state->test->db_;
+ uintptr_t counter = 0;
+ fprintf(stderr, "... starting thread %d\n", t->id);
+ Random rnd(1000 + t->id);
+ std::string value;
+ char valbuf[1500];
+ while (t->state->stop.Acquire_Load() == NULL) {
+ t->state->counter[t->id].Release_Store(reinterpret_cast<void*>(counter));
+
+ int key = rnd.Uniform(kNumKeys);
+ char keybuf[20];
+ snprintf(keybuf, sizeof(keybuf), "%016d", key);
+
+ if (rnd.OneIn(2)) {
+ // Write values of the form <key, my id, counter>.
+ // We add some padding for force compactions.
+ snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
+ key, t->id, static_cast<int>(counter));
+ ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
+ } else {
+ // Read a value and verify that it matches the pattern written above.
+ Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
+ if (s.IsNotFound()) {
+ // Key has not yet been written
+ } else {
+ // Check that the writer thread counter is >= the counter in the value
+ ASSERT_OK(s);
+ int k, w, c;
+ ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
+ ASSERT_EQ(k, key);
+ ASSERT_GE(w, 0);
+ ASSERT_LT(w, kNumThreads);
+ ASSERT_LE(c, reinterpret_cast<uintptr_t>(
+ t->state->counter[w].Acquire_Load()));
+ }
+ }
+ counter++;
+ }
+ t->state->thread_done[t->id].Release_Store(t);
+ fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter));
+}
+
+}
+
+TEST(DBTest, MultiThreaded) {
+ // Initialize state
+ MTState mt;
+ mt.test = this;
+ mt.stop.Release_Store(0);
+ for (int id = 0; id < kNumThreads; id++) {
+ mt.counter[id].Release_Store(0);
+ mt.thread_done[id].Release_Store(0);
+ }
+
+ // Start threads
+ MTThread thread[kNumThreads];
+ for (int id = 0; id < kNumThreads; id++) {
+ thread[id].state = &mt;
+ thread[id].id = id;
+ env_->StartThread(MTThreadBody, &thread[id]);
+ }
+
+ // Let them run for a while
+ env_->SleepForMicroseconds(kTestSeconds * 1000000);
+
+ // Stop the threads and wait for them to finish
+ mt.stop.Release_Store(&mt);
+ for (int id = 0; id < kNumThreads; id++) {
+ while (mt.thread_done[id].Acquire_Load() == NULL) {
+ env_->SleepForMicroseconds(100000);
+ }
+ }
+}
+
namespace {
typedef std::map<std::string, std::string> KVMap;
}
diff --git a/db/log_test.cc b/db/log_test.cc
index 040bdff..06e0893 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -76,7 +76,7 @@ class LogTest {
return Status::OK();
}
- virtual Status Skip(size_t n) {
+ virtual Status Skip(uint64_t n) {
if (n > contents_.size()) {
contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
diff --git a/db/memtable.cc b/db/memtable.cc
index 9c25f6d..687900a 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -50,33 +50,24 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
- explicit MemTableIterator(MemTable* mem, MemTable::Table* table) {
- mem_ = mem;
- iter_ = new MemTable::Table::Iterator(table);
- mem->Ref();
- }
- virtual ~MemTableIterator() {
- delete iter_;
- mem_->Unref();
- }
-
- virtual bool Valid() const { return iter_->Valid(); }
- virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
- virtual void SeekToFirst() { iter_->SeekToFirst(); }
- virtual void SeekToLast() { iter_->SeekToLast(); }
- virtual void Next() { iter_->Next(); }
- virtual void Prev() { iter_->Prev(); }
- virtual Slice key() const { return GetLengthPrefixedSlice(iter_->key()); }
+ explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
+
+ virtual bool Valid() const { return iter_.Valid(); }
+ virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
+ virtual void SeekToFirst() { iter_.SeekToFirst(); }
+ virtual void SeekToLast() { iter_.SeekToLast(); }
+ virtual void Next() { iter_.Next(); }
+ virtual void Prev() { iter_.Prev(); }
+ virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
virtual Slice value() const {
- Slice key_slice = GetLengthPrefixedSlice(iter_->key());
+ Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
- MemTable* mem_;
- MemTable::Table::Iterator* iter_;
+ MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
@@ -85,7 +76,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator() {
- return new MemTableIterator(this, &table_);
+ return new MemTableIterator(&table_);
}
void MemTable::Add(SequenceNumber s, ValueType type,
diff --git a/port/port.h b/port/port.h
index 816826b..e35db23 100644
--- a/port/port.h
+++ b/port/port.h
@@ -16,6 +16,8 @@
# include "port/port_chromium.h"
#elif defined(LEVELDB_PLATFORM_ANDROID)
# include "port/port_android.h"
+#elif defined(LEVELDB_PLATFORM_OSX)
+# include "port/port_osx.h"
#endif
#endif // STORAGE_LEVELDB_PORT_PORT_H_
diff --git a/port/port_osx.cc b/port/port_osx.cc
new file mode 100644
index 0000000..4ab9e31
--- /dev/null
+++ b/port/port_osx.cc
@@ -0,0 +1,50 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "port_osx.h"
+
+#include <cstdlib>
+#include <stdio.h>
+#include <string.h>
+#include "util/logging.h"
+
+namespace leveldb {
+namespace port {
+
+static void PthreadCall(const char* label, int result) {
+ if (result != 0) {
+ fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
+ abort();
+ }
+}
+
+Mutex::Mutex() { PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL)); }
+
+Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); }
+
+void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); }
+
+void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); }
+
+CondVar::CondVar(Mutex* mu)
+ : mu_(mu) {
+ PthreadCall("init cv", pthread_cond_init(&cv_, NULL));
+}
+
+CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }
+
+void CondVar::Wait() {
+ PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
+}
+
+void CondVar::Signal() {
+ PthreadCall("signal", pthread_cond_signal(&cv_));
+}
+
+void CondVar::SignalAll() {
+ PthreadCall("broadcast", pthread_cond_broadcast(&cv_));
+}
+
+}
+}
diff --git a/port/port_osx.h b/port/port_osx.h
new file mode 100644
index 0000000..5524c6c
--- /dev/null
+++ b/port/port_osx.h
@@ -0,0 +1,125 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// See port_example.h for documentation for the following types/functions.
+
+#ifndef STORAGE_LEVELDB_PORT_PORT_OSX_H_
+#define STORAGE_LEVELDB_PORT_PORT_OSX_H_
+
+#include <libkern/OSAtomic.h>
+#include <machine/endian.h>
+#include <pthread.h>
+#include <stdint.h>
+
+#include <string>
+
+namespace leveldb {
+
+// The following 4 methods implemented here for the benefit of env_posix.cc.
+inline size_t fread_unlocked(void *a, size_t b, size_t c, FILE *d) {
+ return fread(a, b, c, d);
+}
+
+inline size_t fwrite_unlocked(const void *a, size_t b, size_t c, FILE *d) {
+ return fwrite(a, b, c, d);
+}
+
+inline int fflush_unlocked(FILE *f) {
+ return fflush(f);
+}
+
+inline int fdatasync(int fd) {
+ return fsync(fd);
+}
+
+namespace port {
+
+static const bool kLittleEndian = (__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN);
+
+// ------------------ Threading -------------------
+
+// A Mutex represents an exclusive lock.
+class Mutex {
+ public:
+ Mutex();
+ ~Mutex();
+
+ void Lock();
+ void Unlock();
+ void AssertHeld() { }
+
+ private:
+ friend class CondVar;
+ pthread_mutex_t mu_;
+
+ // No copying
+ Mutex(const Mutex&);
+ void operator=(const Mutex&);
+};
+
+class CondVar {
+ public:
+ explicit CondVar(Mutex* mu);
+ ~CondVar();
+
+ void Wait();
+ void Signal();
+ void SignalAll();
+
+ private:
+ pthread_cond_t cv_;
+ Mutex* mu_;
+};
+
+inline void MemoryBarrier() {
+#if defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
+ // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
+ // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
+ __asm__ __volatile__("" : : : "memory");
+#else
+ OSMemoryBarrier();
+#endif
+}
+
+class AtomicPointer {
+ private:
+ void* ptr_;
+ public:
+ AtomicPointer() { }
+ explicit AtomicPointer(void* p) : ptr_(p) {}
+ inline void* Acquire_Load() const {
+ void* ptr = ptr_;
+ MemoryBarrier();
+ return ptr;
+ }
+ inline void Release_Store(void* v) {
+ MemoryBarrier();
+ ptr_ = v;
+ }
+ inline void* NoBarrier_Load() const {
+ return ptr_;
+ }
+ inline void NoBarrier_Store(void* v) {
+ ptr_ = v;
+ }
+};
+
+inline bool Snappy_Compress(const char* input, size_t input_length,
+ std::string* output) {
+ return false;
+}
+
+inline bool Snappy_Uncompress(const char* input_data, size_t input_length,
+ std::string* output) {
+ return false;
+}
+
+inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
+ return false;
+}
+
+}
+}
+
+#endif // STORAGE_LEVELDB_PORT_PORT_OSX_H_
diff --git a/table/iterator.cc b/table/iterator.cc
index 4ddd55f..33bc8a2 100644
--- a/table/iterator.cc
+++ b/table/iterator.cc
@@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/iterator.h"
-#include "util/logging.h"
namespace leveldb {
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index 158d3a7..d8ca2b3 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -12,10 +12,6 @@ namespace leveldb {
// This can help avoid virtual function calls and also gives better
// cache locality.
class IteratorWrapper {
- private:
- Iterator* iter_;
- bool valid_;
- Slice key_;
public:
IteratorWrapper(): iter_(NULL), valid_(false) { }
explicit IteratorWrapper(Iterator* iter): iter_(NULL) {
@@ -56,9 +52,12 @@ class IteratorWrapper {
key_ = iter_->key();
}
}
-};
-}
+ Iterator* iter_;
+ bool valid_;
+ Slice key_;
+};
+} // namespace leveldb
#endif // STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_
diff --git a/util/cache.cc b/util/cache.cc
index d8a4426..968e6a0 100644
--- a/util/cache.cc
+++ b/util/cache.cc
@@ -4,6 +4,8 @@
#if defined(LEVELDB_PLATFORM_POSIX) || defined(LEVELDB_PLATFORM_ANDROID)
#include <unordered_set>
+#elif defined(LEVELDB_PLATFORM_OSX)
+#include <ext/hash_set>
#elif defined(LEVELDB_PLATFORM_CHROMIUM)
#include "base/hash_tables.h"
#else