diff options
author | Sage Weil <sage@inktank.com> | 2013-09-04 17:09:52 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-09-17 11:06:27 -0700 |
commit | 4e29e362e7981634d751ee982144fbf602782a9a (patch) | |
tree | 46d3bca55beff4dcf5bae6080cffc9b655c21b9e | |
parent | 25a608cdccaf667f465f44c9776ca0f8a132cece (diff) | |
download | ceph-4e29e362e7981634d751ee982144fbf602782a9a.tar.gz |
osd/ReplicatedPG: stage object chunks to replicas during COPY_FROM
As each chunk of data arrives during the COPY_FROM operation, write it out
to a temporary object on the replicas. Once all the chunks have arrived, move
the temporary object into place.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/osd/ReplicatedPG.cc | 76 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 5 | ||||
-rw-r--r-- | src/test/librados/misc.cc | 2 |
3 files changed, 73 insertions, 10 deletions
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ae1c38e4aef..2318aba6f7a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3435,7 +3435,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } ::encode(out_omap, osd_op.outdata); - dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl; + dout(20) << " cursor.is_complete=" << cursor.is_complete() + << " " << out_attrs.size() << " attrs" + << " " << bl.length() << " bytes" + << " " << out_omap.size() << " keys" + << dendl; ::encode(cursor, osd_op.outdata); result = 0; } @@ -3477,20 +3481,29 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } else { t.remove(coll, soid); } - t.write(coll, soid, 0, cop->data.length(), cop->data); - for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p) - t.setattr(coll, soid, string("_") + p->first, p->second); - t.omap_setkeys(coll, soid, cop->omap); + + if (cop->temp_cursor.is_initial()) { + // write directly to final object + cop->temp_coll = coll; + cop->temp_oid = soid; + _write_copy_chunk(cop, &t); + } else { + // finish writing to temp object, then move into place + _write_copy_chunk(cop, &t); + t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, soid); + temp_contents.erase(cop->temp_oid); + ctx->old_temp_oid = cop->temp_oid; + } interval_set<uint64_t> ch; if (oi.size > 0) ch.insert(0, oi.size); ctx->modified_ranges.union_of(ch); - if (cop->data.length() != oi.size) { + if (cop->cursor.data_offset != oi.size) { ctx->delta_stats.num_bytes -= oi.size; - oi.size = cop->data.length(); ctx->delta_stats.num_bytes += oi.size; + oi.size = cop->cursor.data_offset; } ctx->delta_stats.num_wr++; ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10); @@ -4182,8 +4195,27 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) } assert(cop->rval >= 0); - // FIXME: this is accumulating the entire object in memory. 
if (!cop->cursor.is_complete()) { + // write out what we have so far + vector<OSDOp> ops; + tid_t rep_tid = osd->get_tid(); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); + OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this); + tctx->mtime = ceph_clock_now(g_ceph_context); + RepGather *repop = new_repop(tctx, ctx->obc, rep_tid); + + if (cop->temp_cursor.is_initial()) { + cop->temp_coll = get_temp_coll(&tctx->local_t); + cop->temp_oid = generate_temp_object(); + temp_contents.insert(cop->temp_oid); + repop->ctx->new_temp_oid = cop->temp_oid; + } + + _write_copy_chunk(cop, &tctx->op_t); + + issue_repop(repop, repop->ctx->mtime); + eval_repop(repop); + dout(10) << __func__ << " fetching more" << dendl; _copy_some(ctx, cop); return; @@ -4197,6 +4229,30 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) ctx->copy_op.reset(); } +void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t) +{ + dout(20) << __func__ << " " << cop + << " " << cop->attrs.size() << " attrs" + << " " << cop->data.length() << " bytes" + << " " << cop->omap.size() << " keys" + << dendl; + if (!cop->temp_cursor.attr_complete) { + t->touch(cop->temp_coll, cop->temp_oid); + for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p) + t->setattr(cop->temp_coll, cop->temp_oid, string("_") + p->first, p->second); + cop->attrs.clear(); + } + if (!cop->temp_cursor.data_complete) { + t->write(cop->temp_coll, cop->temp_oid, cop->temp_cursor.data_offset, cop->data.length(), cop->data); + cop->data.clear(); + } + if (!cop->temp_cursor.omap_complete) { + t->omap_setkeys(cop->temp_coll, cop->temp_oid, cop->omap); + cop->omap.clear(); + } + cop->temp_cursor = cop->cursor; +} + void ReplicatedPG::cancel_copy(CopyOpRef cop) { OpContext *ctx = cop->ctx; @@ -4565,7 +4621,8 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) } // ship resulting transaction, log entries, 
and pg_stats - if (peer == backfill_target && soid >= backfill_pos) { + if (peer == backfill_target && soid >= backfill_pos && + soid.pool == (int64_t)info.pgid.pool()) { // only skip normal (not temp pool=-1) objects dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos " << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl; ObjectStore::Transaction t; @@ -4573,6 +4630,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) } else { ::encode(repop->ctx->op_t, wr->get_data()); } + ::encode(repop->ctx->log, wr->logbl); if (backfill_target >= 0 && backfill_target == peer) diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index f80be1b9391..ab874351274 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -109,6 +109,10 @@ public: map<string,bufferlist> omap; int rval; + coll_t temp_coll; + hobject_t temp_oid; + object_copy_cursor_t temp_cursor; + CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v) : ctx(c), src(s), oloc(l), version(v), objecter_tid(0), @@ -788,6 +792,7 @@ protected: int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version, CopyOpRef *pcop); void process_copy_chunk(hobject_t oid, tid_t tid, int r); + void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t); void _copy_some(OpContext *ctx, CopyOpRef cop); void cancel_copy(CopyOpRef cop); void requeue_cancel_copy_ops(bool requeue=true); diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc index af17847aeab..24cb431261a 100644 --- a/src/test/librados/misc.cc +++ b/src/test/librados/misc.cc @@ -592,7 +592,7 @@ TEST(LibRadosMisc, CopyPP) { ASSERT_TRUE(x.contents_equal(x2)); // do a big object - bl.append(buffer::create(8000000)); + bl.append(buffer::create(g_conf->osd_copyfrom_max_chunk * 3)); bl.zero(); bl.append("tail"); blc = bl; |