summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2013-08-06 18:05:06 -0700
committerSamuel Just <sam.just@inktank.com>2013-08-06 18:05:08 -0700
commitfb06bf60c19709e0248c8bb485f26ea0175a1927 (patch)
treeba6288fc0119d9d8874edaf3a2016565e77137ce
parentf71ec8e63a3ef6b2f60fce22ed1003d0a8dccca0 (diff)
parent68203903570f4d654d453468078476be433fc65a (diff)
downloadceph-fb06bf60c19709e0248c8bb485f26ea0175a1927.tar.gz
Merge branch 'wip-recovery-op-warn' into next
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/common/config_opts.h11
-rw-r--r--src/osd/OSD.cc19
-rw-r--r--src/osd/OSD.h10
-rw-r--r--src/osd/OpRequest.cc16
-rw-r--r--src/osd/OpRequest.h12
-rw-r--r--src/osd/PG.cc6
-rw-r--r--src/osd/PG.h14
-rw-r--r--src/osd/ReplicatedPG.cc42
-rw-r--r--src/osd/ReplicatedPG.h20
9 files changed, 100 insertions, 50 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 1c7a917602a..f67d0d1237d 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -426,7 +426,8 @@ OPTION(osd_default_data_pool_replay_window, OPT_INT, 45)
OPTION(osd_preserve_trimmed_log, OPT_BOOL, false)
OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false)
OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
-OPTION(osd_recovery_max_active, OPT_INT, 5)
+OPTION(osd_recovery_max_active, OPT_INT, 60)
+OPTION(osd_recovery_max_single_start, OPT_INT, 10)
OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk
OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object
OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message
@@ -489,9 +490,13 @@ OPTION(osd_leveldb_log, OPT_STR, "") // enable OSD leveldb log file
* osd_client_op_priority/osd_recovery_op_priority determines the ratio of
* available io between client and recovery. Each option may be set between
* 1..63.
+ *
+ * osd_recovery_op_warn_multiple scales the normal warning threshhold,
+ * osd_op_complaint_time, so that slow recovery ops won't cause noise
*/
-OPTION(osd_client_op_priority, OPT_INT, 63)
-OPTION(osd_recovery_op_priority, OPT_INT, 10)
+OPTION(osd_client_op_priority, OPT_U32, 63)
+OPTION(osd_recovery_op_priority, OPT_U32, 10)
+OPTION(osd_recovery_op_warn_multiple, OPT_U32, 16)
// Max time to wait between notifying mon of shutdown and shutting down
OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index e4653dc1f70..1a77dae730a 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -6632,11 +6632,12 @@ bool OSD::_recover_now()
return true;
}
-void OSD::do_recovery(PG *pg)
+void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
{
// see how many we should try to start. note that this is a bit racy.
recovery_wq.lock();
- int max = g_conf->osd_recovery_max_active - recovery_ops_active;
+ int max = MAX(g_conf->osd_recovery_max_active - recovery_ops_active,
+ g_conf->osd_recovery_max_single_start);
if (max > 0) {
dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active
<< " rops)" << dendl;
@@ -6652,7 +6653,7 @@ void OSD::do_recovery(PG *pg)
recovery_wq.queue(pg);
return;
} else {
- pg->lock();
+ pg->lock_suspend_timeout(handle);
if (pg->deleting || !(pg->is_active() && pg->is_primary())) {
pg->unlock();
goto out;
@@ -6664,7 +6665,7 @@ void OSD::do_recovery(PG *pg)
#endif
PG::RecoveryCtx rctx = create_context();
- int started = pg->start_recovery_ops(max, &rctx);
+ int started = pg->start_recovery_ops(max, &rctx, handle);
dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
/*
@@ -7052,7 +7053,7 @@ void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle)
if (!(pg_for_processing[&*pg].size()))
pg_for_processing.erase(&*pg);
}
- osd->dequeue_op(pg, op);
+ osd->dequeue_op(pg, op, handle);
pg->unlock();
}
@@ -7065,7 +7066,9 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
-void OSD::dequeue_op(PGRef pg, OpRequestRef op)
+void OSD::dequeue_op(
+ PGRef pg, OpRequestRef op,
+ ThreadPool::TPHandle &handle)
{
utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
@@ -7078,7 +7081,7 @@ void OSD::dequeue_op(PGRef pg, OpRequestRef op)
op->mark_reached_pg();
- pg->do_request(op);
+ pg->do_request(op, handle);
// finish
dout(10) << "dequeue_op " << op << " finish" << dendl;
@@ -7130,7 +7133,7 @@ void OSD::process_peering_events(
++i) {
set<boost::intrusive_ptr<PG> > split_pgs;
PG *pg = *i;
- pg->lock();
+ pg->lock_suspend_timeout(handle);
curmap = service.get_osdmap();
if (pg->deleting) {
pg->unlock();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 5196a1dc1f3..82a251d9a80 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -915,7 +915,9 @@ private:
} op_wq;
void enqueue_op(PG *pg, OpRequestRef op);
- void dequeue_op(PGRef pg, OpRequestRef op);
+ void dequeue_op(
+ PGRef pg, OpRequestRef op,
+ ThreadPool::TPHandle &handle);
// -- peering queue --
struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
@@ -1371,8 +1373,8 @@ protected:
osd->recovery_queue.push_front(&pg->recovery_item);
}
}
- void _process(PG *pg) {
- osd->do_recovery(pg);
+ void _process(PG *pg, ThreadPool::TPHandle &handle) {
+ osd->do_recovery(pg, handle);
pg->put("RecoveryWQ");
}
void _clear() {
@@ -1386,7 +1388,7 @@ protected:
void start_recovery_op(PG *pg, const hobject_t& soid);
void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
- void do_recovery(PG *pg);
+ void do_recovery(PG *pg, ThreadPool::TPHandle &handle);
bool _recover_now();
// replay / delayed pg activation
diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc
index a6cdc9ecffb..c694362a8a5 100644
--- a/src/osd/OpRequest.cc
+++ b/src/osd/OpRequest.cc
@@ -20,6 +20,22 @@ static ostream& _prefix(std::ostream* _dout)
return *_dout << "--OSD::tracker-- ";
}
+OpRequest::OpRequest(Message *req, OpTracker *tracker) :
+ request(req), xitem(this),
+ rmw_flags(0),
+ warn_interval_multiplier(1),
+ lock("OpRequest::lock"),
+ tracker(tracker),
+ hit_flag_points(0), latest_flag_point(0),
+ seq(0) {
+ received_time = request->get_recv_stamp();
+ tracker->register_inflight_op(&xitem);
+ if (req->get_priority() < g_conf->osd_client_op_priority) {
+ // don't warn as quickly for low priority ops
+ warn_interval_multiplier = g_conf->osd_recovery_op_warn_multiple;
+ }
+}
+
void OpHistory::on_shutdown()
{
arrived.clear();
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index a2014472432..e72f03d1d77 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -156,17 +156,7 @@ private:
static const uint8_t flag_sub_op_sent = 1 << 4;
static const uint8_t flag_commit_sent = 1 << 5;
- OpRequest(Message *req, OpTracker *tracker) :
- request(req), xitem(this),
- rmw_flags(0),
- warn_interval_multiplier(1),
- lock("OpRequest::lock"),
- tracker(tracker),
- hit_flag_points(0), latest_flag_point(0),
- seq(0) {
- received_time = request->get_recv_stamp();
- tracker->register_inflight_op(&xitem);
- }
+ OpRequest(Message *req, OpTracker *tracker);
public:
~OpRequest() {
assert(request);
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 63e760e3b21..8e78eaa7a16 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1397,7 +1397,9 @@ void PG::queue_op(OpRequestRef op)
osd->op_wq.queue(make_pair(PGRef(this), op));
}
-void PG::do_request(OpRequestRef op)
+void PG::do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
{
// do any pending flush
do_pending_flush();
@@ -1435,7 +1437,7 @@ void PG::do_request(OpRequestRef op)
break;
case MSG_OSD_PG_SCAN:
- do_scan(op);
+ do_scan(op, handle);
break;
case MSG_OSD_PG_BACKFILL:
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 8f572c75e19..d4679ce4fd8 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -645,7 +645,9 @@ public:
virtual void check_local() = 0;
- virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0;
+ virtual int start_recovery_ops(
+ int max, RecoveryCtx *prctx,
+ ThreadPool::TPHandle &handle) = 0;
void purge_strays();
@@ -1804,12 +1806,18 @@ public:
// abstract bits
- void do_request(OpRequestRef op);
+ void do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle
+ );
virtual void do_op(OpRequestRef op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
virtual void do_sub_op_reply(OpRequestRef op) = 0;
- virtual void do_scan(OpRequestRef op) = 0;
+ virtual void do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle
+ ) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
virtual void do_push(OpRequestRef op) = 0;
virtual void do_pull(OpRequestRef op) = 0;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 658ea7cb746..ab9c8099a44 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1252,7 +1252,9 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
sub_op_modify_reply(op);
}
-void ReplicatedPG::do_scan(OpRequestRef op)
+void ReplicatedPG::do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
{
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
assert(m->get_header().type == MSG_OSD_PG_SCAN);
@@ -1278,7 +1280,9 @@ void ReplicatedPG::do_scan(OpRequestRef op)
BackfillInterval bi;
osr->flush();
- scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
+ scan_range(
+ m->begin, g_conf->osd_backfill_scan_min,
+ g_conf->osd_backfill_scan_max, &bi, handle);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid, bi.begin, bi.end);
@@ -6875,7 +6879,9 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
}
-int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
+int ReplicatedPG::start_recovery_ops(
+ int max, RecoveryCtx *prctx,
+ ThreadPool::TPHandle &handle)
{
int started = 0;
assert(is_primary());
@@ -6898,15 +6904,15 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
if (num_missing == num_unfound) {
// All of the missing objects we have are unfound.
// Recover the replicas.
- started = recover_replicas(max);
+ started = recover_replicas(max, handle);
}
if (!started) {
// We still have missing objects that we should grab from replicas.
- started += recover_primary(max);
+ started += recover_primary(max, handle);
}
if (!started && num_unfound != get_num_unfound()) {
// second chance to recovery replicas
- started = recover_replicas(max);
+ started = recover_replicas(max, handle);
}
bool deferred_backfill = false;
@@ -6931,7 +6937,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
}
deferred_backfill = true;
} else {
- started += recover_backfill(max - started);
+ started += recover_backfill(max - started, handle);
}
}
@@ -6993,7 +6999,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
* do one recovery op.
* return true if done, false if nothing left to do.
*/
-int ReplicatedPG::recover_primary(int max)
+int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
{
assert(is_primary());
@@ -7012,6 +7018,7 @@ int ReplicatedPG::recover_primary(int max)
map<version_t, hobject_t>::const_iterator p =
missing.rmissing.lower_bound(pg_log.get_log().last_requested);
while (p != missing.rmissing.end()) {
+ handle.reset_tp_timeout();
hobject_t soid;
version_t v = p->first;
@@ -7204,7 +7211,7 @@ int ReplicatedPG::prep_object_replica_pushes(
return 1;
}
-int ReplicatedPG::recover_replicas(int max)
+int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
{
dout(10) << __func__ << "(" << max << ")" << dendl;
int started = 0;
@@ -7226,6 +7233,7 @@ int ReplicatedPG::recover_replicas(int max)
for (map<version_t, hobject_t>::const_iterator p = m.rmissing.begin();
p != m.rmissing.end() && started < max;
++p) {
+ handle.reset_tp_timeout();
const hobject_t soid(p->second);
if (pushing.count(soid)) {
@@ -7275,7 +7283,9 @@ int ReplicatedPG::recover_replicas(int max)
* peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin,
* backfill_info.begin, backfills_in_flight)
*/
-int ReplicatedPG::recover_backfill(int max)
+int ReplicatedPG::recover_backfill(
+ int max,
+ ThreadPool::TPHandle &handle)
{
dout(10) << "recover_backfill (" << max << ")" << dendl;
assert(backfill_target >= 0);
@@ -7305,7 +7315,7 @@ int ReplicatedPG::recover_backfill(int max)
dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
backfill_info.clear();
osr->flush();
- scan_range(backfill_pos, local_min, local_max, &backfill_info);
+ scan_range(backfill_pos, local_min, local_max, &backfill_info, handle);
int ops = 0;
map<hobject_t, pair<eversion_t, eversion_t> > to_push;
@@ -7319,7 +7329,8 @@ int ReplicatedPG::recover_backfill(int max)
if (backfill_info.begin <= pbi.begin &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
osr->flush();
- scan_range(backfill_info.end, local_min, local_max, &backfill_info);
+ scan_range(backfill_info.end, local_min, local_max, &backfill_info,
+ handle);
backfill_info.trim();
}
backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
@@ -7407,6 +7418,7 @@ int ReplicatedPG::recover_backfill(int max)
for (map<hobject_t, eversion_t>::iterator i = to_remove.begin();
i != to_remove.end();
++i) {
+ handle.reset_tp_timeout();
send_remove_op(i->first, i->second, backfill_target);
}
@@ -7414,6 +7426,7 @@ int ReplicatedPG::recover_backfill(int max)
for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
i != to_push.end();
++i) {
+ handle.reset_tp_timeout();
prep_backfill_object_push(
i->first, i->second.first, i->second.second, backfill_target, &pushes);
}
@@ -7480,7 +7493,9 @@ void ReplicatedPG::prep_backfill_object_push(
put_object_context(obc);
}
-void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
+void ReplicatedPG::scan_range(
+ hobject_t begin, int min, int max, BackfillInterval *bi,
+ ThreadPool::TPHandle &handle)
{
assert(is_locked());
dout(10) << "scan_range from " << begin << dendl;
@@ -7496,6 +7511,7 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva
dout(20) << ls << dendl;
for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
+ handle.reset_tp_timeout();
ObjectContext *obc = NULL;
if (is_primary())
obc = _lookup_object_context(*p);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 7b70b4381ea..41c8106ea00 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -759,10 +759,13 @@ protected:
void _clear_recovery_state();
void queue_for_recovery();
- int start_recovery_ops(int max, RecoveryCtx *prctx);
- int recover_primary(int max);
- int recover_replicas(int max);
- int recover_backfill(int max);
+ int start_recovery_ops(
+ int max, RecoveryCtx *prctx,
+ ThreadPool::TPHandle &handle);
+
+ int recover_primary(int max, ThreadPool::TPHandle &handle);
+ int recover_replicas(int max, ThreadPool::TPHandle &handle);
+ int recover_backfill(int max, ThreadPool::TPHandle &handle);
/**
* scan a (hash) range of objects in the current pg
@@ -772,7 +775,10 @@ protected:
* @max return no more than this many items
* @bi [out] resulting map of objects to eversion_t's
*/
- void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
+ void scan_range(
+ hobject_t begin, int min, int max, BackfillInterval *bi,
+ ThreadPool::TPHandle &handle
+ );
void prep_backfill_object_push(
hobject_t oid, eversion_t v, eversion_t have, int peer,
@@ -939,7 +945,9 @@ public:
void do_pg_op(OpRequestRef op);
void do_sub_op(OpRequestRef op);
void do_sub_op_reply(OpRequestRef op);
- void do_scan(OpRequestRef op);
+ void do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle);
void do_backfill(OpRequestRef op);
void _do_push(OpRequestRef op);
void _do_pull_response(OpRequestRef op);