diff options
author | Sage Weil <sage@inktank.com> | 2013-09-11 15:10:47 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-09-17 11:06:27 -0700 |
commit | f97277cc50a1bfe34689aadda65eb8a4b8521798 (patch) | |
tree | 8463d3833d9e2d4292ce3bcc4a147bb0f85eb037 | |
parent | df7c36ac08ec59b5435e758713dd65598410d763 (diff) | |
download | ceph-f97277cc50a1bfe34689aadda65eb8a4b8521798.tar.gz |
osd: add infrastructure to block io on an obc
Add an is_blocked() method for the obc, and add infrastructure to block
any operations if it returns true. Clean up on_change(), and add a helper
to kick an obc when whatever condition leading to it being blocked is no
longer true.
For now, is_blocked() is always false...
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/osd/PG.h | 3 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 41 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 3 | ||||
-rw-r--r-- | src/osd/osd_types.h | 5 |
4 files changed, 50 insertions, 2 deletions
diff --git a/src/osd/PG.h b/src/osd/PG.h index cbafd0f43d9..cdbe827a4a9 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -523,7 +523,8 @@ protected: list<OpRequestRef> waiting_for_active; list<OpRequestRef> waiting_for_all_missing; map<hobject_t, list<OpRequestRef> > waiting_for_missing_object, - waiting_for_degraded_object; + waiting_for_degraded_object, + waiting_for_blocked_object; // Callbacks should assume pg (and nothing else) is locked map<hobject_t, list<Context*> > callbacks_for_degraded_object; map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index c42825259b6..c9f4cb624de 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -192,6 +192,13 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef op->mark_delayed("waiting for degraded object"); } +void ReplicatedPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef op) +{ + dout(10) << __func__ << " " << soid << " " << op << dendl; + waiting_for_blocked_object[soid].push_back(op); + op->mark_delayed("waiting for blocked object"); +} + void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op) { waiting_for_backfill_pos.push_back(op); @@ -735,7 +742,7 @@ void ReplicatedPG::do_op(OpRequestRef op) osd->reply_op_error(op, r); return; } - + // make sure locator is consistent object_locator_t oloc(obc->obs.oi.soid); if (m->get_object_locator() != oloc) { @@ -746,6 +753,12 @@ void ReplicatedPG::do_op(OpRequestRef op) << " op " << *m << "\n"; } + // io blocked on obc? + if (obc->is_blocked()) { + wait_for_blocked_object(obc->obs.oi.soid, op); + return; + } + if ((op->may_read()) && (obc->obs.oi.lost)) { // This object is lost. Reading from it returns an error. dout(20) << __func__ << ": object " << obc->obs.oi.soid @@ -5140,6 +5153,24 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t pgstat->stats.cat_sum[oi.category].add(stat); } +void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc) +{ + const hobject_t& soid = obc->obs.oi.soid; + map<hobject_t, list<OpRequestRef> >::iterator p = waiting_for_blocked_object.find(soid); + if (p == waiting_for_blocked_object.end()) + return; + + if (obc->is_blocked()) { + dout(10) << __func__ << " " << soid << " still blocked" << dendl; + return; + } + + list<OpRequestRef>& ls = waiting_for_blocked_object[soid]; + dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl; + requeue_ops(ls); + waiting_for_blocked_object.erase(soid); +} + SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) { Mutex::Locker l(snapset_contexts_lock); @@ -7095,6 +7126,14 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) p->second.clear(); finish_degraded_object(p->first); } + for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_blocked_object.begin(); + p != waiting_for_blocked_object.end(); + waiting_for_blocked_object.erase(p++)) { + if (is_primary()) + requeue_ops(p->second); + else + p->second.clear(); + } if (is_primary()) requeue_ops(waiting_for_all_missing); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 80ee9cf8d29..a58e1070739 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -936,6 +936,9 @@ public: bool is_degraded_object(const hobject_t& oid); void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op); + void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op); + void kick_object_context_blocked(ObjectContextRef obc); + void mark_all_unfound_lost(int what); eversion_t pick_newest_available(const hobject_t& oid); ObjectContextRef mark_object_lost(ObjectStore::Transaction *t, diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index da139b853b1..45937a91dd8 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2151,6 +2151,11 @@ public: if (destructor_callback) destructor_callback->complete(0); } + + bool is_blocked() const { + return false; + } + // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy. void ondisk_write_lock() { lock.Lock(); |