diff options
author | Sage Weil <sage@inktank.com> | 2013-01-22 16:13:14 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-01-22 16:13:14 -0800 |
commit | 8eee815fb2686d505ec0ad4e3d3239e323990610 (patch) | |
tree | dd679e33ff3ecb7d86a42bde6ee65cb38688fc9c | |
parent | eaf20fa94bf23f268a6d84fa0e9845fc1adf4c79 (diff) | |
parent | 73a969366c8bbd105579611320c43e2334907fef (diff) | |
download | ceph-8eee815fb2686d505ec0ad4e3d3239e323990610.tar.gz |
Merge remote-tracking branch 'gh/wip-3833-b'
Conflicts:
src/osd/OSD.cc
src/osd/OSD.h
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/common/AsyncReserver.h | 9 | ||||
-rw-r--r-- | src/common/PrioritizedQueue.h | 89 | ||||
-rw-r--r-- | src/common/config_opts.h | 12 | ||||
-rw-r--r-- | src/messages/MOSDSubOp.h | 6 | ||||
-rw-r--r-- | src/msg/DispatchQueue.cc | 4 | ||||
-rw-r--r-- | src/msg/DispatchQueue.h | 2 | ||||
-rw-r--r-- | src/msg/Message.h | 4 | ||||
-rw-r--r-- | src/os/FileStore.cc | 12 | ||||
-rw-r--r-- | src/osd/OSD.cc | 111 | ||||
-rw-r--r-- | src/osd/OSD.h | 34 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 3 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 23 | ||||
-rw-r--r-- | src/osd/PG.cc | 2 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 78 |
14 files changed, 279 insertions, 110 deletions
diff --git a/src/common/AsyncReserver.h b/src/common/AsyncReserver.h index 755d11a6753..8cc2258d7b4 100644 --- a/src/common/AsyncReserver.h +++ b/src/common/AsyncReserver.h @@ -28,7 +28,7 @@ template <typename T> class AsyncReserver { Finisher *f; - const unsigned max_allowed; + unsigned max_allowed; Mutex lock; list<pair<T, Context*> > queue; @@ -51,6 +51,13 @@ public: unsigned max_allowed) : f(f), max_allowed(max_allowed), lock("AsyncReserver::lock") {} + void set_max(unsigned max) { + Mutex::Locker l(lock); + assert(max > 0); + max_allowed = max; + do_queues(); + } + /** * Requests a reservation * diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 9c27ddc3860..6dcb519da40 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -16,6 +16,7 @@ #define PRIORITY_QUEUE_H #include "common/Mutex.h" +#include "common/Formatter.h" #include <map> #include <utility> @@ -45,6 +46,8 @@ template <typename T, typename K> class PrioritizedQueue { int64_t total_priority; + int64_t max_tokens_per_subqueue; + int64_t min_cost; template <class F> static unsigned filter_list_pairs( @@ -76,22 +79,39 @@ class PrioritizedQueue { struct SubQueue { private: map<K, list<pair<unsigned, T> > > q; - unsigned bucket; + unsigned tokens, max_tokens; int64_t size; typename map<K, list<pair<unsigned, T> > >::iterator cur; public: SubQueue(const SubQueue &other) - : q(other.q), bucket(other.bucket), size(other.size), + : q(other.q), + tokens(other.tokens), + max_tokens(other.max_tokens), + size(other.size), cur(q.begin()) {} - SubQueue() : bucket(0), size(0), cur(q.begin()) {} + SubQueue() + : tokens(0), + max_tokens(0), + size(0) {} + void set_max_tokens(unsigned mt) { + max_tokens = mt; + } + unsigned get_max_tokens() const { + return max_tokens; + } unsigned num_tokens() const { - return bucket; + return tokens; } - void put_tokens(unsigned tokens) { - bucket += tokens; + void put_tokens(unsigned t) { + tokens += t; + if (tokens > max_tokens) + tokens = max_tokens; } - void take_tokens(unsigned tokens) { - bucket -= tokens; + void take_tokens(unsigned t) { + if (tokens > t) + tokens -= t; + else + tokens = 0; } void enqueue(K cl, unsigned cost, T item) { q[cl].push_back(make_pair(cost, item)); @@ -166,6 +186,13 @@ class PrioritizedQueue { } q.erase(i); } + + void dump(Formatter *f) const { + f->dump_int("tokens", tokens); + f->dump_int("max_tokens", max_tokens); + f->dump_int("size", size); + f->dump_int("num_keys", q.size()); + } }; map<unsigned, SubQueue> high_queue; map<unsigned, SubQueue> queue; @@ -175,7 +202,9 @@ class PrioritizedQueue { if (p != queue.end()) return &p->second; total_priority += priority; - return &queue[priority]; + SubQueue *sq = &queue[priority]; + sq->set_max_tokens(max_tokens_per_subqueue); + return sq; } void remove_queue(unsigned priority) { @@ -196,7 +225,11 @@ class PrioritizedQueue { } public: - PrioritizedQueue() : total_priority(0) {} + PrioritizedQueue(unsigned max_per, unsigned min_c) + : total_priority(0), + max_tokens_per_subqueue(max_per), + min_cost(min_c) + {} unsigned length() { unsigned total = 0; @@ -276,10 +309,14 @@ public: } void enqueue(K cl, unsigned priority, unsigned cost, T item) { + if (cost < min_cost) + cost = min_cost; create_queue(priority)->enqueue(cl, cost, item); } void enqueue_front(K cl, unsigned priority, unsigned cost, T item) { + if (cost < min_cost) + cost = min_cost; create_queue(priority)->enqueue_front(cl, cost, item); } @@ -300,6 +337,9 @@ public: return ret; } + // if there are multiple buckets/subqueues with sufficient tokens, + // we behave like a strict priority queue among all subqueues that + // are eligible to run. for (typename map<unsigned, SubQueue>::iterator i = queue.begin(); i != queue.end(); ++i) { @@ -315,6 +355,9 @@ public: return ret; } } + + // if no subqueues have sufficient tokens, we behave like a strict + // priority queue. T ret = queue.rbegin()->second.front().second; unsigned cost = queue.rbegin()->second.front().first; queue.rbegin()->second.pop_front(); @@ -323,6 +366,32 @@ public: distribute_tokens(cost); return ret; } + + void dump(Formatter *f) const { + f->dump_int("total_priority", total_priority); + f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue); + f->dump_int("min_cost", min_cost); + f->open_array_section("high_queues"); + for (typename map<unsigned, SubQueue>::const_iterator p = high_queue.begin(); + p != high_queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + f->open_array_section("queues"); + for (typename map<unsigned, SubQueue>::const_iterator p = queue.begin(); + p != queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + } }; #endif diff --git a/src/common/config_opts.h b/src/common/config_opts.h index a5df112576f..7eb3e94fadc 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -105,6 +105,8 @@ OPTION(ms_bind_port_min, OPT_INT, 6800) OPTION(ms_bind_port_max, OPT_INT, 7100) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) +OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304) +OPTION(ms_pq_min_cost, OPT_U64, 65536) OPTION(ms_inject_socket_failures, OPT_U64, 0) OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds @@ -315,6 +317,8 @@ OPTION(osd_map_dedup, OPT_BOOL, true) OPTION(osd_map_cache_size, OPT_INT, 500) OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading +OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304) +OPTION(osd_op_pq_min_cost, OPT_U64, 65536) OPTION(osd_disk_threads, OPT_INT, 1) OPTION(osd_recovery_threads, OPT_INT, 1) OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration @@ -370,7 +374,7 @@ OPTION(osd_debug_drop_pg_create_duration, OPT_INT, 1) OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track -OPTION(osd_target_transaction_size, OPT_INT, 300) // to adjust various transactions that batch smaller items +OPTION(osd_target_transaction_size, OPT_INT, 30) // to adjust various transactions that batch smaller items /** * osd_client_op_priority and osd_recovery_op_priority adjust the relative @@ -409,10 +413,10 @@ OPTION(filestore_sync_flush, OPT_BOOL, false) OPTION(filestore_journal_parallel, OPT_BOOL, false) OPTION(filestore_journal_writeahead, OPT_BOOL, false) OPTION(filestore_journal_trailing, OPT_BOOL, false) -OPTION(filestore_queue_max_ops, OPT_INT, 500) +OPTION(filestore_queue_max_ops, OPT_INT, 50) OPTION(filestore_queue_max_bytes, OPT_INT, 100 << 20) -OPTION(filestore_queue_committing_max_ops, OPT_INT, 500) // this is ON TOP of filestore_queue_max_* -OPTION(filestore_queue_committing_max_bytes, OPT_INT, 100 << 20) // " +OPTION(filestore_queue_committing_max_ops, OPT_INT, 0) // this is ON TOP of filestore_queue_max_* +OPTION(filestore_queue_committing_max_bytes, OPT_INT, 0) // " OPTION(filestore_op_threads, OPT_INT, 2) OPTION(filestore_op_thread_timeout, OPT_INT, 60) OPTION(filestore_op_thread_suicide_timeout, OPT_INT, 180) diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index e69042b121a..50b1a926957 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -86,6 +86,12 @@ public: // indicates that we must fix hobject_t encoding bool hobject_incorrect_pool; + int get_cost() const { + if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL) + return ops[0].op.extent.length; + return data.length(); + } + virtual void decode_payload() { hobject_incorrect_pool = false; bufferlist::iterator p = payload.begin(); diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 04e405581a7..31f37cfd5f6 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -37,7 +37,7 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) id, priority, QueueItem(m)); } else { mqueue.enqueue( - id, priority, m->get_data().length(), QueueItem(m)); + id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); } @@ -51,7 +51,7 @@ void DispatchQueue::local_delivery(Message *m, int priority) 0, priority, QueueItem(m)); } else { mqueue.enqueue( - 0, priority, m->get_data().length(), QueueItem(m)); + 0, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index ea44c165d56..884e0269342 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -151,6 +151,8 @@ class DispatchQueue { DispatchQueue(CephContext *cct, SimpleMessenger *msgr) : cct(cct), msgr(msgr), lock("SimpleMessenger::DispatchQeueu::lock"), + mqueue(cct->_conf->ms_pq_max_tokens_per_priority, + cct->_conf->ms_pq_min_cost), next_pipe_id(1), dispatch_thread(this), stop(false) diff --git a/src/msg/Message.h b/src/msg/Message.h index 95f165a43dd..5bdd4d463b6 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -424,6 +424,10 @@ public: footer.data_crc = data.crc32c(0); } + virtual int get_cost() const { + return data.length(); + } + // type int get_type() const { return header.type; } void set_type(int t) { header.type = t; } diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 4e208350902..5976e68b6d2 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -4629,6 +4629,10 @@ const char** FileStore::get_tracked_conf_keys() const static const char* KEYS[] = { "filestore_min_sync_interval", "filestore_max_sync_interval", + "filestore_queue_max_ops", + "filestore_queue_max_bytes", + "filestore_queue_committing_max_ops", + "filestore_queue_committing_max_bytes", "filestore_flusher", "filestore_flusher_max_fds", "filestore_sync_flush", @@ -4646,6 +4650,10 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, { if (changed.count("filestore_min_sync_interval") || changed.count("filestore_max_sync_interval") || + changed.count("filestore_queue_max_ops") || + changed.count("filestore_queue_max_bytes") || + changed.count("filestore_queue_committing_max_ops") || + changed.count("filestore_queue_committing_max_bytes") || changed.count("filestore_flusher_max_fds") || changed.count("filestore_flush_min") || changed.count("filestore_kill_at") || @@ -4653,6 +4661,10 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, Mutex::Locker l(lock); m_filestore_min_sync_interval = conf->filestore_min_sync_interval; m_filestore_max_sync_interval = conf->filestore_max_sync_interval; + m_filestore_queue_max_ops = conf->filestore_queue_max_ops; + m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes; + m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops; + m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes; m_filestore_flusher = conf->filestore_flusher; m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds; m_filestore_flush_min = conf->filestore_flush_min; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 34ec3678f08..a17fec0fea8 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -762,6 +762,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, whoami(id), dev_path(dev), journal_path(jdev), dispatch_running(false), + asok_hook(NULL), osd_compat(get_osd_compat_set()), state(STATE_INITIALIZING), boot_epoch(0), up_epoch(0), bind_epoch(0), op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads, "osd_op_threads"), @@ -777,8 +778,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, heartbeat_dispatcher(this), stat_lock("OSD::stat_lock"), finished_lock("OSD::finished_lock"), - admin_ops_hook(NULL), - historic_ops_hook(NULL), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200), map_lock("OSD::map_lock"), @@ -842,33 +841,42 @@ int OSD::pre_init() << "currently in use. (Is ceph-osd already running?)" << dendl; return -EBUSY; } + + g_conf->add_observer(this); return 0; } -class HistoricOpsSocketHook : public AdminSocketHook { +// asok + +class OSDSocketHook : public AdminSocketHook { OSD *osd; public: - HistoricOpsSocketHook(OSD *o) : osd(o) {} + OSDSocketHook(OSD *o) : osd(o) {} bool call(std::string command, std::string args, bufferlist& out) { stringstream ss; - osd->dump_historic_ops(ss); + bool r = osd->asok_command(command, args, ss); out.append(ss); - return true; + return r; } }; - -class OpsFlightSocketHook : public AdminSocketHook { - OSD *osd; -public: - OpsFlightSocketHook(OSD *o) : osd(o) {} - bool call(std::string command, std::string args, bufferlist& out) { - stringstream ss; - osd->dump_ops_in_flight(ss); - out.append(ss); - return true; +bool OSD::asok_command(string command, string args, ostream& ss) +{ + if (command == "dump_ops_in_flight") { + op_tracker.dump_ops_in_flight(ss); + } else if (command == "dump_historic_ops") { + op_tracker.dump_historic_ops(ss); + } else if (command == "dump_op_pq_state") { + JSONFormatter f(true); + f.open_object_section("pq"); + op_wq.dump(&f); + f.close_section(); + f.flush(ss); + } else { + assert(0 == "broken asok registration"); } -}; + return true; +} class TestOpsSocketHook : public AdminSocketHook { OSDService *service; @@ -979,14 +987,16 @@ int OSD::init() // tick timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this)); - admin_ops_hook = new OpsFlightSocketHook(this); AdminSocket *admin_socket = cct->get_admin_socket(); - r = admin_socket->register_command("dump_ops_in_flight", admin_ops_hook, - "show the ops currently in flight"); + asok_hook = new OSDSocketHook(this); + r = admin_socket->register_command("dump_ops_in_flight", asok_hook, + "show the ops currently in flight"); assert(r == 0); - historic_ops_hook = new HistoricOpsSocketHook(this); - r = admin_socket->register_command("dump_historic_ops", historic_ops_hook, - "show slowest recent ops"); + r = admin_socket->register_command("dump_historic_ops", asok_hook, + "show slowest recent ops"); + assert(r == 0); + r = admin_socket->register_command("dump_op_pq_state", asok_hook, + "dump op priority queue state"); assert(r == 0); test_ops_hook = new TestOpsSocketHook(&(this->service), this->store); r = admin_socket->register_command("setomapval", test_ops_hook, @@ -1121,6 +1131,8 @@ void OSD::suicide(int exitcode) int OSD::shutdown() { + g_conf->remove_observer(this); + service.shutdown(); g_ceph_context->_conf->set_val("debug_osd", "100"); g_ceph_context->_conf->set_val("debug_journal", "100"); @@ -1152,10 +1164,10 @@ int OSD::shutdown() cct->get_admin_socket()->unregister_command("dump_ops_in_flight"); cct->get_admin_socket()->unregister_command("dump_historic_ops"); - delete admin_ops_hook; - delete historic_ops_hook; - admin_ops_hook = NULL; - historic_ops_hook = NULL; + cct->get_admin_socket()->unregister_command("dump_op_pq_state"); + delete asok_hook; + asok_hook = NULL; + cct->get_admin_socket()->unregister_command("setomapval"); cct->get_admin_socket()->unregister_command("rmomapkey"); cct->get_admin_socket()->unregister_command("setomapheader"); @@ -2324,11 +2336,6 @@ void OSD::check_ops_in_flight() return; } -void OSD::dump_ops_in_flight(ostream& ss) -{ - op_tracker.dump_ops_in_flight(ss); -} - // Usage: // setomapval <pool-id> <obj-name> <key> <val> // rmomapkey <pool-id> <obj-name> <key> @@ -3828,7 +3835,7 @@ void OSD::wait_for_new_map(OpRequestRef op) } waiting_for_osdmap.push_back(op); - op->mark_delayed(); + op->mark_delayed("wait for new map"); } @@ -6056,7 +6063,7 @@ void OSD::handle_op(OpRequestRef op) if (osdmap->get_pg_acting_role(pgid, whoami) >= 0) { dout(7) << "we are valid target for op, waiting" << dendl; waiting_for_pg[pgid].push_back(op); - op->mark_delayed(); + op->mark_delayed("waiting for pg to exist locally"); return; } @@ -6188,14 +6195,18 @@ bool OSD::op_is_discardable(MOSDOp *op) */ void OSD::enqueue_op(PG *pg, OpRequestRef op) { - dout(15) << "enqueue_op " << op << " " << *(op->request) << dendl; + utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp(); + dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority() + << " cost " << op->request->get_cost() + << " latency " << latency + << " " << *(op->request) << dendl; op_wq.queue(make_pair(PGRef(pg), op)); } void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item) { unsigned priority = item.second->request->get_priority(); - unsigned cost = item.second->request->get_data().length(); + unsigned cost = item.second->request->get_cost(); if (priority >= CEPH_MSG_PRIO_LOW) pqueue.enqueue_strict( item.second->request->get_source_inst(), @@ -6217,7 +6228,7 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) } } unsigned priority = item.second->request->get_priority(); - unsigned cost = item.second->request->get_data().length(); + unsigned cost = item.second->request->get_cost(); if (priority >= CEPH_MSG_PRIO_LOW) pqueue.enqueue_strict_front( item.second->request->get_source_inst(), @@ -6273,7 +6284,11 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued) */ void OSD::dequeue_op(PGRef pg, OpRequestRef op) { - dout(10) << "dequeue_op " << op << " " << *(op->request) + utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp(); + dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority() + << " cost " << op->request->get_cost() + << " latency " << latency + << " " << *(op->request) << " pg " << *pg << dendl; if (pg->deleting) return; @@ -6364,6 +6379,26 @@ void OSD::process_peering_events(const list<PG*> &pgs) // -------------------------------- +const char** OSD::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "osd_max_backfills", + NULL + }; + return KEYS; +} + +void OSD::handle_conf_change(const struct md_config_t *conf, + const std::set <std::string> &changed) +{ + if (changed.count("osd_max_backfills")) { + service.local_reserver.set_max(g_conf->osd_max_backfills); + service.remote_reserver.set_max(g_conf->osd_max_backfills); + } +} + +// -------------------------------- + int OSD::init_op_flags(OpRequestRef op) { MOSDOp *m = (MOSDOp*)op->request; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index be4ed11791e..359d75396fe 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -395,8 +395,15 @@ public: OSDService(OSD *osd); }; -class OSD : public Dispatcher { +class OSD : public Dispatcher, + public md_config_obs_t { /** OSD **/ +public: + // config observer bits + virtual const char** get_tracked_conf_keys() const; + virtual void handle_conf_change(const struct md_config_t *conf, + const std::set <std::string> &changed); + protected: Mutex osd_lock; // global lock SafeTimer timer; // safe timer (osd_lock) @@ -434,6 +441,11 @@ protected: void check_osdmap_features(); + // asok + friend class OSDSocketHook; + class OSDSocketHook *asok_hook; + bool asok_command(string command, string args, ostream& ss); + public: ClassHandler *class_handler; int get_nodeid() { return whoami; } @@ -614,18 +626,10 @@ private: // -- op tracking -- OpTracker op_tracker; void check_ops_in_flight(); - void dump_ops_in_flight(ostream& ss); - void dump_historic_ops(ostream& ss) { - return op_tracker.dump_historic_ops(ss); - } void test_ops(std::string command, std::string args, ostream& ss); - friend class OpsFlightSocketHook; - friend class HistoricOpsSocketHook; friend class TestOpsSocketHook; - friend class C_CompleteSplits; - OpsFlightSocketHook *admin_ops_hook; - HistoricOpsSocketHook *historic_ops_hook; TestOpsSocketHook *test_ops_hook; + friend class C_CompleteSplits; // -- op queue -- @@ -639,7 +643,15 @@ private: : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >( "OSD::OpWQ", ti, ti*10, tp), qlock("OpWQ::qlock"), - osd(o) {} + osd(o), + pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority, + o->cct->_conf->osd_op_pq_min_cost) + {} + + void dump(Formatter *f) { + Mutex::Locker l(qlock); + pqueue.dump(f); + } void _enqueue_front(pair<PGRef, OpRequestRef> item); void _enqueue(pair<PGRef, OpRequestRef> item); diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 436e2de4176..b3d95367ec5 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -153,7 +153,8 @@ bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) utime_t age = now - (*i)->received_time; stringstream ss; ss << "slow request " << age << " seconds old, received at " << (*i)->received_time - << ": " << *((*i)->request) << " currently " << (*i)->state_string(); + << ": " << *((*i)->request) << " currently " + << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); warning_vector.push_back(ss.str()); // only those that have been shown will backoff diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 0b35fd89f70..432bddb1672 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -128,6 +128,7 @@ struct OpRequest : public TrackedOp { private: list<pair<utime_t, string> > events; + string current; Mutex lock; OpTracker *tracker; osd_reqid_t reqid; @@ -139,6 +140,7 @@ private: static const uint8_t flag_delayed = 1 << 2; static const uint8_t flag_started = 1 << 3; static const uint8_t flag_sub_op_sent = 1 << 4; + static const uint8_t flag_commit_sent = 1 << 5; OpRequest(Message *req, OpTracker *tracker) : request(req), xitem(this), @@ -162,11 +164,13 @@ public: bool been_delayed() { return hit_flag_points & flag_delayed; } bool been_started() { return hit_flag_points & flag_started; } bool been_sub_op_sent() { return hit_flag_points & flag_sub_op_sent; } + bool been_commit_sent() { return hit_flag_points & flag_commit_sent; } bool currently_queued_for_pg() { return latest_flag_point & flag_queued_for_pg; } bool currently_reached_pg() { return latest_flag_point & flag_reached_pg; } bool currently_delayed() { return latest_flag_point & flag_delayed; } bool currently_started() { return latest_flag_point & flag_started; } bool currently_sub_op_sent() { return latest_flag_point & flag_sub_op_sent; } + bool currently_commit_sent() { return latest_flag_point & flag_commit_sent; } const char *state_string() const { switch(latest_flag_point) { @@ -175,6 +179,7 @@ public: case flag_delayed: return "delayed"; case flag_started: return "started"; case flag_sub_op_sent: return "waiting for sub ops"; + case flag_commit_sent: return "commit sent; apply or cleanup"; default: break; } return "no flag points reached"; @@ -182,28 +187,40 @@ public: void mark_queued_for_pg() { mark_event("queued_for_pg"); + current = "queued for pg"; hit_flag_points |= flag_queued_for_pg; latest_flag_point = flag_queued_for_pg; } void mark_reached_pg() { mark_event("reached_pg"); + current = "reached pg"; hit_flag_points |= flag_reached_pg; latest_flag_point = flag_reached_pg; } - void mark_delayed() { + void mark_delayed(string s) { + mark_event(s); + current = s; hit_flag_points |= flag_delayed; latest_flag_point = flag_delayed; } void mark_started() { mark_event("started"); + current = "started"; hit_flag_points |= flag_started; latest_flag_point = flag_started; } - void mark_sub_op_sent() { - mark_event("sub_op_sent"); + void mark_sub_op_sent(string s) { + mark_event(s); + current = s; hit_flag_points |= flag_sub_op_sent; latest_flag_point = flag_sub_op_sent; } + void mark_commit_sent() { + mark_event("commit_sent"); + current = "commit sent"; + hit_flag_points |= flag_commit_sent; + latest_flag_point = flag_commit_sent; + } void mark_event(const string &event); osd_reqid_t get_reqid() const { diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 2899738c346..51dbee46f61 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4964,7 +4964,7 @@ bool PG::can_discard_op(OpRequestRef op) dout(7) << " queueing replay at " << m->get_version() << " for " << *m << dendl; replay_queue[m->get_version()] = op; - op->mark_delayed(); + op->mark_delayed("waiting for replay"); return true; } } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 2e15e945b10..552fb89bed2 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -122,7 +122,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o pull(soid, v, g_conf->osd_client_op_priority); } waiting_for_missing_object[soid].push_back(op); - op->mark_delayed(); + op->mark_delayed("waiting for missing object"); } void ReplicatedPG::wait_for_all_missing(OpRequestRef op) @@ -178,7 +178,7 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef recover_object_replicas(soid, v, g_conf->osd_client_op_priority); } waiting_for_degraded_object[soid].push_back(op); - op->mark_delayed(); + op->mark_delayed("waiting for degraded object"); } void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op) @@ -631,7 +631,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (op->may_write() && scrubber.write_blocked_by_scrub(head)) { dout(20) << __func__ << ": waiting for scrub" << dendl; waiting_for_active.push_back(op); - op->mark_delayed(); + op->mark_delayed("waiting for scrub"); return; } @@ -734,7 +734,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (!ok) { dout(10) << "do_op waiting on mode " << mode << dendl; mode.waiting.push_back(op); - op->mark_delayed(); + op->mark_delayed("waiting on pg mode"); return; } @@ -876,7 +876,7 @@ void ReplicatedPG::do_op(OpRequestRef op) } dout(10) << " waiting for " << oldv << " to commit" << dendl; waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed - op->mark_delayed(); + op->mark_delayed("waiting for ondisk"); } return; } @@ -1086,55 +1086,46 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) assert(m->get_header().type == MSG_OSD_SUBOP); dout(15) << "do_sub_op " << *op->request << dendl; + OSDOp *first = NULL; if (m->ops.size() >= 1) { - OSDOp& first = m->ops[0]; - switch (first.op.op) { + first = &m->ops[0]; + switch (first->op.op) { case CEPH_OSD_OP_PULL: sub_op_pull(op); return; + } + } + + if (!is_active()) { + waiting_for_active.push_back(op); + op->mark_delayed("waiting for active"); + return; + } + + if (first) { + switch (first->op.op) { case CEPH_OSD_OP_PUSH: - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_push(op); + sub_op_push(op); return; case CEPH_OSD_OP_DELETE: - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_remove(op); + sub_op_remove(op); return; case CEPH_OSD_OP_SCRUB_RESERVE: - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_scrub_reserve(op); + sub_op_scrub_reserve(op); return; case CEPH_OSD_OP_SCRUB_UNRESERVE: - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_scrub_unreserve(op); + sub_op_scrub_unreserve(op); return; case CEPH_OSD_OP_SCRUB_STOP: - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_scrub_stop(op); + sub_op_scrub_stop(op); return; case CEPH_OSD_OP_SCRUB_MAP: - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_scrub_map(op); + sub_op_scrub_map(op); return; } } - if (!is_active()) - waiting_for_active.push_back(op); - else - sub_op_modify(op); + sub_op_modify(op); } void ReplicatedPG::do_sub_op_reply(OpRequestRef op) @@ -3876,6 +3867,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); osd->send_message_osd_client(reply, m->get_connection()); repop->sent_disk = true; + repop->ctx->op->mark_commit_sent(); } } @@ -3962,9 +3954,12 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + if (ctx->op && acting.size() > 1) { + ostringstream ss; + ss << "waiting for subops from " << vector<int>(acting.begin() + 1, acting.end()); + ctx->op->mark_sub_op_sent(ss.str()); + } for (unsigned i=1; i<acting.size(); i++) { - if (ctx->op) - ctx->op->mark_sub_op_sent(); int peer = acting[i]; pg_info_t &pinfo = peer_info[peer]; @@ -4698,6 +4693,8 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) } } + op->mark_started(); + Context *oncommit = new C_OSD_RepModifyCommit(rm); Context *onapply = new C_OSD_RepModifyApply(rm); int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op); @@ -4753,7 +4750,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) void ReplicatedPG::sub_op_modify_commit(RepModify *rm) { lock(); - rm->op->mark_event("sub_op_commit"); + rm->op->mark_commit_sent(); rm->committed = true; if (rm->epoch_started >= last_peering_reset) { @@ -5192,6 +5189,7 @@ int ReplicatedPG::send_pull(int prio, int peer, subop->set_priority(prio); subop->ops = vector<OSDOp>(1); subop->ops[0].op.op = CEPH_OSD_OP_PULL; + subop->ops[0].op.extent.length = g_conf->osd_recovery_max_chunk; subop->recovery_info = recovery_info; subop->recovery_progress = progress; @@ -5780,8 +5778,10 @@ void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since, dout(10) << "_committed_pushed_object pg has changed, not touching last_complete_ondisk" << dendl; } - if (op) + if (op) { log_subop_stats(op, l_osd_sop_push_inb, l_osd_sop_push_lat); + op->mark_event("committed"); + } unlock(); } |