summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-09-11 15:10:47 -0700
committerSage Weil <sage@inktank.com>2013-09-17 11:06:27 -0700
commitf97277cc50a1bfe34689aadda65eb8a4b8521798 (patch)
tree8463d3833d9e2d4292ce3bcc4a147bb0f85eb037
parentdf7c36ac08ec59b5435e758713dd65598410d763 (diff)
downloadceph-f97277cc50a1bfe34689aadda65eb8a4b8521798.tar.gz
osd: add infrastructure to block io on an obc
Add an is_blocked() method for the obc, and add infrastructure to block any operations if it returns true. Clean up on_change(), and add a helper to kick an obc when whatever condition leading to it being blocked is no longer true. For now, is_blocked() is always false... Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/osd/PG.h3
-rw-r--r--src/osd/ReplicatedPG.cc41
-rw-r--r--src/osd/ReplicatedPG.h3
-rw-r--r--src/osd/osd_types.h5
4 files changed, 50 insertions, 2 deletions
diff --git a/src/osd/PG.h b/src/osd/PG.h
index cbafd0f43d9..cdbe827a4a9 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -523,7 +523,8 @@ protected:
list<OpRequestRef> waiting_for_active;
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
- waiting_for_degraded_object;
+ waiting_for_degraded_object,
+ waiting_for_blocked_object;
// Callbacks should assume pg (and nothing else) is locked
map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index c42825259b6..c9f4cb624de 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -192,6 +192,13 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
op->mark_delayed("waiting for degraded object");
}
+void ReplicatedPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef op)
+{
+ dout(10) << __func__ << " " << soid << " " << op << dendl;
+ waiting_for_blocked_object[soid].push_back(op);
+ op->mark_delayed("waiting for blocked object");
+}
+
void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op)
{
waiting_for_backfill_pos.push_back(op);
@@ -735,7 +742,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
osd->reply_op_error(op, r);
return;
}
-
+
// make sure locator is consistent
object_locator_t oloc(obc->obs.oi.soid);
if (m->get_object_locator() != oloc) {
@@ -746,6 +753,12 @@ void ReplicatedPG::do_op(OpRequestRef op)
<< " op " << *m << "\n";
}
+ // io blocked on obc?
+ if (obc->is_blocked()) {
+ wait_for_blocked_object(obc->obs.oi.soid, op);
+ return;
+ }
+
if ((op->may_read()) && (obc->obs.oi.lost)) {
// This object is lost. Reading from it returns an error.
dout(20) << __func__ << ": object " << obc->obs.oi.soid
@@ -5140,6 +5153,24 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t
pgstat->stats.cat_sum[oi.category].add(stat);
}
+void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc)
+{
+ const hobject_t& soid = obc->obs.oi.soid;
+ map<hobject_t, list<OpRequestRef> >::iterator p = waiting_for_blocked_object.find(soid);
+ if (p == waiting_for_blocked_object.end())
+ return;
+
+ if (obc->is_blocked()) {
+ dout(10) << __func__ << " " << soid << " still blocked" << dendl;
+ return;
+ }
+
+ list<OpRequestRef>& ls = waiting_for_blocked_object[soid];
+ dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl;
+ requeue_ops(ls);
+ waiting_for_blocked_object.erase(soid);
+}
+
SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
{
Mutex::Locker l(snapset_contexts_lock);
@@ -7095,6 +7126,14 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
p->second.clear();
finish_degraded_object(p->first);
}
+ for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_blocked_object.begin();
+ p != waiting_for_blocked_object.end();
+ waiting_for_blocked_object.erase(p++)) {
+ if (is_primary())
+ requeue_ops(p->second);
+ else
+ p->second.clear();
+ }
if (is_primary())
requeue_ops(waiting_for_all_missing);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 80ee9cf8d29..a58e1070739 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -936,6 +936,9 @@ public:
bool is_degraded_object(const hobject_t& oid);
void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
+ void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
+ void kick_object_context_blocked(ObjectContextRef obc);
+
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);
ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index da139b853b1..45937a91dd8 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2151,6 +2151,11 @@ public:
if (destructor_callback)
destructor_callback->complete(0);
}
+
+ bool is_blocked() const {
+ return false;
+ }
+
// do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy.
void ondisk_write_lock() {
lock.Lock();