summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-08-28 15:04:16 -0700
committerSage Weil <sage@inktank.com>2013-09-03 15:48:30 -0700
commited6807919897e92144ac798b17922784f53d4608 (patch)
tree31c8f7d3d62389f9bf78d517cf80313c3f81cdb1
parent3a8adf53143a0841b4971d68d26f26ca274e902b (diff)
downloadceph-ed6807919897e92144ac798b17922784f53d4608.tar.gz
osd: initial COPY_FROM (not viable for large objects)
Initial pass at COPY_FROM implementation. This uses COPY_GET to read an object from another OSD and write it locally. It chunks the read but accumulates it all in-memory and commits it at once, so it is only suitable for smaller objects. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/osd/ReplicatedPG.cc219
-rw-r--r--src/osd/ReplicatedPG.h48
-rw-r--r--src/osd/osd_types.h6
-rw-r--r--src/test/librados/misc.cc44
5 files changed, 305 insertions, 13 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 328f7f4b94d..2fa72d4ce0f 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -444,6 +444,7 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
OPTION(osd_recovery_max_active, OPT_INT, 15)
OPTION(osd_recovery_max_single_start, OPT_INT, 5)
OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk
+OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20) // max size of a COPYFROM chunk
OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object
OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message
OPTION(osd_max_push_objects, OPT_U64, 10) // max objects in single push op
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 0027edda077..2c96180b13a 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1000,6 +1000,11 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
p->second->ondisk_read_unlock();
}
+ if (result == -EINPROGRESS) {
+ // come back later.
+ return;
+ }
+
if (result == -EAGAIN) {
// clean up after the ctx
delete ctx;
@@ -3386,6 +3391,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
if (result < 0)
break;
cursor.attr_complete = true;
+ dout(20) << " got attrs" << dendl;
}
::encode(out_attrs, osd_op.outdata);
@@ -3395,15 +3401,17 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
bufferlist bl;
if (left > 0 && !cursor.data_complete) {
if (cursor.data_offset < oi.size) {
- result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl);
+ result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
if (result < 0)
return result;
assert(result <= left);
left -= result;
cursor.data_offset += result;
}
- if (cursor.data_offset == oi.size)
+ if (cursor.data_offset == oi.size) {
cursor.data_complete = true;
+ dout(20) << " got data" << dendl;
+ }
}
::encode(bl, osd_op.outdata);
@@ -3423,15 +3431,73 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
cursor.omap_offset = iter->key();
} else {
cursor.omap_complete = true;
+ dout(20) << " got omap" << dendl;
}
}
::encode(out_omap, osd_op.outdata);
+ dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
::encode(cursor, osd_op.outdata);
result = 0;
}
break;
+ case CEPH_OSD_OP_COPY_FROM:
+ ++ctx->num_write;
+ {
+ object_t src_name;
+ object_locator_t src_oloc;
+ snapid_t src_snapid = (uint64_t)op.copy_from.snapid;
+ version_t src_version = op.copy_from.src_version;
+ try {
+ ::decode(src_name, bp);
+ ::decode(src_oloc, bp);
+ }
+ catch (buffer::error& e) {
+ result = -EINVAL;
+ goto fail;
+ }
+ pg_t raw_pg;
+ get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
+ hobject_t src(src_name, src_oloc.key, src_snapid,
+ raw_pg.ps(), raw_pg.pool(),
+ src_oloc.nspace);
+ if (!ctx->copy_op) {
+ // start
+ result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op);
+ if (result < 0)
+ goto fail;
+ result = -EINPROGRESS;
+ } else {
+ // finish
+ CopyOpRef cop = ctx->copy_op;
+
+ if (!obs.exists) {
+ ctx->delta_stats.num_objects++;
+ obs.exists = true;
+ } else {
+ t.remove(coll, soid);
+ }
+ t.write(coll, soid, 0, cop->data.length(), cop->data);
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+ t.setattr(coll, soid, string("_") + p->first, p->second);
+ t.omap_setkeys(coll, soid, cop->omap);
+
+ interval_set<uint64_t> ch;
+ if (oi.size > 0)
+ ch.insert(0, oi.size);
+ ctx->modified_ranges.union_of(ch);
+
+ if (cop->data.length() != oi.size) {
+ ctx->delta_stats.num_bytes -= oi.size;
+ oi.size = cop->data.length();
+ ctx->delta_stats.num_bytes += oi.size;
+ }
+ ctx->delta_stats.num_wr++;
+ ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
+ }
+ }
+ break;
default:
dout(1) << "unrecognized osd op " << op.op
@@ -4013,6 +4079,152 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
return result;
}
+// ========================================================================
+// copyfrom
+
+struct C_Copyfrom : public Context {
+ ReplicatedPGRef pg;
+ hobject_t oid;
+ epoch_t last_peering_reset;
+ tid_t tid;
+ C_Copyfrom(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0)
+ {}
+ void finish(int r) {
+ pg->lock();
+ if (last_peering_reset == pg->get_last_peering_reset()) {
+ pg->process_copy_chunk(oid, tid, r);
+ }
+ pg->unlock();
+ }
+};
+
+int ReplicatedPG::start_copy(OpContext *ctx,
+ hobject_t src, object_locator_t oloc, version_t version,
+ CopyOpRef *pcop)
+{
+ const hobject_t& dest = ctx->obs->oi.soid;
+ dout(10) << __func__ << " " << dest << " ctx " << ctx
+ << " from " << src << " " << oloc << " v" << version
+ << dendl;
+
+ // cancel a previous in-progress copy?
+ if (copy_ops.count(dest)) {
+ // FIXME: if the src etc match, we could avoid restarting from the
+ // beginning.
+ CopyOpRef cop = copy_ops[dest];
+ cancel_copy(cop);
+ }
+
+ CopyOpRef cop(new CopyOp(ctx, src, oloc, version));
+ copy_ops[dest] = cop;
+ ctx->copy_op = cop;
+ ++ctx->obc->copyfrom_readside;
+
+ _copy_some(ctx, cop);
+
+ return 0;
+}
+
+void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
+{
+ dout(10) << __func__ << " " << ctx << " " << cop << dendl;
+ ObjectOperation op;
+ op.assert_version(cop->version);
+ op.copy_get(&cop->cursor, g_conf->osd_copyfrom_max_chunk,
+ &cop->size, &cop->mtime, &cop->attrs,
+ &cop->data, &cop->omap,
+ &cop->rval);
+
+ C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid,
+ get_last_peering_reset());
+ osd->objecter_lock.Lock();
+ tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
+ cop->src.snap, NULL, 0,
+ new C_OnFinisher(fin,
+ &osd->objecter_finisher),
+ NULL);
+ fin->tid = tid;
+ cop->objecter_tid = tid;
+ osd->objecter_lock.Unlock();
+}
+
+void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
+{
+ dout(10) << __func__ << " tid " << tid << " " << cpp_strerror(r) << dendl;
+ map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
+ if (p == copy_ops.end()) {
+ dout(10) << __func__ << " no copy_op found" << dendl;
+ return;
+ }
+ CopyOpRef cop = p->second;
+ if (tid != cop->objecter_tid) {
+ dout(10) << __func__ << " tid " << tid << " != cop " << cop
+ << " tid " << cop->objecter_tid << dendl;
+ return;
+ }
+ OpContext *ctx = cop->ctx;
+ cop->objecter_tid = 0;
+ if (r < 0) {
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ reply_ctx(ctx, r);
+ return;
+ }
+ assert(cop->rval >= 0);
+
+ // FIXME: this is accumulating the entire object in memory.
+
+ if (!cop->cursor.is_complete()) {
+ dout(10) << __func__ << " fetching more" << dendl;
+ _copy_some(ctx, cop);
+ return;
+ }
+
+ dout(20) << __func__ << " complete; committing" << dendl;
+ execute_ctx(ctx);
+
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ ctx->copy_op.reset();
+}
+
+void ReplicatedPG::cancel_copy(CopyOpRef cop)
+{
+ OpContext *ctx = cop->ctx;
+ dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx
+ << " from " << cop->src << " " << cop->oloc << " v" << cop->version
+ << dendl;
+
+ // cancel objecter op, if we can
+ if (cop->objecter_tid) {
+ Mutex::Locker l(osd->objecter_lock);
+ osd->objecter->op_cancel(cop->objecter_tid);
+ }
+
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ ctx->copy_op.reset();
+
+ delete ctx;
+}
+
+void ReplicatedPG::requeue_cancel_copy_ops(bool requeue)
+{
+ dout(10) << __func__ << dendl;
+ for (map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
+ p != copy_ops.end();
+ copy_ops.erase(p++)) {
+ // requeue initiating copy *and* any subsequent waiters
+ CopyOpRef cop = p->second;
+ if (requeue) {
+ cop->waiting.push_front(cop->ctx->op);
+ requeue_ops(cop->waiting);
+ }
+ cancel_copy(cop);
+ }
+}
// ========================================================================
@@ -6736,6 +6948,7 @@ void ReplicatedPG::on_shutdown()
deleting = true;
unreg_next_scrub();
+ requeue_cancel_copy_ops(false);
apply_and_flush_repops(false);
context_registry_on_change();
@@ -6786,6 +6999,8 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
context_registry_on_change();
+ requeue_cancel_copy_ops(is_primary());
+
// requeue object waiters
if (is_primary()) {
requeue_ops(waiting_for_backfill_pos);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 5dc7d882a8b..254b5842ffc 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -83,7 +83,40 @@ public:
class ReplicatedPG : public PG {
friend class OSD;
friend class Watch;
-public:
+
+public:
+
+ /*
+ * state associated with a copy operation
+ */
+ struct OpContext;
+
+ struct CopyOp {
+ OpContext *ctx;
+ hobject_t src;
+ object_locator_t oloc;
+ version_t version;
+
+ tid_t objecter_tid;
+
+ list<OpRequestRef> waiting;
+
+ object_copy_cursor_t cursor;
+ uint64_t size;
+ utime_t mtime;
+ map<string,bufferlist> attrs;
+ bufferlist data;
+ map<string,bufferlist> omap;
+ int rval;
+
+ CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
+ : ctx(c), src(s), oloc(l), version(v),
+ objecter_tid(0),
+ size(0),
+ rval(-1)
+ {}
+ };
+ typedef boost::shared_ptr<CopyOp> CopyOpRef;
/*
* Capture all object state associated with an in-progress read or write.
@@ -145,6 +178,8 @@ public:
int num_read; ///< count read ops
int num_write; ///< count update ops
+ CopyOpRef copy_op;
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
@@ -749,6 +784,17 @@ protected:
void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
+ // -- copyfrom --
+ map<hobject_t, CopyOpRef> copy_ops;
+
+ int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
+ CopyOpRef *pcop);
+ void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+ void _copy_some(OpContext *ctx, CopyOpRef cop);
+ void cancel_copy(CopyOpRef cop);
+ void requeue_cancel_copy_ops(bool requeue=true);
+
+ friend class C_Copyfrom;
// -- scrub --
virtual void _scrub(ScrubMap& map);
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 00e9409c98a..312eb81e3fd 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2130,6 +2130,9 @@ public:
Cond cond;
int unstable_writes, readers, writers_waiting, readers_waiting;
+ /// in-progress copyfrom ops for this object
+ int copyfrom_readside;
+
// set if writes for this object are blocked on another objects recovery
ObjectContextRef blocked_by; // object blocking our writes
set<ObjectContextRef> blocking; // objects whose writes we block
@@ -2141,7 +2144,8 @@ public:
: ssc(NULL),
destructor_callback(0),
lock("ReplicatedPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {}
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+ copyfrom_readside(0) {}
~ObjectContext() {
if (destructor_callback)
diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc
index 9fe6427d3f8..af17847aeab 100644
--- a/src/test/librados/misc.cc
+++ b/src/test/librados/misc.cc
@@ -571,18 +571,44 @@ TEST(LibRadosMisc, CopyPP) {
IoCtx ioctx;
ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
- char buf[64];
- memset(buf, 0xcc, sizeof(buf));
- bufferlist bl;
- bl.append(buf, sizeof(buf));
-
- ASSERT_EQ(0, ioctx.write_full("foo", bl));
+ bufferlist bl, x;
+ bl.append("hi there");
+ x.append("bar");
+ // small object
+ bufferlist blc = bl;
+ bufferlist xc = x;
+ ASSERT_EQ(0, ioctx.write_full("foo", blc));
+ ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc));
ObjectWriteOperation op;
- op.copyfrom("foo", ioctx, ioctx.get_last_version());
-
- ASSERT_EQ(0, ioctx.operate("bar", &op));
+ op.copy_from("foo", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("foo.copy", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+
+ // do a big object
+ bl.append(buffer::create(8000000));
+ bl.zero();
+ bl.append("tail");
+ blc = bl;
+ xc = x;
+ ASSERT_EQ(0, ioctx.write_full("big", blc));
+ ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc));
+
+ ObjectWriteOperation op2;
+ op.copy_from("big", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("big.copy", &op));
+
+ bl2.clear();
+ ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));