summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <gregf@hq.newdream.net>2009-12-01 13:55:19 -0800
committerGreg Farnum <gregf@hq.newdream.net>2009-12-01 17:32:38 -0800
commit364e045df881da33a4dcf8b1dad2a050f9b07145 (patch)
treed1ac79b656c3dcba9e77db7ab13f9114848943a2
parent8efea576678b72f6d3e58ca1b209d228872eec2b (diff)
downloadceph-364e045df881da33a4dcf8b1dad2a050f9b07145.tar.gz
rados: read benchmark is asynchronous.
-rw-r--r--src/rados_bencher.h189
1 files changed, 157 insertions, 32 deletions
diff --git a/src/rados_bencher.h b/src/rados_bencher.h
index 64a61ec64a9..c8ee1980a7d 100644
--- a/src/rados_bencher.h
+++ b/src/rados_bencher.h
@@ -185,6 +185,11 @@ int write_bench(Rados& rados, rados_pool_t pool,
slot = data->finished % concurrentios;
completions[slot]->wait_for_safe();
dataLock.Lock();
+ r = completions[slot]->get_return_value();
+ if (r != 0) {
+ dataLock.Unlock();
+ return r;
+ }
data->cur_latency = g_clock.now() - start_times[slot];
total_latency += data->cur_latency;
if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
@@ -221,45 +226,165 @@ int write_bench(Rados& rados, rados_pool_t pool,
return 0;
}
-int seq_read_bench(Rados& rados, rados_pool_t pool, bench_data *data) {
+int seq_read_bench(Rados& rados, rados_pool_t pool,
+ int concurrentios, bench_data *write_data, int verify) {
+ bench_data *data = new bench_data();
+ data->done = false;
+ data->object_size = write_data->object_size;
+ data->trans_size = data->object_size;
+ data->in_flight= 0;
+ data->started = 0;
+ data->finished = 0;
+ data->min_latency = 9999.0;
+ data->max_latency = 0;
+ data->avg_latency = 0;
+ data->object_contents = write_data->object_contents;
+
+ Rados::AioCompletion* completions[concurrentios];
+ char* name[concurrentios];
+ bufferlist* contents[concurrentios];
int errors = 0;
- char matchName[128];
- object_t oid;
- bufferlist actualContents;
utime_t start_time;
- utime_t last_start;
+ utime_t start_times[concurrentios];
double total_latency = 0;
- double avg_latency;
- double avg_bw;
int r = 0;
- sanitize_object_contents(data, 128);
- start_time = g_clock.now();
- for (int i = 0; i < data->finished; ++i ) {
- snprintf(matchName, 128, "Object %d", i);
- oid = object_t(matchName);
- snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i);
- last_start = g_clock.now();
- r = rados.read(pool, oid, 0, actualContents, data->object_size);
- if (r != data->object_size) {
- if (r < 0) return r;
- else return -5; //EIO
+ sanitize_object_contents(data, 128); //clean it up once; subsequent
+ //changes will be safe because string length monotonically increases
+
+ //set up initial reads
+ for (int i = 0; i < concurrentios; ++i) {
+ name[i] = new char[128];
+ snprintf(name[i], 128, "Object %d", i);
+ contents[i] = new bufferlist();
+ }
+
+ pthread_t print_thread;
+ pthread_create(&print_thread, NULL, status_printer, (void *)data);
+
+ dataLock.Lock();
+ data->start_time = g_clock.now();
+ //start initial reads
+ for (int i = 0; i < concurrentios; ++i) {
+ start_times[i] = g_clock.now();
+ r = rados.aio_read(pool, name[i], 0, contents[i], data->object_size, &completions[i]);
+ if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
+ cerr << "r = " << r << std::endl;
+ dataLock.Unlock();
+ return -5; //EIO
}
- total_latency += (double) g_clock.now() - last_start;
- if (memcmp(data->object_contents, actualContents.c_str(), data->object_size) != 0 ) {
- cerr << "Object " << matchName << " is not correct!";
- ++errors;
+ ++data->started;
+ ++data->in_flight;
+ }
+ dataLock.Unlock();
+
+ //keep on adding new reads as old ones complete
+ int slot;
+ char* newName;
+ utime_t runtime;
+ bufferlist *cur_contents;
+
+ for (int i = 0; i < write_data->finished - concurrentios; ++i) {
+ slot = data->finished % concurrentios;
+ newName = new char[128];
+ snprintf(newName, 128, "Object %d", data->started);
+ completions[slot]->wait_for_complete();
+ dataLock.Lock();
+ r = completions[slot]->get_return_value();
+ if (r != 0) {
+ cerr << "read got " << r << std::endl;
+ dataLock.Unlock();
+ return r;
}
- actualContents.clear();
+ data->cur_latency = g_clock.now() - start_times[slot];
+ total_latency += data->cur_latency;
+ if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
+ if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
+ ++data->finished;
+ data->avg_latency = total_latency / data->finished;
+ --data->in_flight;
+ dataLock.Unlock();
+ completions[slot]->release();
+ cur_contents = contents[slot];
+
+ //start new read and check data if requested
+ start_times[slot] = g_clock.now();
+ contents[slot] = new bufferlist();
+ r = rados.aio_read(pool, newName, 0, contents[slot], data->object_size, &completions[slot]);
+ if (r < 0)
+ return r;
+ dataLock.Lock();
+ ++data->started;
+ ++data->in_flight;
+ dataLock.Unlock();
+ if (verify) {
+ dataLock.Lock();
+ snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i);
+ dataLock.Unlock();
+ if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) {
+ cerr << name[slot] << " is not correct!";
+ ++errors;
+ }
+ }
+ delete name[slot];
+ name[slot] = newName;
+ delete cur_contents;
}
- last_start = g_clock.now();
- avg_latency = total_latency / data->finished;
- avg_bw = data->finished * data->object_size /
- (double)(last_start - start_time) / (1024 *1024);
- cout << "read avg latency: " << avg_latency
- << " read avg bw: " << avg_bw << std::endl;
-
- if (errors) cout << "WARNING: There were " << errors << " total errors in copying!\n";
- else cout << "No errors in copying!\n";
+
+ //wait for final reads to complete
+ while (data->finished < data->started) {
+ slot = data->finished % concurrentios;
+ completions[slot]->wait_for_complete();
+ dataLock.Lock();
+ r = completions[slot]->get_return_value();
+ if (r != 0) {
+ cerr << "read got " << r << std::endl;
+ dataLock.Unlock();
+ return r;
+ }
+ data->cur_latency = g_clock.now() - start_times[slot];
+ total_latency += data->cur_latency;
+ if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
+ if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
+ ++data->finished;
+ data->avg_latency = total_latency / data->finished;
+ --data->in_flight;
+ dataLock.Unlock();
+ completions[slot]-> release();
+ if (verify) {
+ dataLock.Lock();
+ snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->finished-1);
+ dataLock.Unlock();
+ if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) {
+ cerr << name[slot] << " is not correct!" << std::endl;
+ ++errors;
+ }
+ }
+ delete name[slot];
+ delete contents[slot];
+ }
+
+ runtime = g_clock.now() - data->start_time;
+ dataLock.Lock();
+ data->done = true;
+ dataLock.Unlock();
+
+ pthread_join(print_thread, NULL);
+
+ double bandwidth;
+ bandwidth = ((double)data->finished)*((double)data->object_size)/(double)runtime;
+ bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+ char bw[20];
+ sprintf(bw, "%.3lf \n", bandwidth);
+
+ cout << "Total time run: " << runtime << std::endl
+ << "Total reads made: " << data->finished << std::endl
+ << "Read size: " << data->object_size << std::endl
+ << "Bandwidth (MB/sec): " << bw << std::endl
+ << "Average Latency: " << data->avg_latency << std::endl
+ << "Max latency: " << data->max_latency << std::endl
+ << "Min latency: " << data->min_latency << std::endl;
+
+ delete data;
return 0;
}