summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <greg@inktank.com>2013-10-01 16:41:22 -0700
committerGreg Farnum <greg@inktank.com>2013-10-01 21:52:11 -0700
commita96b12f03a5bf3189cf758379238c4c57202c4a7 (patch)
treeadcb7bc974f8a7fc7eb112f635de4ee1567e83f2
parentda1b9b6c107d34392d24812da1501dd99cc483bf (diff)
downloadceph-a96b12f03a5bf3189cf758379238c4c57202c4a7.tar.gz
ReplicatedPG: copy: use CopyCallback instead of CopyOp in OpContext
In order to make this happen, we make the switch to generate the complete transaction in the generic copy code and save it into the Callback. Then in finish_copy() we just take that transaction and prepend it to the existing transaction. With that change, and by making use of the existing CopyCallback data, we no longer need to access the CopyOp from the OpContext, so we can remove it. Hurray, the pipelines are now independent! Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/osd/ReplicatedPG.cc36
-rw-r--r--src/osd/ReplicatedPG.h11
2 files changed, 26 insertions, 21 deletions
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 8d8dde1b365..35e28c18361 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -3752,7 +3752,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
result = -EINVAL;
goto fail;
}
- if (!ctx->copy_op) {
+ if (!ctx->copy_cb) {
// start
pg_t raw_pg;
get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
@@ -3766,15 +3766,16 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
hobject_t temp_target = generate_temp_object();
CopyFromCallback *cb = new CopyFromCallback(ctx, temp_target);
- result = start_copy(ctx, cb, ctx->obc, src, src_oloc, src_version,
+ ctx->copy_cb = cb;
+ result = start_copy(cb, ctx->obc, src, src_oloc, src_version,
temp_target);
if (result < 0)
goto fail;
result = -EINPROGRESS;
} else {
// finish
- result = ctx->copy_op->rval;
- if (ctx->copy_op->rval >= 0) { //success!
+ result = ctx->copy_cb->get_result();
+ if (result >= 0) { //success!
result = finish_copy(ctx);
}
}
@@ -4382,12 +4383,12 @@ struct C_Copyfrom : public Context {
}
};
-int ReplicatedPG::start_copy(OpContext *ctx, CopyCallback *cb, ObjectContextRef obc,
+int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
hobject_t src, object_locator_t oloc, version_t version,
const hobject_t& temp_dest_oid)
{
- const hobject_t& dest = ctx->obs->oi.soid;
- dout(10) << __func__ << " " << dest << " ctx " << ctx
+ const hobject_t& dest = obc->obs.oi.soid;
+ dout(10) << __func__ << " " << dest
<< " from " << src << " " << oloc << " v" << version
<< dendl;
@@ -4401,7 +4402,6 @@ int ReplicatedPG::start_copy(OpContext *ctx, CopyCallback *cb, ObjectContextRef
CopyOpRef cop(new CopyOp(cb, obc, src, oloc, version, temp_dest_oid));
copy_ops[dest] = cop;
- ctx->copy_op = cop;
++obc->copyfrom_readside;
_copy_some(obc, cop);
@@ -4482,6 +4482,11 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(obc, cop);
return;
+ } else {
+ ObjectStore::Transaction t;
+ _build_finish_copy_transaction(cop, t);
+ cop->cb->copy_complete_ops(t);
+ cop->cb->set_data_size(cop->temp_cursor.data_offset);
}
}
@@ -4541,28 +4546,27 @@ void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
int ReplicatedPG::finish_copy(OpContext *ctx)
{
- CopyOpRef cop = ctx->copy_op;
ObjectState& obs = ctx->new_obs;
- ObjectStore::Transaction& t = ctx->op_t;
+ CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
if (!ctx->obs->exists) {
ctx->delta_stats.num_objects++;
obs.exists = true;
}
- if (cop->temp_cursor.is_initial()) {
- ctx->discard_temp_oid = cop->temp_oid;
+ if (cb->is_temp_obj_used()) {
+ ctx->discard_temp_oid = cb->temp_obj;
}
-
- _build_finish_copy_transaction(cop, t);
+ ctx->op_t.swap(cb->final_tx);
+ ctx->op_t.append(cb->final_tx);
interval_set<uint64_t> ch;
if (obs.oi.size > 0)
ch.insert(0, obs.oi.size);
ctx->modified_ranges.union_of(ch);
- if (cop->cursor.data_offset != obs.oi.size) {
+ if (cb->get_data_size() != obs.oi.size) {
ctx->delta_stats.num_bytes -= obs.oi.size;
- obs.oi.size = cop->cursor.data_offset;
+ obs.oi.size = cb->get_data_size();
ctx->delta_stats.num_bytes += obs.oi.size;
}
ctx->delta_stats.num_wr++;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 00611104555..d5ef9b26ca3 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -182,7 +182,6 @@ public:
if (r >= 0) {
ctx->pg->execute_ctx(ctx);
}
- ctx->copy_op.reset();
ctx->copy_cb = NULL;
if (r < 0) {
if (r == -ECANCELED) { // toss it out; client resends
@@ -195,9 +194,10 @@ public:
public:
OpContext *ctx;
hobject_t temp_obj;
+ ObjectStore::Transaction final_tx;
CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) :
ctx(ctx_), temp_obj(temp_obj_) {}
- void copy_complete_ops(ObjectStore::Transaction& t) {}
+ void copy_complete_ops(ObjectStore::Transaction& t) { final_tx.swap(t); }
~CopyFromCallback() {}
};
friend class CopyFromCallback;
@@ -375,7 +375,7 @@ public:
int num_read; ///< count read ops
int num_write; ///< count update ops
- CopyOpRef copy_op;
+ CopyCallback *copy_cb;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
@@ -392,7 +392,8 @@ public:
current_osd_subop_num(0),
data_off(0), reply(NULL), pg(_pg),
num_read(0),
- num_write(0) {
+ num_write(0),
+ copy_cb(NULL) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;
@@ -801,7 +802,7 @@ protected:
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
- int start_copy(OpContext *ctx, CopyCallback *cb, ObjectContextRef obc, hobject_t src,
+ int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
object_locator_t oloc, version_t version,
const hobject_t& temp_dest_oid);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);