diff options
author | Samuel Just <sam.just@inktank.com> | 2013-08-06 18:05:06 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-08-06 18:05:08 -0700 |
commit | fb06bf60c19709e0248c8bb485f26ea0175a1927 (patch) | |
tree | ba6288fc0119d9d8874edaf3a2016565e77137ce | |
parent | f71ec8e63a3ef6b2f60fce22ed1003d0a8dccca0 (diff) | |
parent | 68203903570f4d654d453468078476be433fc65a (diff) | |
download | ceph-fb06bf60c19709e0248c8bb485f26ea0175a1927.tar.gz |
Merge branch 'wip-recovery-op-warn' into next
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 11 | ||||
-rw-r--r-- | src/osd/OSD.cc | 19 | ||||
-rw-r--r-- | src/osd/OSD.h | 10 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 16 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 12 | ||||
-rw-r--r-- | src/osd/PG.cc | 6 | ||||
-rw-r--r-- | src/osd/PG.h | 14 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 42 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 20 |
9 files changed, 100 insertions, 50 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 1c7a917602a..f67d0d1237d 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -426,7 +426,8 @@ OPTION(osd_default_data_pool_replay_window, OPT_INT, 45) OPTION(osd_preserve_trimmed_log, OPT_BOOL, false) OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false) OPTION(osd_recovery_delay_start, OPT_FLOAT, 0) -OPTION(osd_recovery_max_active, OPT_INT, 5) +OPTION(osd_recovery_max_active, OPT_INT, 60) +OPTION(osd_recovery_max_single_start, OPT_INT, 10) OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message @@ -489,9 +490,13 @@ OPTION(osd_leveldb_log, OPT_STR, "") // enable OSD leveldb log file * osd_client_op_priority/osd_recovery_op_priority determines the ratio of * available io between client and recovery. Each option may be set between * 1..63. + * + * osd_recovery_op_warn_multiple scales the normal warning threshhold, + * osd_op_complaint_time, so that slow recovery ops won't cause noise */ -OPTION(osd_client_op_priority, OPT_INT, 63) -OPTION(osd_recovery_op_priority, OPT_INT, 10) +OPTION(osd_client_op_priority, OPT_U32, 63) +OPTION(osd_recovery_op_priority, OPT_U32, 10) +OPTION(osd_recovery_op_warn_multiple, OPT_U32, 16) // Max time to wait between notifying mon of shutdown and shutting down OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index e4653dc1f70..1a77dae730a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -6632,11 +6632,12 @@ bool OSD::_recover_now() return true; } -void OSD::do_recovery(PG *pg) +void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle) { // see how many we should try to start. note that this is a bit racy. recovery_wq.lock(); - int max = g_conf->osd_recovery_max_active - recovery_ops_active; + int max = MAX(g_conf->osd_recovery_max_active - recovery_ops_active, + g_conf->osd_recovery_max_single_start); if (max > 0) { dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops)" << dendl; @@ -6652,7 +6653,7 @@ void OSD::do_recovery(PG *pg) recovery_wq.queue(pg); return; } else { - pg->lock(); + pg->lock_suspend_timeout(handle); if (pg->deleting || !(pg->is_active() && pg->is_primary())) { pg->unlock(); goto out; @@ -6664,7 +6665,7 @@ void OSD::do_recovery(PG *pg) #endif PG::RecoveryCtx rctx = create_context(); - int started = pg->start_recovery_ops(max, &rctx); + int started = pg->start_recovery_ops(max, &rctx, handle); dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl; /* @@ -7052,7 +7053,7 @@ void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle) if (!(pg_for_processing[&*pg].size())) pg_for_processing.erase(&*pg); } - osd->dequeue_op(pg, op); + osd->dequeue_op(pg, op, handle); pg->unlock(); } @@ -7065,7 +7066,9 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued) /* * NOTE: dequeue called in worker thread, with pg lock */ -void OSD::dequeue_op(PGRef pg, OpRequestRef op) +void OSD::dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle) { utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp(); dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority() @@ -7078,7 +7081,7 @@ void OSD::dequeue_op(PGRef pg, OpRequestRef op) op->mark_reached_pg(); - pg->do_request(op); + pg->do_request(op, handle); // finish dout(10) << "dequeue_op " << op << " finish" << dendl; @@ -7130,7 +7133,7 @@ void OSD::process_peering_events( ++i) { set<boost::intrusive_ptr<PG> > split_pgs; PG *pg = *i; - pg->lock(); + pg->lock_suspend_timeout(handle); curmap = service.get_osdmap(); if (pg->deleting) { pg->unlock(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 5196a1dc1f3..82a251d9a80 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -915,7 +915,9 @@ private: } op_wq; void enqueue_op(PG *pg, OpRequestRef op); - void dequeue_op(PGRef pg, OpRequestRef op); + void dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle); // -- peering queue -- struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> { @@ -1371,8 +1373,8 @@ protected: osd->recovery_queue.push_front(&pg->recovery_item); } } - void _process(PG *pg) { - osd->do_recovery(pg); + void _process(PG *pg, ThreadPool::TPHandle &handle) { + osd->do_recovery(pg, handle); pg->put("RecoveryWQ"); } void _clear() { @@ -1386,7 +1388,7 @@ protected: void start_recovery_op(PG *pg, const hobject_t& soid); void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue); - void do_recovery(PG *pg); + void do_recovery(PG *pg, ThreadPool::TPHandle &handle); bool _recover_now(); // replay / delayed pg activation diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index a6cdc9ecffb..c694362a8a5 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -20,6 +20,22 @@ static ostream& _prefix(std::ostream* _dout) return *_dout << "--OSD::tracker-- "; } +OpRequest::OpRequest(Message *req, OpTracker *tracker) : + request(req), xitem(this), + rmw_flags(0), + warn_interval_multiplier(1), + lock("OpRequest::lock"), + tracker(tracker), + hit_flag_points(0), latest_flag_point(0), + seq(0) { + received_time = request->get_recv_stamp(); + tracker->register_inflight_op(&xitem); + if (req->get_priority() < g_conf->osd_client_op_priority) { + // don't warn as quickly for low priority ops + warn_interval_multiplier = g_conf->osd_recovery_op_warn_multiple; + } +} + void OpHistory::on_shutdown() { arrived.clear(); diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index a2014472432..e72f03d1d77 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -156,17 +156,7 @@ private: static const uint8_t flag_sub_op_sent = 1 << 4; static const uint8_t flag_commit_sent = 1 << 5; - OpRequest(Message *req, OpTracker *tracker) : - request(req), xitem(this), - rmw_flags(0), - warn_interval_multiplier(1), - lock("OpRequest::lock"), - tracker(tracker), - hit_flag_points(0), latest_flag_point(0), - seq(0) { - received_time = request->get_recv_stamp(); - tracker->register_inflight_op(&xitem); - } + OpRequest(Message *req, OpTracker *tracker); public: ~OpRequest() { assert(request); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 63e760e3b21..8e78eaa7a16 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1397,7 +1397,9 @@ void PG::queue_op(OpRequestRef op) osd->op_wq.queue(make_pair(PGRef(this), op)); } -void PG::do_request(OpRequestRef op) +void PG::do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle) { // do any pending flush do_pending_flush(); @@ -1435,7 +1437,7 @@ void PG::do_request(OpRequestRef op) break; case MSG_OSD_PG_SCAN: - do_scan(op); + do_scan(op, handle); break; case MSG_OSD_PG_BACKFILL: diff --git a/src/osd/PG.h b/src/osd/PG.h index 8f572c75e19..d4679ce4fd8 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -645,7 +645,9 @@ public: virtual void check_local() = 0; - virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0; + virtual int start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle) = 0; void purge_strays(); @@ -1804,12 +1806,18 @@ public: // abstract bits - void do_request(OpRequestRef op); + void do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle + ); virtual void do_op(OpRequestRef op) = 0; virtual void do_sub_op(OpRequestRef op) = 0; virtual void do_sub_op_reply(OpRequestRef op) = 0; - virtual void do_scan(OpRequestRef op) = 0; + virtual void do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle + ) = 0; virtual void do_backfill(OpRequestRef op) = 0; virtual void do_push(OpRequestRef op) = 0; virtual void do_pull(OpRequestRef op) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 658ea7cb746..ab9c8099a44 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1252,7 +1252,9 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op) sub_op_modify_reply(op); } -void ReplicatedPG::do_scan(OpRequestRef op) +void ReplicatedPG::do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle) { MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request); assert(m->get_header().type == MSG_OSD_PG_SCAN); @@ -1278,7 +1280,9 @@ void ReplicatedPG::do_scan(OpRequestRef op) BackfillInterval bi; osr->flush(); - scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi); + scan_range( + m->begin, g_conf->osd_backfill_scan_min, + g_conf->osd_backfill_scan_max, &bi, handle); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, info.pgid, bi.begin, bi.end); @@ -6875,7 +6879,9 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap) } -int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) +int ReplicatedPG::start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle) { int started = 0; assert(is_primary()); @@ -6898,15 +6904,15 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) if (num_missing == num_unfound) { // All of the missing objects we have are unfound. // Recover the replicas. - started = recover_replicas(max); + started = recover_replicas(max, handle); } if (!started) { // We still have missing objects that we should grab from replicas. - started += recover_primary(max); + started += recover_primary(max, handle); } if (!started && num_unfound != get_num_unfound()) { // second chance to recovery replicas - started = recover_replicas(max); + started = recover_replicas(max, handle); } bool deferred_backfill = false; @@ -6931,7 +6937,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) } deferred_backfill = true; } else { - started += recover_backfill(max - started); + started += recover_backfill(max - started, handle); } } @@ -6993,7 +6999,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) * do one recovery op. * return true if done, false if nothing left to do. */ -int ReplicatedPG::recover_primary(int max) +int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) { assert(is_primary()); @@ -7012,6 +7018,7 @@ int ReplicatedPG::recover_primary(int max) map<version_t, hobject_t>::const_iterator p = missing.rmissing.lower_bound(pg_log.get_log().last_requested); while (p != missing.rmissing.end()) { + handle.reset_tp_timeout(); hobject_t soid; version_t v = p->first; @@ -7204,7 +7211,7 @@ int ReplicatedPG::prep_object_replica_pushes( return 1; } -int ReplicatedPG::recover_replicas(int max) +int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) { dout(10) << __func__ << "(" << max << ")" << dendl; int started = 0; @@ -7226,6 +7233,7 @@ int ReplicatedPG::recover_replicas(int max) for (map<version_t, hobject_t>::const_iterator p = m.rmissing.begin(); p != m.rmissing.end() && started < max; ++p) { + handle.reset_tp_timeout(); const hobject_t soid(p->second); if (pushing.count(soid)) { @@ -7275,7 +7283,9 @@ int ReplicatedPG::recover_replicas(int max) * peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin, * backfill_info.begin, backfills_in_flight) */ -int ReplicatedPG::recover_backfill(int max) +int ReplicatedPG::recover_backfill( + int max, + ThreadPool::TPHandle &handle) { dout(10) << "recover_backfill (" << max << ")" << dendl; assert(backfill_target >= 0); @@ -7305,7 +7315,7 @@ int ReplicatedPG::recover_backfill(int max) dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl; backfill_info.clear(); osr->flush(); - scan_range(backfill_pos, local_min, local_max, &backfill_info); + scan_range(backfill_pos, local_min, local_max, &backfill_info, handle); int ops = 0; map<hobject_t, pair<eversion_t, eversion_t> > to_push; @@ -7319,7 +7329,8 @@ int ReplicatedPG::recover_backfill(int max) if (backfill_info.begin <= pbi.begin && !backfill_info.extends_to_end() && backfill_info.empty()) { osr->flush(); - scan_range(backfill_info.end, local_min, local_max, &backfill_info); + scan_range(backfill_info.end, local_min, local_max, &backfill_info, + handle); backfill_info.trim(); } backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin; @@ -7407,6 +7418,7 @@ int ReplicatedPG::recover_backfill(int max) for (map<hobject_t, eversion_t>::iterator i = to_remove.begin(); i != to_remove.end(); ++i) { + handle.reset_tp_timeout(); send_remove_op(i->first, i->second, backfill_target); } @@ -7414,6 +7426,7 @@ int ReplicatedPG::recover_backfill(int max) for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin(); i != to_push.end(); ++i) { + handle.reset_tp_timeout(); prep_backfill_object_push( i->first, i->second.first, i->second.second, backfill_target, &pushes); } @@ -7480,7 +7493,9 @@ void ReplicatedPG::prep_backfill_object_push( put_object_context(obc); } -void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi) +void ReplicatedPG::scan_range( + hobject_t begin, int min, int max, BackfillInterval *bi, + ThreadPool::TPHandle &handle) { assert(is_locked()); dout(10) << "scan_range from " << begin << dendl; @@ -7496,6 +7511,7 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva dout(20) << ls << dendl; for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) { + handle.reset_tp_timeout(); ObjectContext *obc = NULL; if (is_primary()) obc = _lookup_object_context(*p); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 7b70b4381ea..41c8106ea00 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -759,10 +759,13 @@ protected: void _clear_recovery_state(); void queue_for_recovery(); - int start_recovery_ops(int max, RecoveryCtx *prctx); - int recover_primary(int max); - int recover_replicas(int max); - int recover_backfill(int max); + int start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle); + + int recover_primary(int max, ThreadPool::TPHandle &handle); + int recover_replicas(int max, ThreadPool::TPHandle &handle); + int recover_backfill(int max, ThreadPool::TPHandle &handle); /** * scan a (hash) range of objects in the current pg @@ -772,7 +775,10 @@ protected: * @max return no more than this many items * @bi [out] resulting map of objects to eversion_t's */ - void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi); + void scan_range( + hobject_t begin, int min, int max, BackfillInterval *bi, + ThreadPool::TPHandle &handle + ); void prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, @@ -939,7 +945,9 @@ public: void do_pg_op(OpRequestRef op); void do_sub_op(OpRequestRef op); void do_sub_op_reply(OpRequestRef op); - void do_scan(OpRequestRef op); + void do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle); void do_backfill(OpRequestRef op); void _do_push(OpRequestRef op); void _do_pull_response(OpRequestRef op); |