summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-09-04 17:09:52 -0700
committerSage Weil <sage@inktank.com>2013-09-17 11:06:27 -0700
commit4e29e362e7981634d751ee982144fbf602782a9a (patch)
tree46d3bca55beff4dcf5bae6080cffc9b655c21b9e
parent25a608cdccaf667f465f44c9776ca0f8a132cece (diff)
downloadceph-4e29e362e7981634d751ee982144fbf602782a9a.tar.gz
osd/ReplicatedPG: stage object chunks to replicas during COPY_FROM
As we get each chunk of data during the COPY_FROM operation, write it out to a temporary object on the replicas. When we get all the pieces, move it into place. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/osd/ReplicatedPG.cc76
-rw-r--r--src/osd/ReplicatedPG.h5
-rw-r--r--src/test/librados/misc.cc2
3 files changed, 73 insertions, 10 deletions
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index ae1c38e4aef..2318aba6f7a 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -3435,7 +3435,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
::encode(out_omap, osd_op.outdata);
- dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
+ dout(20) << " cursor.is_complete=" << cursor.is_complete()
+ << " " << out_attrs.size() << " attrs"
+ << " " << bl.length() << " bytes"
+ << " " << out_omap.size() << " keys"
+ << dendl;
::encode(cursor, osd_op.outdata);
result = 0;
}
@@ -3477,20 +3481,29 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
} else {
t.remove(coll, soid);
}
- t.write(coll, soid, 0, cop->data.length(), cop->data);
- for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
- t.setattr(coll, soid, string("_") + p->first, p->second);
- t.omap_setkeys(coll, soid, cop->omap);
+
+ if (cop->temp_cursor.is_initial()) {
+ // write directly to final object
+ cop->temp_coll = coll;
+ cop->temp_oid = soid;
+ _write_copy_chunk(cop, &t);
+ } else {
+ // finish writing to temp object, then move into place
+ _write_copy_chunk(cop, &t);
+ t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, soid);
+ temp_contents.erase(cop->temp_oid);
+ ctx->old_temp_oid = cop->temp_oid;
+ }
interval_set<uint64_t> ch;
if (oi.size > 0)
ch.insert(0, oi.size);
ctx->modified_ranges.union_of(ch);
- if (cop->data.length() != oi.size) {
+ if (cop->cursor.data_offset != oi.size) {
ctx->delta_stats.num_bytes -= oi.size;
- oi.size = cop->data.length();
ctx->delta_stats.num_bytes += oi.size;
+ oi.size = cop->cursor.data_offset;
}
ctx->delta_stats.num_wr++;
ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
@@ -4182,8 +4195,27 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
}
assert(cop->rval >= 0);
- // FIXME: this is accumulating the entire object in memory.
if (!cop->cursor.is_complete()) {
+ // write out what we have so far
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this);
+ tctx->mtime = ceph_clock_now(g_ceph_context);
+ RepGather *repop = new_repop(tctx, ctx->obc, rep_tid);
+
+ if (cop->temp_cursor.is_initial()) {
+ cop->temp_coll = get_temp_coll(&tctx->local_t);
+ cop->temp_oid = generate_temp_object();
+ temp_contents.insert(cop->temp_oid);
+ repop->ctx->new_temp_oid = cop->temp_oid;
+ }
+
+ _write_copy_chunk(cop, &tctx->op_t);
+
+ issue_repop(repop, repop->ctx->mtime);
+ eval_repop(repop);
+
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(ctx, cop);
return;
@@ -4197,6 +4229,30 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
ctx->copy_op.reset();
}
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
+{
+ dout(20) << __func__ << " " << cop
+ << " " << cop->attrs.size() << " attrs"
+ << " " << cop->data.length() << " bytes"
+ << " " << cop->omap.size() << " keys"
+ << dendl;
+ if (!cop->temp_cursor.attr_complete) {
+ t->touch(cop->temp_coll, cop->temp_oid);
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+ t->setattr(cop->temp_coll, cop->temp_oid, string("_") + p->first, p->second);
+ cop->attrs.clear();
+ }
+ if (!cop->temp_cursor.data_complete) {
+ t->write(cop->temp_coll, cop->temp_oid, cop->temp_cursor.data_offset, cop->data.length(), cop->data);
+ cop->data.clear();
+ }
+ if (!cop->temp_cursor.omap_complete) {
+ t->omap_setkeys(cop->temp_coll, cop->temp_oid, cop->omap);
+ cop->omap.clear();
+ }
+ cop->temp_cursor = cop->cursor;
+}
+
void ReplicatedPG::cancel_copy(CopyOpRef cop)
{
OpContext *ctx = cop->ctx;
@@ -4565,7 +4621,8 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
}
// ship resulting transaction, log entries, and pg_stats
- if (peer == backfill_target && soid >= backfill_pos) {
+ if (peer == backfill_target && soid >= backfill_pos &&
+ soid.pool == (int64_t)info.pgid.pool()) { // only skip normal (not temp pool=-1) objects
dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
<< backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
ObjectStore::Transaction t;
@@ -4573,6 +4630,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
} else {
::encode(repop->ctx->op_t, wr->get_data());
}
+
::encode(repop->ctx->log, wr->logbl);
if (backfill_target >= 0 && backfill_target == peer)
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index f80be1b9391..ab874351274 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -109,6 +109,10 @@ public:
map<string,bufferlist> omap;
int rval;
+ coll_t temp_coll;
+ hobject_t temp_oid;
+ object_copy_cursor_t temp_cursor;
+
CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
: ctx(c), src(s), oloc(l), version(v),
objecter_tid(0),
@@ -788,6 +792,7 @@ protected:
int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
CopyOpRef *pcop);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+ void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
void _copy_some(OpContext *ctx, CopyOpRef cop);
void cancel_copy(CopyOpRef cop);
void requeue_cancel_copy_ops(bool requeue=true);
diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc
index af17847aeab..24cb431261a 100644
--- a/src/test/librados/misc.cc
+++ b/src/test/librados/misc.cc
@@ -592,7 +592,7 @@ TEST(LibRadosMisc, CopyPP) {
ASSERT_TRUE(x.contents_equal(x2));
// do a big object
- bl.append(buffer::create(8000000));
+ bl.append(buffer::create(g_conf->osd_copyfrom_max_chunk * 3));
bl.zero();
bl.append("tail");
blc = bl;