summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-09-03 16:00:28 -0700
committerSage Weil <sage@inktank.com>2013-09-03 16:00:28 -0700
commit2b2f296ed84169cd872d2ced3714f9a180994903 (patch)
tree8547a130e27169a79976685bdbc627379d6f5611
parentdcbdeaf6d19848e4d93861df6dbdcab0e24fff57 (diff)
parent996af2d8fd8e60bcdce8e9408249b974521de24e (diff)
downloadceph-2b2f296ed84169cd872d2ced3714f9a180994903.tar.gz
Merge branch 'wip-copyfrom'
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/include/ceph_strings.cc1
-rw-r--r--src/include/rados.h5
-rw-r--r--src/include/rados/librados.hpp14
-rw-r--r--src/librados/librados.cc8
-rw-r--r--src/osd/ReplicatedPG.cc219
-rw-r--r--src/osd/ReplicatedPG.h48
-rw-r--r--src/osd/osd_types.cc2
-rw-r--r--src/osd/osd_types.h6
-rw-r--r--src/osdc/Objecter.h8
-rw-r--r--src/test/librados/misc.cc50
-rw-r--r--src/test/osd/RadosModel.h94
-rw-r--r--src/test/osd/TestRados.cc10
13 files changed, 460 insertions, 6 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 328f7f4b94d..2fa72d4ce0f 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -444,6 +444,7 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
OPTION(osd_recovery_max_active, OPT_INT, 15)
OPTION(osd_recovery_max_single_start, OPT_INT, 5)
OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk
+OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20) // max size of a COPYFROM chunk
OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object
OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message
OPTION(osd_max_push_objects, OPT_U64, 10) // max objects in single push op
diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc
index f14f29ce0e9..e86aae4fd50 100644
--- a/src/include/ceph_strings.cc
+++ b/src/include/ceph_strings.cc
@@ -49,6 +49,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_WATCH: return "watch";
case CEPH_OSD_OP_COPY_GET: return "copy-get";
+ case CEPH_OSD_OP_COPY_FROM: return "copy-from";
case CEPH_OSD_OP_CLONERANGE: return "clonerange";
case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
diff --git a/src/include/rados.h b/src/include/rados.h
index 27291a7440e..178c171c445 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -217,6 +217,7 @@ enum {
CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
+ CEPH_OSD_OP_COPY_FROM = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 26,
CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27,
/** multi **/
@@ -410,6 +411,10 @@ struct ceph_osd_op {
struct {
__le64 max; /* max data in reply */
} __attribute__ ((packed)) copy_get;
+ struct {
+ __le64 snapid;
+ __le64 src_version;
+ } __attribute__ ((packed)) copy_from;
};
__le32 payload_len;
} __attribute__ ((packed));
diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp
index bc0bcc95ceb..5a750cbc0d1 100644
--- a/src/include/rados/librados.hpp
+++ b/src/include/rados/librados.hpp
@@ -265,6 +265,19 @@ namespace librados
*/
void omap_rm_keys(const std::set<std::string> &to_rm);
+ /**
+ * Copy an object
+ *
+ * Copies an object from another location. The operation is atomic in that
+ * the copy either succeeds in its entirety or fails (e.g., because the
+ * source object was modified while the copy was in progress).
+ *
+ * @param src source object name
+ * @param src_ioctx ioctx for the source object
+ * @param version current version of the source object
+ */
+ void copy_from(const std::string& src, const IoCtx& src_ioctx, uint64_t src_version);
+
friend class IoCtx;
};
@@ -674,6 +687,7 @@ namespace librados
IoCtx(IoCtxImpl *io_ctx_impl_);
friend class Rados; // Only Rados can use our private constructor to create IoCtxes.
+ friend class ObjectWriteOperation; // copy_from needs to see our IoCtxImpl
IoCtxImpl *io_ctx_impl;
};
diff --git a/src/librados/librados.cc b/src/librados/librados.cc
index 12372d960b1..852228ed383 100644
--- a/src/librados/librados.cc
+++ b/src/librados/librados.cc
@@ -382,6 +382,14 @@ void librados::ObjectWriteOperation::omap_rm_keys(
o->omap_rm_keys(to_rm);
}
+void librados::ObjectWriteOperation::copy_from(const std::string& src,
+ const IoCtx& src_ioctx,
+ uint64_t src_version)
+{
+ ::ObjectOperation *o = (::ObjectOperation *)impl;
+ o->copy_from(object_t(src), src_ioctx.io_ctx_impl->snap_seq, src_ioctx.io_ctx_impl->oloc, src_version);
+}
+
void librados::ObjectWriteOperation::tmap_put(const bufferlist &bl)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 0027edda077..2c96180b13a 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1000,6 +1000,11 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
p->second->ondisk_read_unlock();
}
+ if (result == -EINPROGRESS) {
+ // come back later.
+ return;
+ }
+
if (result == -EAGAIN) {
// clean up after the ctx
delete ctx;
@@ -3386,6 +3391,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
if (result < 0)
break;
cursor.attr_complete = true;
+ dout(20) << " got attrs" << dendl;
}
::encode(out_attrs, osd_op.outdata);
@@ -3395,15 +3401,17 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
bufferlist bl;
if (left > 0 && !cursor.data_complete) {
if (cursor.data_offset < oi.size) {
- result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl);
+ result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
if (result < 0)
return result;
assert(result <= left);
left -= result;
cursor.data_offset += result;
}
- if (cursor.data_offset == oi.size)
+ if (cursor.data_offset == oi.size) {
cursor.data_complete = true;
+ dout(20) << " got data" << dendl;
+ }
}
::encode(bl, osd_op.outdata);
@@ -3423,15 +3431,73 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
cursor.omap_offset = iter->key();
} else {
cursor.omap_complete = true;
+ dout(20) << " got omap" << dendl;
}
}
::encode(out_omap, osd_op.outdata);
+ dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
::encode(cursor, osd_op.outdata);
result = 0;
}
break;
+ case CEPH_OSD_OP_COPY_FROM:
+ ++ctx->num_write;
+ {
+ object_t src_name;
+ object_locator_t src_oloc;
+ snapid_t src_snapid = (uint64_t)op.copy_from.snapid;
+ version_t src_version = op.copy_from.src_version;
+ try {
+ ::decode(src_name, bp);
+ ::decode(src_oloc, bp);
+ }
+ catch (buffer::error& e) {
+ result = -EINVAL;
+ goto fail;
+ }
+ pg_t raw_pg;
+ get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
+ hobject_t src(src_name, src_oloc.key, src_snapid,
+ raw_pg.ps(), raw_pg.pool(),
+ src_oloc.nspace);
+ if (!ctx->copy_op) {
+ // start
+ result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op);
+ if (result < 0)
+ goto fail;
+ result = -EINPROGRESS;
+ } else {
+ // finish
+ CopyOpRef cop = ctx->copy_op;
+
+ if (!obs.exists) {
+ ctx->delta_stats.num_objects++;
+ obs.exists = true;
+ } else {
+ t.remove(coll, soid);
+ }
+ t.write(coll, soid, 0, cop->data.length(), cop->data);
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+ t.setattr(coll, soid, string("_") + p->first, p->second);
+ t.omap_setkeys(coll, soid, cop->omap);
+
+ interval_set<uint64_t> ch;
+ if (oi.size > 0)
+ ch.insert(0, oi.size);
+ ctx->modified_ranges.union_of(ch);
+
+ if (cop->data.length() != oi.size) {
+ ctx->delta_stats.num_bytes -= oi.size;
+ oi.size = cop->data.length();
+ ctx->delta_stats.num_bytes += oi.size;
+ }
+ ctx->delta_stats.num_wr++;
+ ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
+ }
+ }
+ break;
default:
dout(1) << "unrecognized osd op " << op.op
@@ -4013,6 +4079,152 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
return result;
}
+// ========================================================================
+// copyfrom
+
+struct C_Copyfrom : public Context {
+ ReplicatedPGRef pg;
+ hobject_t oid;
+ epoch_t last_peering_reset;
+ tid_t tid;
+ C_Copyfrom(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0)
+ {}
+ void finish(int r) {
+ pg->lock();
+ if (last_peering_reset == pg->get_last_peering_reset()) {
+ pg->process_copy_chunk(oid, tid, r);
+ }
+ pg->unlock();
+ }
+};
+
+int ReplicatedPG::start_copy(OpContext *ctx,
+ hobject_t src, object_locator_t oloc, version_t version,
+ CopyOpRef *pcop)
+{
+ const hobject_t& dest = ctx->obs->oi.soid;
+ dout(10) << __func__ << " " << dest << " ctx " << ctx
+ << " from " << src << " " << oloc << " v" << version
+ << dendl;
+
+ // cancel a previous in-progress copy?
+ if (copy_ops.count(dest)) {
+ // FIXME: if the src etc match, we could avoid restarting from the
+ // beginning.
+ CopyOpRef cop = copy_ops[dest];
+ cancel_copy(cop);
+ }
+
+ CopyOpRef cop(new CopyOp(ctx, src, oloc, version));
+ copy_ops[dest] = cop;
+ ctx->copy_op = cop;
+ ++ctx->obc->copyfrom_readside;
+
+ _copy_some(ctx, cop);
+
+ return 0;
+}
+
+void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
+{
+ dout(10) << __func__ << " " << ctx << " " << cop << dendl;
+ ObjectOperation op;
+ op.assert_version(cop->version);
+ op.copy_get(&cop->cursor, g_conf->osd_copyfrom_max_chunk,
+ &cop->size, &cop->mtime, &cop->attrs,
+ &cop->data, &cop->omap,
+ &cop->rval);
+
+ C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid,
+ get_last_peering_reset());
+ osd->objecter_lock.Lock();
+ tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
+ cop->src.snap, NULL, 0,
+ new C_OnFinisher(fin,
+ &osd->objecter_finisher),
+ NULL);
+ fin->tid = tid;
+ cop->objecter_tid = tid;
+ osd->objecter_lock.Unlock();
+}
+
+void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
+{
+ dout(10) << __func__ << " tid " << tid << " " << cpp_strerror(r) << dendl;
+ map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
+ if (p == copy_ops.end()) {
+ dout(10) << __func__ << " no copy_op found" << dendl;
+ return;
+ }
+ CopyOpRef cop = p->second;
+ if (tid != cop->objecter_tid) {
+ dout(10) << __func__ << " tid " << tid << " != cop " << cop
+ << " tid " << cop->objecter_tid << dendl;
+ return;
+ }
+ OpContext *ctx = cop->ctx;
+ cop->objecter_tid = 0;
+ if (r < 0) {
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ reply_ctx(ctx, r);
+ return;
+ }
+ assert(cop->rval >= 0);
+
+ // FIXME: this is accumulating the entire object in memory.
+
+ if (!cop->cursor.is_complete()) {
+ dout(10) << __func__ << " fetching more" << dendl;
+ _copy_some(ctx, cop);
+ return;
+ }
+
+ dout(20) << __func__ << " complete; committing" << dendl;
+ execute_ctx(ctx);
+
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ ctx->copy_op.reset();
+}
+
+void ReplicatedPG::cancel_copy(CopyOpRef cop)
+{
+ OpContext *ctx = cop->ctx;
+ dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx
+ << " from " << cop->src << " " << cop->oloc << " v" << cop->version
+ << dendl;
+
+ // cancel objecter op, if we can
+ if (cop->objecter_tid) {
+ Mutex::Locker l(osd->objecter_lock);
+ osd->objecter->op_cancel(cop->objecter_tid);
+ }
+
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ ctx->copy_op.reset();
+
+ delete ctx;
+}
+
+void ReplicatedPG::requeue_cancel_copy_ops(bool requeue)
+{
+ dout(10) << __func__ << dendl;
+ for (map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
+ p != copy_ops.end();
+ copy_ops.erase(p++)) {
+ // requeue initiating copy *and* any subsequent waiters
+ CopyOpRef cop = p->second;
+ if (requeue) {
+ cop->waiting.push_front(cop->ctx->op);
+ requeue_ops(cop->waiting);
+ }
+ cancel_copy(cop);
+ }
+}
// ========================================================================
@@ -6736,6 +6948,7 @@ void ReplicatedPG::on_shutdown()
deleting = true;
unreg_next_scrub();
+ requeue_cancel_copy_ops(false);
apply_and_flush_repops(false);
context_registry_on_change();
@@ -6786,6 +6999,8 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
context_registry_on_change();
+ requeue_cancel_copy_ops(is_primary());
+
// requeue object waiters
if (is_primary()) {
requeue_ops(waiting_for_backfill_pos);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 5dc7d882a8b..254b5842ffc 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -83,7 +83,40 @@ public:
class ReplicatedPG : public PG {
friend class OSD;
friend class Watch;
-public:
+
+public:
+
+ /*
+ * state associated with a copy operation
+ */
+ struct OpContext;
+
+ struct CopyOp {
+ OpContext *ctx;
+ hobject_t src;
+ object_locator_t oloc;
+ version_t version;
+
+ tid_t objecter_tid;
+
+ list<OpRequestRef> waiting;
+
+ object_copy_cursor_t cursor;
+ uint64_t size;
+ utime_t mtime;
+ map<string,bufferlist> attrs;
+ bufferlist data;
+ map<string,bufferlist> omap;
+ int rval;
+
+ CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
+ : ctx(c), src(s), oloc(l), version(v),
+ objecter_tid(0),
+ size(0),
+ rval(-1)
+ {}
+ };
+ typedef boost::shared_ptr<CopyOp> CopyOpRef;
/*
* Capture all object state associated with an in-progress read or write.
@@ -145,6 +178,8 @@ public:
int num_read; ///< count read ops
int num_write; ///< count update ops
+ CopyOpRef copy_op;
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
@@ -749,6 +784,17 @@ protected:
void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
+ // -- copyfrom --
+ map<hobject_t, CopyOpRef> copy_ops;
+
+ int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
+ CopyOpRef *pcop);
+ void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+ void _copy_some(OpContext *ctx, CopyOpRef cop);
+ void cancel_copy(CopyOpRef cop);
+ void requeue_cancel_copy_ops(bool requeue=true);
+
+ friend class C_Copyfrom;
// -- scrub --
virtual void _scrub(ScrubMap& map);
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 1c1b457002c..3451d520ff2 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -3484,6 +3484,8 @@ ostream& operator<<(ostream& out, const OSDOp& op)
break;
case CEPH_OSD_OP_COPY_GET:
out << " max " << op.op.copy_get.max;
+ case CEPH_OSD_OP_COPY_FROM:
+ out << " ver " << op.op.copy_from.src_version;
break;
default:
out << " " << op.op.extent.offset << "~" << op.op.extent.length;
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 00e9409c98a..312eb81e3fd 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2130,6 +2130,9 @@ public:
Cond cond;
int unstable_writes, readers, writers_waiting, readers_waiting;
+ /// in-progress copyfrom ops for this object
+ int copyfrom_readside;
+
// set if writes for this object are blocked on another objects recovery
ObjectContextRef blocked_by; // object blocking our writes
set<ObjectContextRef> blocking; // objects whose writes we block
@@ -2141,7 +2144,8 @@ public:
: ssc(NULL),
destructor_callback(0),
lock("ReplicatedPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {}
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+ copyfrom_readside(0) {}
~ObjectContext() {
if (destructor_callback)
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 91f62551729..154ee410fde 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -753,6 +753,14 @@ struct ObjectOperation {
OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
osd_op.op.snap.snapid = snapid;
}
+
+ void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc, version_t src_version) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
+ osd_op.op.copy_from.snapid = snapid;
+ osd_op.op.copy_from.src_version = src_version;
+ ::encode(src, osd_op.indata);
+ ::encode(src_oloc, osd_op.indata);
+ }
};
diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc
index 6cb7cf5452a..af17847aeab 100644
--- a/src/test/librados/misc.cc
+++ b/src/test/librados/misc.cc
@@ -564,6 +564,56 @@ TEST(LibRadosMisc, BigAttrPP) {
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
}
+TEST(LibRadosMisc, CopyPP) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
+
+ bufferlist bl, x;
+ bl.append("hi there");
+ x.append("bar");
+
+ // small object
+ bufferlist blc = bl;
+ bufferlist xc = x;
+ ASSERT_EQ(0, ioctx.write_full("foo", blc));
+ ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc));
+
+ ObjectWriteOperation op;
+ op.copy_from("foo", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("foo.copy", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+
+ // do a big object
+ bl.append(buffer::create(8000000));
+ bl.zero();
+ bl.append("tail");
+ blc = bl;
+ xc = x;
+ ASSERT_EQ(0, ioctx.write_full("big", blc));
+ ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc));
+
+ ObjectWriteOperation op2;
+ op.copy_from("big", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("big.copy", &op));
+
+ bl2.clear();
+ ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
diff --git a/src/test/osd/RadosModel.h b/src/test/osd/RadosModel.h
index c545d9a29b8..b9967d7af50 100644
--- a/src/test/osd/RadosModel.h
+++ b/src/test/osd/RadosModel.h
@@ -48,7 +48,8 @@ enum TestOpType {
TEST_OP_SETATTR,
TEST_OP_RMATTR,
TEST_OP_TMAPPUT,
- TEST_OP_WATCH
+ TEST_OP_WATCH,
+ TEST_OP_COPY_FROM
};
class TestWatchContext : public librados::WatchCtx {
@@ -396,6 +397,12 @@ public:
pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
}
+ void update_object_full(const string &oid, const ObjectDesc &contents)
+ {
+ pool_obj_cont.rbegin()->second.erase(oid);
+ pool_obj_cont.rbegin()->second.insert(pair<string,ObjectDesc>(oid, contents));
+ }
+
void update_object_version(const string &oid, uint64_t version)
{
for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
@@ -1378,4 +1385,89 @@ public:
}
};
+class CopyFromOp : public TestOp {
+public:
+ string oid, oid_src;
+ ObjectDesc src_value;
+ librados::ObjectWriteOperation op;
+ librados::AioCompletion *comp;
+ int snap;
+ bool done;
+ tid_t tid;
+ CopyFromOp(RadosTestContext *context,
+ const string &oid,
+ const string &oid_src,
+ TestOpStat *stat)
+ : TestOp(context, stat), oid(oid), oid_src(oid_src),
+ src_value(&context->cont_gen),
+ comp(NULL), done(false), tid(0)
+ {}
+
+ void _begin()
+ {
+ ContDesc cont;
+ {
+ Mutex::Locker l(context->state_lock);
+ cont = ContDesc(context->seq_num, context->current_snap,
+ context->seq_num, "");
+ context->oid_in_use.insert(oid);
+ context->oid_not_in_use.erase(oid);
+ context->oid_in_use.insert(oid_src);
+ context->oid_not_in_use.erase(oid_src);
+ }
+
+ // choose source snap
+ if (0 && !(rand() % 4) && !context->snaps.empty()) {
+ snap = rand_choose(context->snaps)->first;
+ } else {
+ snap = -1;
+ }
+ context->find_object(oid_src, &src_value, snap);
+
+ string src = context->prefix+oid_src;
+ op.copy_from(src.c_str(), context->io_ctx, src_value.version);
+
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(this,
+ new TestOp::CallbackInfo(0));
+ comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback,
+ NULL);
+ tid = context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+ }
+
+ void _finish(CallbackInfo *info)
+ {
+ Mutex::Locker l(context->state_lock);
+ done = true;
+ int r;
+ assert(comp->is_complete());
+ cout << "finishing copy_from tid " << tid << " to " << context->prefix + oid << std::endl;
+ if ((r = comp->get_return_value())) {
+ if (!(r == -ENOENT && src_value.deleted())) {
+ cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
+ << r << std::endl;
+ }
+ } else {
+ context->update_object_full(oid, src_value);
+ context->update_object_version(oid, comp->get_version());
+ }
+ context->oid_in_use.erase(oid);
+ context->oid_not_in_use.insert(oid);
+ context->oid_in_use.erase(oid_src);
+ context->oid_not_in_use.insert(oid_src);
+ context->kick();
+ }
+
+ bool finished()
+ {
+ return done;
+ }
+
+ string getType()
+ {
+ return "TmapPutOp";
+ }
+};
+
+
#endif
diff --git a/src/test/osd/TestRados.cc b/src/test/osd/TestRados.cc
index 6ac661c0629..1deee23aa2c 100644
--- a/src/test/osd/TestRados.cc
+++ b/src/test/osd/TestRados.cc
@@ -84,7 +84,7 @@ private:
TestOp *gen_op(RadosTestContext &context, TestOpType type)
{
- string oid;
+ string oid, oid2;
cout << "oids not in use " << context.oid_not_in_use.size() << std::endl;
assert(context.oid_not_in_use.size());
switch (type) {
@@ -152,6 +152,13 @@ private:
<< " current snap is " << context.current_snap << std::endl;
return new WatchOp(&context, oid, m_stats);
+ case TEST_OP_COPY_FROM:
+ oid = *(rand_choose(context.oid_not_in_use));
+ oid2 = *(rand_choose(context.oid_not_in_use));
+ cout << "copy_from " << oid << " from " << oid2
+ << " current snap is " << context.current_snap << std::endl;
+ return new CopyFromOp(&context, oid, oid2, m_stats);
+
default:
cerr << "Invalid op type " << type << std::endl;
assert(0);
@@ -192,6 +199,7 @@ int main(int argc, char **argv)
{ TEST_OP_RMATTR, "rmattr" },
{ TEST_OP_TMAPPUT, "tmapput" },
{ TEST_OP_WATCH, "watch" },
+ { TEST_OP_COPY_FROM, "copy_from" },
{ TEST_OP_READ /* grr */, NULL },
};