// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include "os/FileStore.h" #include "global/global_init.h" #include "common/ceph_argparse.h" #include "common/debug.h" #undef dout_prefix #define dout_prefix *_dout struct io { utime_t start, ack, commit; bool done() { return ack.sec() && commit.sec(); } }; map writes; Cond cond; Mutex lock("streamtest.cc lock"); unsigned concurrent = 1; void throttle() { Mutex::Locker l(lock); while (writes.size() >= concurrent) { //generic_dout(0) << "waiting" << dendl; cond.Wait(lock); } } double total_ack = 0; double total_commit = 0; int total_num = 0; void pr(off_t off) { io &i = writes[off]; if (false) cout << off << "\t" << (i.ack - i.start) << "\t" << (i.commit - i.start) << std::endl; total_num++; total_ack += (i.ack - i.start); total_commit += (i.commit - i.start); writes.erase(off); cond.Signal(); } void set_start(off_t off, utime_t t) { Mutex::Locker l(lock); writes[off].start = t; } void set_ack(off_t off, utime_t t) { Mutex::Locker l(lock); //generic_dout(0) << "ack " << off << dendl; writes[off].ack = t; if (writes[off].done()) pr(off); } void set_commit(off_t off, utime_t t) { Mutex::Locker l(lock); //generic_dout(0) << "commit " << off << dendl; writes[off].commit = t; if (writes[off].done()) pr(off); } struct C_Ack : public Context { off_t off; C_Ack(off_t o) : off(o) {} void finish(int r) { set_ack(off, ceph_clock_now(g_ceph_context)); } }; struct C_Commit : public Context { off_t off; C_Commit(off_t o) : off(o) {} void finish(int r) { set_commit(off, ceph_clock_now(g_ceph_context)); } }; int main(int argc, const char **argv) { vector args; argv_to_vec(argc, argv, args); env_to_vec(args); global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); common_init_finish(g_ceph_context); // args if (args.size() < 3) return -1; const char *filename = args[0]; int seconds = atoi(args[1]); int bytes = atoi(args[2]); const char *journal = 0; if (args.size() >= 4) journal = args[3]; if (args.size() >= 5) concurrent = atoi(args[4]); cout << "concurrent = " << concurrent << std::endl; buffer::ptr bp(bytes); bp.zero(); bufferlist bl; bl.push_back(bp); //float interval = 1.0 / 1000; cout << "#dev " << filename << ", " << seconds << " seconds, " << bytes << " bytes per write" << std::endl; ObjectStore *fs = new FileStore(filename, journal); if (fs->mkfs() < 0) { cout << "mkfs failed" << std::endl; return -1; } if (fs->mount() < 0) { cout << "mount failed" << std::endl; return -1; } ObjectStore::Transaction ft; ft.create_collection(coll_t()); fs->apply_transaction(ft); utime_t now = ceph_clock_now(g_ceph_context); utime_t start = now; utime_t end = now; end += seconds; off_t pos = 0; //cout << "stop at " << end << std::endl; cout << "# offset\tack\tcommit" << std::endl; while (now < end) { sobject_t poid(object_t("streamtest"), 0); set_start(pos, ceph_clock_now(g_ceph_context)); ObjectStore::Transaction *t = new ObjectStore::Transaction; t->write(coll_t(), hobject_t(poid), pos, bytes, bl); fs->queue_transaction(NULL, t, new C_Ack(pos), new C_Commit(pos)); pos += bytes; throttle(); now = ceph_clock_now(g_ceph_context); // wait? /* utime_t next = start; next += interval; if (now < next) { float s = next - now; s *= 1000 * 1000; // s -> us //cout << "sleeping for " << s << " us" << std::endl; usleep((int)s); } */ } cout << "total num " << total_num << std::endl; cout << "avg ack\t" << (total_ack / (double)total_num) << std::endl; cout << "avg commit\t" << (total_commit / (double)total_num) << std::endl; cout << "tput\t" << prettybyte_t((double)(total_num * bytes) / (double)(end-start)) << "/sec" << std::endl; fs->umount(); }