summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2012-05-05 16:25:26 -0700
committerSage Weil <sage@newdream.net>2012-05-05 16:25:26 -0700
commitc8bd471b59aee862add90013c76e3d83bcbe08f0 (patch)
tree8d8c7f178bc073c0ff5272f9a66ad78c0217444b
parent203a7d67aa0836409f1f393bb40485ddb8b2c8f6 (diff)
downloadceph-c8bd471b59aee862add90013c76e3d83bcbe08f0.tar.gz
objectcacher: flush range, set
Add ability to flush a range of an object, or a vector of ObjectExtents. Flush any buffers that intersect the specified range, or the entire object if len==0. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--src/osdc/ObjectCacher.cc74
-rw-r--r--src/osdc/ObjectCacher.h14
2 files changed, 79 insertions, 9 deletions
diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc
index ea0205add71..bab8fcb3db2 100644
--- a/src/osdc/ObjectCacher.cc
+++ b/src/osdc/ObjectCacher.cc
@@ -1340,7 +1340,7 @@ void ObjectCacher::wrunlock(Object *o)
return;
}
- flush(o); // flush first
+ flush(o, 0, 0); // flush first
int op = 0;
if (o->rdlock_ref > 0) {
@@ -1425,19 +1425,31 @@ void ObjectCacher::purge(Object *ob)
// flush. non-blocking. no callback.
// true if clean, already flushed.
// false if we wrote something.
-bool ObjectCacher::flush(Object *ob)
+// be sloppy about the ranges and flush any buffer it touches
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
{
bool clean = true;
- for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
- p != ob->data.end();
- p++) {
+ ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
+ map<loff_t,BufferHead*>::iterator p = ob->data.lower_bound(offset);
+ if (p != ob->data.begin() &&
+ (p == ob->data.end() || p->first > offset)) {
+ p--; // might overlap!
+ if (p->first + p->second->length() <= offset)
+ p++; // doesn't overlap.
+ }
+ for ( ; p != ob->data.end(); p++) {
BufferHead *bh = p->second;
+ ldout(cct, 20) << "flush " << *bh << dendl;
+ if (length && bh->start() > offset+length) {
+ break;
+ }
if (bh->is_tx()) {
clean = false;
continue;
}
- if (!bh->is_dirty()) continue;
-
+ if (!bh->is_dirty()) {
+ continue;
+ }
bh_write(bh);
clean = false;
}
@@ -1463,7 +1475,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
!i.end(); ++i) {
Object *ob = *i;
- if (!flush(ob)) {
+ if (!flush(ob, 0, 0)) {
// we'll need to gather...
safe = false;
@@ -1485,6 +1497,52 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
return false;
}
+// flush. non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context *onfinish)
+{
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
+ return true;
+ }
+
+ ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() << " ObjectExtents" << dendl;
+
+ // we'll need to wait for all objects to flush!
+ C_GatherBuilder gather(cct, onfinish);
+
+ bool safe = true;
+ for (vector<ObjectExtent>::iterator p = exv.begin();
+ p != exv.end();
+ ++p) {
+ ObjectExtent &ex = *p;
+ sobject_t soid(ex.oid, CEPH_NOSNAP);
+ if (objects[oset->poolid].count(soid) == 0)
+ continue;
+ Object *ob = objects[oset->poolid][soid];
+
+ ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid << " " << ob << dendl;
+
+ if (!flush(ob, ex.offset, ex.length)) {
+ // we'll need to gather...
+ safe = false;
+
+ ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
+ << ob->last_write_tid << " on " << *ob << dendl;
+ if (onfinish != NULL)
+ ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+ }
+ }
+ if (onfinish != NULL)
+ gather.activate();
+
+ if (safe) {
+ ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
+ return true;
+ }
+ return false;
+}
+
// commit. non-blocking, takes callback.
// return true if already flushed.
diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h
index 07ed0342736..db02727c6d2 100644
--- a/src/osdc/ObjectCacher.h
+++ b/src/osdc/ObjectCacher.h
@@ -366,7 +366,18 @@ class ObjectCacher {
void trim(loff_t max=-1);
void flush(loff_t amount=0);
- bool flush(Object *o);
+ /**
+ * flush a range of buffers
+ *
+ * Flush any buffers that intersect the specified extent. If len==0,
+ * flush *all* buffers for the object.
+ *
+ * @param o object
+ * @param off start offset
+ * @param len extent length, or 0 for entire object
+ * @return true if object was already clean/flushed.
+ */
+ bool flush(Object *o, loff_t off, loff_t len);
loff_t release(Object *o);
void purge(Object *o);
@@ -481,6 +492,7 @@ public:
bool set_is_dirty_or_committing(ObjectSet *oset);
bool flush_set(ObjectSet *oset, Context *onfinish=0);
+ bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex, Context *onfinish=0);
void flush_all(Context *onfinish=0);
bool commit_set(ObjectSet *oset, Context *oncommit);