summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Farnum <greg@inktank.com>2013-10-04 15:58:50 -0700
committerGregory Farnum <greg@inktank.com>2013-10-04 15:58:50 -0700
commitc95d0a164fe9406e85be0033ace3ebc91f7b2dc6 (patch)
treea68c334e5a6745c9e21868f384f305cd5cd1714b
parent5258c9c4e5bd61e6b1a1fdbdb781b9dee702fda3 (diff)
parent806725a8b0f3bf79af9bca3c2c6ed8d70655deff (diff)
downloadceph-c95d0a164fe9406e85be0033ace3ebc91f7b2dc6.tar.gz
Merge pull request #676 from ceph/wip-start-copy
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/ReplicatedPG.cc151
-rw-r--r--src/osd/ReplicatedPG.h93
2 files changed, 167 insertions, 77 deletions
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 9df0495271b..d02a9c9cc48 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -3692,7 +3692,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
result = -EINVAL;
goto fail;
}
- if (!ctx->copy_op) {
+ if (!ctx->copy_cb) {
// start
pg_t raw_pg;
get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
@@ -3704,13 +3704,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
result = -EINVAL;
break;
}
- result = start_copy(ctx, src, src_oloc, src_version);
+ hobject_t temp_target = generate_temp_object();
+ CopyFromCallback *cb = new CopyFromCallback(ctx, temp_target);
+ ctx->copy_cb = cb;
+ result = start_copy(cb, ctx->obc, src, src_oloc, src_version,
+ temp_target);
if (result < 0)
goto fail;
result = -EINPROGRESS;
} else {
// finish
- result = finish_copy(ctx);
+ assert(ctx->copy_cb->get_result() >= 0);
+ result = finish_copyfrom(ctx);
}
}
break;
@@ -4307,11 +4312,12 @@ struct C_Copyfrom : public Context {
}
};
-int ReplicatedPG::start_copy(OpContext *ctx,
- hobject_t src, object_locator_t oloc, version_t version)
+int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
+ hobject_t src, object_locator_t oloc, version_t version,
+ const hobject_t& temp_dest_oid)
{
- const hobject_t& dest = ctx->obs->oi.soid;
- dout(10) << __func__ << " " << dest << " ctx " << ctx
+ const hobject_t& dest = obc->obs.oi.soid;
+ dout(10) << __func__ << " " << dest
<< " from " << src << " " << oloc << " v" << version
<< dendl;
@@ -4323,19 +4329,18 @@ int ReplicatedPG::start_copy(OpContext *ctx,
cancel_copy(cop);
}
- CopyOpRef cop(new CopyOp(ctx, src, oloc, version));
+ CopyOpRef cop(new CopyOp(cb, obc, src, oloc, version, temp_dest_oid));
copy_ops[dest] = cop;
- ctx->copy_op = cop;
- ++ctx->obc->copyfrom_readside;
+ ++obc->copyfrom_readside;
- _copy_some(ctx, cop);
+ _copy_some(obc, cop);
return 0;
}
-void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
+void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
{
- dout(10) << __func__ << " " << ctx << " " << cop << dendl;
+ dout(10) << __func__ << " " << obc << " " << cop << dendl;
ObjectOperation op;
if (cop->version) {
op.assert_version(cop->version);
@@ -4349,7 +4354,7 @@ void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
&cop->data, &cop->omap,
&cop->rval);
- C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid,
+ C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid,
get_last_peering_reset());
osd->objecter_lock.Lock();
tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
@@ -4377,50 +4382,49 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
<< " tid " << cop->objecter_tid << dendl;
return;
}
- OpContext *ctx = cop->ctx;
+ ObjectContextRef obc = cop->obc;
cop->objecter_tid = 0;
- if (r < 0) {
- copy_ops.erase(ctx->obc->obs.oi.soid);
- --ctx->obc->copyfrom_readside;
- kick_object_context_blocked(ctx->obc);
- reply_ctx(ctx, r);
- return;
- }
- assert(cop->rval >= 0);
- if (!cop->cursor.is_complete()) {
- // write out what we have so far
- vector<OSDOp> ops;
- tid_t rep_tid = osd->get_tid();
- osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
- OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this);
- tctx->mtime = ceph_clock_now(g_ceph_context);
- RepGather *repop = new_repop(tctx, ctx->obc, rep_tid);
-
- if (cop->temp_cursor.is_initial()) {
- cop->temp_coll = get_temp_coll(&tctx->local_t);
- cop->temp_oid = generate_temp_object();
- repop->ctx->new_temp_oid = cop->temp_oid;
- }
+ CopyResults results;
+ if (r >= 0) {
+ assert(cop->rval >= 0);
+
+ if (!cop->cursor.is_complete()) {
+ // write out what we have so far
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this);
+ tctx->mtime = ceph_clock_now(g_ceph_context);
+ RepGather *repop = new_repop(tctx, obc, rep_tid);
+
+ if (cop->temp_cursor.is_initial()) {
+ cop->temp_coll = get_temp_coll(&tctx->local_t);
+ repop->ctx->new_temp_oid = cop->temp_oid;
+ }
- _write_copy_chunk(cop, &tctx->op_t);
+ _write_copy_chunk(cop, &tctx->op_t);
- issue_repop(repop, repop->ctx->mtime);
- eval_repop(repop);
- repop->put();
+ issue_repop(repop, repop->ctx->mtime);
+ eval_repop(repop);
+ repop->put();
- dout(10) << __func__ << " fetching more" << dendl;
- _copy_some(ctx, cop);
- return;
+ dout(10) << __func__ << " fetching more" << dendl;
+ _copy_some(obc, cop);
+ return;
+ } else {
+ _build_finish_copy_transaction(cop, results.get<3>());
+ results.get<1>() = cop->temp_cursor.data_offset;
+ }
}
dout(20) << __func__ << " complete; committing" << dendl;
- execute_ctx(ctx);
+ results.get<0>() = cop->rval;
+ cop->cb->complete(results);
- copy_ops.erase(ctx->obc->obs.oi.soid);
- --ctx->obc->copyfrom_readside;
- ctx->copy_op.reset();
- kick_object_context_blocked(ctx->obc);
+ copy_ops.erase(obc->obs.oi.soid);
+ --obc->copyfrom_readside;
+ kick_object_context_blocked(obc);
}
void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
@@ -4447,16 +4451,12 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
cop->temp_cursor = cop->cursor;
}
-int ReplicatedPG::finish_copy(OpContext *ctx)
+void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
+ ObjectStore::Transaction& t)
{
- CopyOpRef cop = ctx->copy_op;
- ObjectState& obs = ctx->new_obs;
- ObjectStore::Transaction& t = ctx->op_t;
+ ObjectState& obs = cop->obc->obs;
- if (!obs.exists) {
- ctx->delta_stats.num_objects++;
- obs.exists = true;
- } else {
+ if (obs.exists) {
t.remove(coll, obs.oi.soid);
}
@@ -4470,18 +4470,34 @@ int ReplicatedPG::finish_copy(OpContext *ctx)
_write_copy_chunk(cop, &t);
t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, obs.oi.soid);
pgbackend->clear_temp_obj(cop->temp_oid);
- ctx->discard_temp_oid = cop->temp_oid;
}
+}
+
+int ReplicatedPG::finish_copyfrom(OpContext *ctx)
+{
+ dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
+ ObjectState& obs = ctx->new_obs;
+ CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
+
+ if (!ctx->obs->exists) {
+ ctx->delta_stats.num_objects++;
+ obs.exists = true;
+ }
+ if (cb->is_temp_obj_used()) {
+ ctx->discard_temp_oid = cb->temp_obj;
+ }
+ ctx->op_t.swap(cb->results.get<3>());
+ ctx->op_t.append(cb->results.get<3>());
interval_set<uint64_t> ch;
if (obs.oi.size > 0)
ch.insert(0, obs.oi.size);
ctx->modified_ranges.union_of(ch);
- if (cop->cursor.data_offset != obs.oi.size) {
+ if (cb->get_data_size() != obs.oi.size) {
ctx->delta_stats.num_bytes -= obs.oi.size;
+ obs.oi.size = cb->get_data_size();
ctx->delta_stats.num_bytes += obs.oi.size;
- obs.oi.size = cop->cursor.data_offset;
}
ctx->delta_stats.num_wr++;
ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10);
@@ -4491,8 +4507,7 @@ int ReplicatedPG::finish_copy(OpContext *ctx)
void ReplicatedPG::cancel_copy(CopyOpRef cop)
{
- OpContext *ctx = cop->ctx;
- dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx
+ dout(10) << __func__ << " " << cop->obc->obs.oi.soid
<< " from " << cop->src << " " << cop->oloc << " v" << cop->version
<< dendl;
@@ -4502,13 +4517,13 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop)
osd->objecter->op_cancel(cop->objecter_tid);
}
- copy_ops.erase(ctx->obc->obs.oi.soid);
- --ctx->obc->copyfrom_readside;
- ctx->copy_op.reset();
-
- kick_object_context_blocked(ctx->obc);
+ copy_ops.erase(cop->obc->obs.oi.soid);
+ --cop->obc->copyfrom_readside;
- delete ctx;
+ kick_object_context_blocked(cop->obc);
+ bool temp_obj_created = !cop->cursor.is_initial();
+ CopyResults result(-ECANCELED, 0, temp_obj_created, ObjectStore::Transaction());
+ cop->cb->complete(result);
}
void ReplicatedPG::cancel_copy_ops()
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 5abfc4cea56..c277c0d3f86 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -18,6 +18,7 @@
#define CEPH_REPLICATEDPG_H
#include <boost/optional.hpp>
+#include <boost/tuple/tuple.hpp>
#include "include/assert.h"
#include "common/cmdparse.h"
@@ -93,9 +94,11 @@ public:
* state associated with a copy operation
*/
struct OpContext;
+ class CopyCallback;
struct CopyOp {
- OpContext *ctx;
+ CopyCallback *cb;
+ ObjectContextRef obc;
hobject_t src;
object_locator_t oloc;
version_t version;
@@ -114,15 +117,82 @@ public:
hobject_t temp_oid;
object_copy_cursor_t temp_cursor;
- CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
- : ctx(c), src(s), oloc(l), version(v),
+ CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l,
+ version_t v, const hobject_t& dest)
+ : cb(cb_), obc(_obc), src(s), oloc(l), version(v),
objecter_tid(0),
size(0),
- rval(-1)
+ rval(-1),
+ temp_oid(dest)
{}
};
typedef boost::shared_ptr<CopyOp> CopyOpRef;
+ /**
+ * The CopyCallback class defines an interface for completions to the
+ * start_copy code. Users of the copy infrastructure must implement
+ * one and give an instance of the class to start_copy.
+ *
+ * The implementer is responsible for making sure that the CopyCallback
+ * can associate itself with the correct copy operation. The presence
+ * of the closing Transaction ensures that write operations can be performed
+ * atomically with the copy being completed (which doing them in separate
+ * transactions would not allow); if you are doing the copy for a read
+ * op you will have to generate a separate op to finish the copy with.
+ */
+ /// return code, total object size, data in temp object?, final Transaction
+ typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults;
+ class CopyCallback : public GenContext<CopyResults&> {
+ protected:
+ CopyCallback() {}
+ /**
+ * results.get<0>() is the return code: 0 for success; -ECANCELED if
+ * the operation was cancelled by the local OSD; -errno for other issues.
+ * results.get<1>() is the total size of the object (for updating pg stats)
+ * results.get<2>() indicates whether we have already written data to
+ * the temp object (so it needs to get cleaned up, if the return code
+ * indicates a failure)
+ * results.get<3>() is a Transaction; if non-empty you need to apply
+ * it before any other accesses to the object in order to
+ * complete the copy.
+ */
+ virtual void finish(CopyResults& results_) = 0;
+
+ public:
+ /// Virtual destructor: copy operations hold callbacks as CopyCallback pointers
+ virtual ~CopyCallback() {};
+ };
+
+ class CopyFromCallback: public CopyCallback {
+ public:
+ CopyResults results;
+ OpContext *ctx;
+ hobject_t temp_obj;
+ CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) :
+ ctx(ctx_), temp_obj(temp_obj_) {}
+ ~CopyFromCallback() {}
+
+ virtual void finish(CopyResults& results_) {
+ results = results_;
+ int r = results.get<0>();
+ if (r >= 0) {
+ ctx->pg->execute_ctx(ctx);
+ }
+ ctx->copy_cb = NULL;
+ if (r < 0) {
+ if (r != -ECANCELED) { // on cancel just toss it out; client resends
+ ctx->pg->osd->reply_op_error(ctx->op, r);
+ }
+ delete ctx;
+ }
+ }
+
+ bool is_temp_obj_used() { return results.get<2>(); }
+ uint64_t get_data_size() { return results.get<1>(); }
+ int get_result() { return results.get<0>(); }
+ };
+ friend class CopyFromCallback;
+
boost::scoped_ptr<PGBackend> pgbackend;
PGBackend *get_pgbackend() {
return pgbackend.get();
@@ -300,7 +370,7 @@ public:
int num_read; ///< count read ops
int num_write; ///< count update ops
- CopyOpRef copy_op;
+ CopyFromCallback *copy_cb;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
@@ -317,7 +387,8 @@ public:
current_osd_subop_num(0),
data_off(0), reply(NULL), pg(_pg),
num_read(0),
- num_write(0) {
+ num_write(0),
+ copy_cb(NULL) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;
@@ -737,11 +808,15 @@ protected:
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
- int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version);
+ int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
+ object_locator_t oloc, version_t version,
+ const hobject_t& temp_dest_oid);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
- void _copy_some(OpContext *ctx, CopyOpRef cop);
- int finish_copy(OpContext *ctx);
+ void _copy_some(ObjectContextRef obc, CopyOpRef cop);
+ void _build_finish_copy_transaction(CopyOpRef cop,
+ ObjectStore::Transaction& t);
+ int finish_copyfrom(OpContext *ctx);
void cancel_copy(CopyOpRef cop);
void cancel_copy_ops();