diff options
Diffstat (limited to 'db/db_bench.cc')
-rw-r--r-- | db/db_bench.cc | 530 |
1 files changed, 351 insertions, 179 deletions
diff --git a/db/db_bench.cc b/db/db_bench.cc index 7b4e41a..d3ec61b 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -14,6 +14,7 @@ #include "port/port.h" #include "util/crc32c.h" #include "util/histogram.h" +#include "util/mutexlock.h" #include "util/random.h" #include "util/testutil.h" @@ -60,6 +61,9 @@ static int FLAGS_num = 1000000; // Number of read operations to do. If negative, do FLAGS_num reads. static int FLAGS_reads = -1; +// Number of concurrent threads to run. +static int FLAGS_threads = 1; + // Size of each value static int FLAGS_value_size = 100; @@ -91,8 +95,9 @@ static const char* FLAGS_db = "/tmp/dbbench"; namespace leveldb { -// Helper for quickly generating random data. namespace { + +// Helper for quickly generating random data. class RandomGenerator { private: std::string data_; @@ -136,6 +141,152 @@ static Slice TrimSpace(Slice s) { return Slice(s.data() + start, limit - start); } +static void AppendWithSpace(std::string* str, Slice msg) { + if (msg.empty()) return; + if (!str->empty()) { + str->push_back(' '); + } + str->append(msg.data(), msg.size()); +} + +class Stats { + private: + double start_; + double finish_; + double seconds_; + int done_; + int next_report_; + int64_t bytes_; + double last_op_finish_; + Histogram hist_; + std::string message_; + + public: + Stats() { Start(); } + + void Start() { + next_report_ = 100; + last_op_finish_ = start_; + hist_.Clear(); + done_ = 0; + bytes_ = 0; + seconds_ = 0; + start_ = Env::Default()->NowMicros(); + finish_ = start_; + message_.clear(); + } + + void Merge(const Stats& other) { + hist_.Merge(other.hist_); + done_ += other.done_; + bytes_ += other.bytes_; + seconds_ += other.seconds_; + if (other.start_ < start_) start_ = other.start_; + if (other.finish_ > finish_) finish_ = other.finish_; + + // Just keep the messages from one thread + if (message_.empty()) message_ = other.message_; + } + + void Stop() { + finish_ = Env::Default()->NowMicros(); + seconds_ = (finish_ - start_) * 1e-6; + } + + void AddMessage(Slice msg) { + AppendWithSpace(&message_, msg); + } + + void FinishedSingleOp() { + if (FLAGS_histogram) { + double now = Env::Default()->NowMicros(); + double micros = now - last_op_finish_; + hist_.Add(micros); + if (micros > 20000) { + fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); + fflush(stderr); + } + last_op_finish_ = now; + } + + done_++; + if (done_ >= next_report_) { + if (next_report_ < 1000) next_report_ += 100; + else if (next_report_ < 5000) next_report_ += 500; + else if (next_report_ < 10000) next_report_ += 1000; + else if (next_report_ < 50000) next_report_ += 5000; + else if (next_report_ < 100000) next_report_ += 10000; + else if (next_report_ < 500000) next_report_ += 50000; + else next_report_ += 100000; + fprintf(stderr, "... finished %d ops%30s\r", done_, ""); + fflush(stderr); + } + } + + void AddBytes(int64_t n) { + bytes_ += n; + } + + void Report(const Slice& name) { + // Pretend at least one op was done in case we are running a benchmark + // that does not call FinishedSingleOp(). + if (done_ < 1) done_ = 1; + + std::string extra; + if (bytes_ > 0) { + // Rate is computed on actual elapsed time, not the sum of per-thread + // elapsed times. + double elapsed = (finish_ - start_) * 1e-6; + char rate[100]; + snprintf(rate, sizeof(rate), "%6.1f MB/s", + (bytes_ / 1048576.0) / elapsed); + extra = rate; + } + AppendWithSpace(&extra, message_); + + fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", + name.ToString().c_str(), + seconds_ * 1e6 / done_, + (extra.empty() ? "" : " "), + extra.c_str()); + if (FLAGS_histogram) { + fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); + } + fflush(stdout); + } +}; + +// State shared by all concurrent executions of the same benchmark. +struct SharedState { + port::Mutex mu; + port::CondVar cv; + int total; + + // Each thread goes through the following states: + // (1) initializing + // (2) waiting for others to be initialized + // (3) running + // (4) done + + int num_initialized; + int num_done; + bool start; + + SharedState() : cv(&mu) { } +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + int tid; // 0..n-1 when running in n threads + Random rand; // Has different seeds for different threads + Stats stats; + + ThreadState(int index) + : tid(index), + rand(1000 + index) { + } +}; + } class Benchmark { @@ -143,20 +294,11 @@ class Benchmark { Cache* cache_; DB* db_; int num_; + int value_size_; + int entries_per_batch_; + WriteOptions write_options_; int reads_; int heap_counter_; - double start_; - double last_op_finish_; - int64_t bytes_; - std::string message_; - std::string post_message_; - Histogram hist_; - RandomGenerator gen_; - Random rand_; - - // State kept for progress messages - int done_; - int next_report_; // When to report next void PrintHeader() { const int kKeySize = 16; @@ -232,94 +374,15 @@ class Benchmark { #endif } - void Start() { - start_ = Env::Default()->NowMicros() * 1e-6; - bytes_ = 0; - message_.clear(); - last_op_finish_ = start_; - hist_.Clear(); - done_ = 0; - next_report_ = 100; - } - - void FinishedSingleOp() { - if (FLAGS_histogram) { - double now = Env::Default()->NowMicros() * 1e-6; - double micros = (now - last_op_finish_) * 1e6; - hist_.Add(micros); - if (micros > 20000) { - fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); - fflush(stderr); - } - last_op_finish_ = now; - } - - done_++; - if (done_ >= next_report_) { - if (next_report_ < 1000) next_report_ += 100; - else if (next_report_ < 5000) next_report_ += 500; - else if (next_report_ < 10000) next_report_ += 1000; - else if (next_report_ < 50000) next_report_ += 5000; - else if (next_report_ < 100000) next_report_ += 10000; - else if (next_report_ < 500000) next_report_ += 50000; - else next_report_ += 100000; - fprintf(stderr, "... finished %d ops%30s\r", done_, ""); - fflush(stderr); - } - } - - void Stop(const Slice& name) { - double finish = Env::Default()->NowMicros() * 1e-6; - - // Pretend at least one op was done in case we are running a benchmark - // that does nto call FinishedSingleOp(). - if (done_ < 1) done_ = 1; - - if (bytes_ > 0) { - char rate[100]; - snprintf(rate, sizeof(rate), "%6.1f MB/s", - (bytes_ / 1048576.0) / (finish - start_)); - if (!message_.empty()) { - message_ = std::string(rate) + " " + message_; - } else { - message_ = rate; - } - } - - fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", - name.ToString().c_str(), - (finish - start_) * 1e6 / done_, - (message_.empty() ? "" : " "), - message_.c_str()); - if (FLAGS_histogram) { - fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); - } - fflush(stdout); - - if (!post_message_.empty()) { - fprintf(stdout, "\n%s\n", post_message_.c_str()); - post_message_.clear(); - } - } - public: - enum Order { - SEQUENTIAL, - RANDOM - }; - enum DBState { - FRESH, - EXISTING - }; - Benchmark() : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), db_(NULL), num_(FLAGS_num), + value_size_(FLAGS_value_size), + entries_per_batch_(1), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), - heap_counter_(0), - bytes_(0), - rand_(301) { + heap_counter_(0) { std::vector<std::string> files; Env::Default()->GetChildren(FLAGS_db, &files); for (int i = 0; i < files.size(); i++) { @@ -353,98 +416,203 @@ class Benchmark { benchmarks = sep + 1; } - Start(); + // Reset parameters that may be overriddden bwlow + num_ = FLAGS_num; + reads_ = num_; + value_size_ = FLAGS_value_size; + entries_per_batch_ = 1; + write_options_ = WriteOptions(); + + void (Benchmark::*method)(ThreadState*) = NULL; + bool fresh_db = false; - WriteOptions write_options; - bool known = true; if (name == Slice("fillseq")) { - Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1); + fresh_db = true; + method = &Benchmark::WriteSeq; } else if (name == Slice("fillbatch")) { - Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1000); + fresh_db = true; + entries_per_batch_ = 1000; + method = &Benchmark::WriteSeq; } else if (name == Slice("fillrandom")) { - Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size, 1); + fresh_db = true; + method = &Benchmark::WriteRandom; } else if (name == Slice("overwrite")) { - Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1); + fresh_db = false; + method = &Benchmark::WriteRandom; } else if (name == Slice("fillsync")) { - write_options.sync = true; - Write(write_options, RANDOM, FRESH, num_ / 1000, FLAGS_value_size, 1); + fresh_db = true; + num_ /= 1000; + write_options_.sync = true; + method = &Benchmark::WriteRandom; } else if (name == Slice("fill100K")) { - Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1); + fresh_db = true; + num_ /= 1000; + value_size_ = 100 * 1000; + method = &Benchmark::WriteRandom; } else if (name == Slice("readseq")) { - ReadSequential(); + method = &Benchmark::ReadSequential; } else if (name == Slice("readreverse")) { - ReadReverse(); + method = &Benchmark::ReadReverse; } else if (name == Slice("readrandom")) { - ReadRandom(); + method = &Benchmark::ReadRandom; } else if (name == Slice("readhot")) { - ReadHot(); + method = &Benchmark::ReadHot; } else if (name == Slice("readrandomsmall")) { - int n = reads_; reads_ /= 1000; - ReadRandom(); - reads_ = n; + method = &Benchmark::ReadRandom; } else if (name == Slice("compact")) { - Compact(); + method = &Benchmark::Compact; } else if (name == Slice("crc32c")) { - Crc32c(4096, "(4K per op)"); + method = &Benchmark::Crc32c; } else if (name == Slice("acquireload")) { - AcquireLoad(); + method = &Benchmark::AcquireLoad; } else if (name == Slice("snappycomp")) { - SnappyCompress(); + method = &Benchmark::SnappyCompress; } else if (name == Slice("snappyuncomp")) { - SnappyUncompress(); + method = &Benchmark::SnappyUncompress; } else if (name == Slice("heapprofile")) { HeapProfile(); } else if (name == Slice("stats")) { PrintStats(); } else { - known = false; if (name != Slice()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); } } - if (known) { - Stop(name); + + if (fresh_db) { + if (FLAGS_use_existing_db) { + fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n", + name.ToString().c_str()); + method = NULL; + } else { + delete db_; + db_ = NULL; + DestroyDB(FLAGS_db, Options()); + Open(); + } + } + + if (method != NULL) { + RunBenchmark(name, method); } } } private: - void Crc32c(int size, const char* label) { + struct ThreadArg { + Benchmark* bm; + SharedState* shared; + ThreadState* thread; + void (Benchmark::*method)(ThreadState*); + }; + + static void ThreadBody(void* v) { + ThreadArg* arg = reinterpret_cast<ThreadArg*>(v); + SharedState* shared = arg->shared; + ThreadState* thread = arg->thread; + { + MutexLock l(&shared->mu); + shared->num_initialized++; + if (shared->num_initialized >= shared->total) { + shared->cv.SignalAll(); + } + while (!shared->start) { + shared->cv.Wait(); + } + } + + thread->stats.Start(); + (arg->bm->*(arg->method))(thread); + thread->stats.Stop(); + + { + MutexLock l(&shared->mu); + shared->num_done++; + if (shared->num_done >= shared->total) { + shared->cv.SignalAll(); + } + } + } + + void RunBenchmark(Slice name, void (Benchmark::*method)(ThreadState*)) { + const int n = FLAGS_threads; + SharedState shared; + shared.total = n; + shared.num_initialized = 0; + shared.num_done = 0; + shared.start = false; + + ThreadArg* arg = new ThreadArg[n]; + for (int i = 0; i < n; i++) { + arg[i].bm = this; + arg[i].method = method; + arg[i].shared = &shared; + arg[i].thread = new ThreadState(i); + Env::Default()->StartThread(ThreadBody, &arg[i]); + } + + shared.mu.Lock(); + while (shared.num_initialized < n) { + shared.cv.Wait(); + } + + shared.start = true; + shared.cv.SignalAll(); + while (shared.num_done < n) { + shared.cv.Wait(); + } + shared.mu.Unlock(); + + for (int i = 1; i < n; i++) { + arg[0].thread->stats.Merge(arg[i].thread->stats); + } + arg[0].thread->stats.Report(name); + + for (int i = 0; i < n; i++) { + delete arg[i].thread; + } + delete[] arg; + } + + void Crc32c(ThreadState* thread) { // Checksum about 500MB of data total + const int size = 4096; + const char* label = "(4K per op)"; std::string data(size, 'x'); int64_t bytes = 0; uint32_t crc = 0; while (bytes < 500 * 1048576) { crc = crc32c::Value(data.data(), size); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); bytes += size; } // Print so result is not dead fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc)); - bytes_ = bytes; - message_ = label; + thread->stats.AddBytes(bytes); + thread->stats.AddMessage(label); } - void AcquireLoad() { + void AcquireLoad(ThreadState* thread) { int dummy; port::AtomicPointer ap(&dummy); int count = 0; void *ptr = NULL; - message_ = "(each op is 1000 loads)"; + thread->stats.AddMessage("(each op is 1000 loads)"); while (count < 100000) { for (int i = 0; i < 1000; i++) { ptr = ap.Acquire_Load(); } count++; - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } if (ptr == NULL) exit(1); // Disable unused variable warning. } - void SnappyCompress() { - Slice input = gen_.Generate(Options().block_size); + void SnappyCompress(ThreadState* thread) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); int64_t bytes = 0; int64_t produced = 0; bool ok = true; @@ -453,22 +621,23 @@ class Benchmark { ok = port::Snappy_Compress(input.data(), input.size(), &compressed); produced += compressed.size(); bytes += input.size(); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } if (!ok) { - message_ = "(snappy failure)"; + thread->stats.AddMessage("(snappy failure)"); } else { char buf[100]; snprintf(buf, sizeof(buf), "(output: %.1f%%)", (produced * 100.0) / bytes); - message_ = buf; - bytes_ = bytes; + thread->stats.AddMessage(buf); + thread->stats.AddBytes(bytes); } } - void SnappyUncompress() { - Slice input = gen_.Generate(Options().block_size); + void SnappyUncompress(ThreadState* thread) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); std::string compressed; bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); int64_t bytes = 0; @@ -477,14 +646,14 @@ class Benchmark { ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), uncompressed); bytes += input.size(); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } delete[] uncompressed; if (!ok) { - message_ = "(snappy failure)"; + thread->stats.AddMessage("(snappy failure)"); } else { - bytes_ = bytes; + thread->stats.AddBytes(bytes); } } @@ -501,95 +670,97 @@ class Benchmark { } } - void Write(const WriteOptions& options, Order order, DBState state, - int num_entries, int value_size, int entries_per_batch) { - if (state == FRESH) { - if (FLAGS_use_existing_db) { - message_ = "skipping (--use_existing_db is true)"; - return; - } - delete db_; - db_ = NULL; - DestroyDB(FLAGS_db, Options()); - Open(); - Start(); // Do not count time taken to destroy/open - } + void WriteSeq(ThreadState* thread) { + DoWrite(thread, true); + } - if (num_entries != num_) { + void WriteRandom(ThreadState* thread) { + DoWrite(thread, false); + } + + void DoWrite(ThreadState* thread, bool seq) { + if (num_ != FLAGS_num) { char msg[100]; - snprintf(msg, sizeof(msg), "(%d ops)", num_entries); - message_ = msg; + snprintf(msg, sizeof(msg), "(%d ops)", num_); + thread->stats.AddMessage(msg); } + RandomGenerator gen; WriteBatch batch; Status s; std::string val; - for (int i = 0; i < num_entries; i += entries_per_batch) { + int64_t bytes = 0; + for (int i = 0; i < num_; i += entries_per_batch_) { batch.Clear(); - for (int j = 0; j < entries_per_batch; j++) { - const int k = (order == SEQUENTIAL) ? i+j : (rand_.Next() % FLAGS_num); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num); char key[100]; snprintf(key, sizeof(key), "%016d", k); - batch.Put(key, gen_.Generate(value_size)); - bytes_ += value_size + strlen(key); - FinishedSingleOp(); + batch.Put(key, gen.Generate(value_size_)); + bytes += value_size_ + strlen(key); + thread->stats.FinishedSingleOp(); } - s = db_->Write(options, &batch); + s = db_->Write(write_options_, &batch); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } } + thread->stats.AddBytes(bytes); } - void ReadSequential() { + void ReadSequential(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; + int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { - bytes_ += iter->key().size() + iter->value().size(); - FinishedSingleOp(); + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); ++i; } delete iter; + thread->stats.AddBytes(bytes); } - void ReadReverse() { + void ReadReverse(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; + int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { - bytes_ += iter->key().size() + iter->value().size(); - FinishedSingleOp(); + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); ++i; } delete iter; + thread->stats.AddBytes(bytes); } - void ReadRandom() { + void ReadRandom(ThreadState* thread) { ReadOptions options; std::string value; for (int i = 0; i < reads_; i++) { char key[100]; - const int k = rand_.Next() % FLAGS_num; + const int k = thread->rand.Next() % FLAGS_num; snprintf(key, sizeof(key), "%016d", k); db_->Get(options, key, &value); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } } - void ReadHot() { + void ReadHot(ThreadState* thread) { ReadOptions options; std::string value; const int range = (FLAGS_num + 99) / 100; for (int i = 0; i < reads_; i++) { char key[100]; - const int k = rand_.Next() % range; + const int k = thread->rand.Next() % range; snprintf(key, sizeof(key), "%016d", k); db_->Get(options, key, &value); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } } - void Compact() { + void Compact(ThreadState* thread) { DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); dbi->TEST_CompactMemTable(); int max_level_with_files = 1; @@ -609,10 +780,9 @@ class Benchmark { void PrintStats() { std::string stats; if (!db_->GetProperty("leveldb.stats", &stats)) { - message_ = "(failed)"; - } else { - post_message_ = stats; + stats = "(failed)"; } + fprintf(stdout, "\n%s\n", stats.c_str()); } static void WriteToFile(void* arg, const char* buf, int n) { @@ -625,13 +795,13 @@ class Benchmark { WritableFile* file; Status s = Env::Default()->NewWritableFile(fname, &file); if (!s.ok()) { - message_ = s.ToString(); + fprintf(stderr, "%s\n", s.ToString().c_str()); return; } bool ok = port::GetHeapProfile(WriteToFile, file); delete file; if (!ok) { - message_ = "not supported"; + fprintf(stderr, "heap profiling not supported\n"); Env::Default()->DeleteFile(fname); } } @@ -661,6 +831,8 @@ int main(int argc, char** argv) { FLAGS_num = n; } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { FLAGS_reads = n; + } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { + FLAGS_threads = n; } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { FLAGS_value_size = n; } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { |