summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Farnum <greg@inktank.com>2013-09-17 11:14:26 -0700
committerGregory Farnum <greg@inktank.com>2013-09-17 11:14:26 -0700
commitb37f88c92d50e017cb4fde9a346ead8cd14a3345 (patch)
treec4da0f98f5acb0f2ca715b3b3ad0f97be19d30ca
parent1f80d63272a29ff0d1e75fa217102dd15a12c328 (diff)
parenta80f831b1f22b7f1bd24154e4a284c0dd838f7ff (diff)
downloadceph-b37f88c92d50e017cb4fde9a346ead8cd14a3345.tar.gz
Merge pull request #586 from ceph/wip-copyfrom-big
Reviewed-by: Greg Farnum <greg@inktank.com> Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/messages/MOSDSubOp.h12
-rw-r--r--src/os/FileStore.cc113
-rw-r--r--src/os/FileStore.h8
-rw-r--r--src/os/ObjectStore.h10
-rw-r--r--src/osd/OSD.cc2
-rw-r--r--src/osd/OSD.h2
-rw-r--r--src/osd/PG.h3
-rw-r--r--src/osd/ReplicatedPG.cc312
-rw-r--r--src/osd/ReplicatedPG.h20
-rw-r--r--src/osd/osd_types.cc2
-rw-r--r--src/osd/osd_types.h5
-rw-r--r--src/test/filestore/store_test.cc59
-rw-r--r--src/test/librados/misc.cc63
-rw-r--r--src/test/osd/RadosModel.h242
-rw-r--r--src/test/osd/TestRados.cc53
15 files changed, 641 insertions, 265 deletions
diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h
index 50b1a926957..4169e01325e 100644
--- a/src/messages/MOSDSubOp.h
+++ b/src/messages/MOSDSubOp.h
@@ -25,7 +25,7 @@
class MOSDSubOp : public Message {
- static const int HEAD_VERSION = 7;
+ static const int HEAD_VERSION = 8;
static const int COMPAT_VERSION = 1;
public:
@@ -86,6 +86,9 @@ public:
// indicates that we must fix hobject_t encoding
bool hobject_incorrect_pool;
+ hobject_t new_temp_oid; ///< new temp object that we must now start tracking
+ hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking
+
int get_cost() const {
if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL)
return ops[0].op.extent.length;
@@ -150,6 +153,11 @@ public:
poid.pool = pgid.pool();
hobject_incorrect_pool = true;
}
+
+ if (header.version >= 8) {
+ ::decode(new_temp_oid, p);
+ ::decode(discard_temp_oid, p);
+ }
}
virtual void encode_payload(uint64_t features) {
@@ -194,6 +202,8 @@ public:
::encode(current_progress, payload);
::encode(omap_entries, payload);
::encode(omap_header, payload);
+ ::encode(new_temp_oid, payload);
+ ::encode(discard_temp_oid, payload);
}
MOSDSubOp()
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 7ef415213b0..66581bcde38 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -246,14 +246,14 @@ void FileStore::lfn_close(FDRef fd)
{
}
-int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
+int FileStore::lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid)
{
Index index_new, index_old;
IndexedPath path_new, path_old;
int exist;
int r;
- if (c < cid) {
- r = get_index(cid, &index_new);
+ if (c < newcid) {
+ r = get_index(newcid, &index_new);
if (r < 0)
return r;
r = get_index(c, &index_old);
@@ -263,7 +263,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
r = get_index(c, &index_old);
if (r < 0)
return r;
- r = get_index(cid, &index_new);
+ r = get_index(newcid, &index_new);
if (r < 0)
return r;
}
@@ -276,7 +276,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
if (!exist)
return -ENOENT;
- r = index_new->lookup(o, &path_new, &exist);
+ r = index_new->lookup(newoid, &path_new, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
@@ -290,7 +290,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
if (r < 0)
return -errno;
- r = index_new->created(o, path_new->path());
+ r = index_new->created(newoid, path_new->path());
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
@@ -299,7 +299,8 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
}
int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
- const SequencerPosition &spos)
+ const SequencerPosition &spos,
+ bool force_clear_omap)
{
Index index;
int r = get_index(cid, &index);
@@ -315,14 +316,17 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
return r;
}
- struct stat st;
- r = ::stat(path->path(), &st);
- if (r < 0) {
- r = -errno;
- assert(!m_filestore_fail_eio || r != -EIO);
- return r;
+ if (!force_clear_omap) {
+ struct stat st;
+ r = ::stat(path->path(), &st);
+ if (r < 0) {
+ r = -errno;
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ force_clear_omap = true;
}
- if (st.st_nlink == 1) {
+ if (force_clear_omap) {
dout(20) << __func__ << ": clearing omap on " << o
<< " in cid " << cid << dendl;
r = object_map->clear(o, &spos);
@@ -2176,6 +2180,16 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
}
break;
+ case Transaction::OP_COLL_MOVE_RENAME:
+ {
+ coll_t oldcid = i.get_cid();
+ hobject_t oldoid = i.get_oid();
+ coll_t newcid = i.get_cid();
+ hobject_t newoid = i.get_oid();
+ r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
+ }
+ break;
+
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = i.get_cid();
@@ -4086,7 +4100,7 @@ int FileStore::_destroy_collection(coll_t c)
int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
- const SequencerPosition& spos)
+ const SequencerPosition& spos)
{
dout(15) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
@@ -4116,7 +4130,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
_set_replay_guard(**fd, spos, &o, true);
}
- r = lfn_link(oldcid, c, o);
+ r = lfn_link(oldcid, c, o, o);
if (replaying && !backend->can_checkpoint() &&
r == -EEXIST) // crashed between link() and set_replay_guard()
r = 0;
@@ -4133,6 +4147,73 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
return r;
}
+int FileStore::_collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+ coll_t c, const hobject_t& o,
+ const SequencerPosition& spos)
+{
+ dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
+ int r;
+ int dstcmp, srccmp;
+
+ dstcmp = _check_replay_guard(c, o, spos);
+ if (dstcmp < 0)
+ goto out_rm_src;
+
+ // check the src name too; it might have a newer guard, and we don't
+ // want to clobber it
+ srccmp = _check_replay_guard(oldcid, oldoid, spos);
+ if (srccmp < 0)
+ return 0;
+
+ {
+ // open guard on object so we don't any previous operations on the
+ // new name that will modify the source inode.
+ FDRef fd;
+ r = lfn_open(oldcid, oldoid, 0, &fd);
+ if (r < 0) {
+ // the source collection/object does not exist. If we are replaying, we
+ // should be safe, so just return 0 and move on.
+ assert(replaying);
+ dout(10) << __func__ << " " << c << "/" << o << " from "
+ << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
+ return 0;
+ }
+ if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
+ _set_replay_guard(**fd, spos, &o, true);
+ }
+
+ r = lfn_link(oldcid, c, oldoid, o);
+ if (replaying && !backend->can_checkpoint() &&
+ r == -EEXIST) // crashed between link() and set_replay_guard()
+ r = 0;
+
+ _inject_failure();
+
+ // the name changed; link the omap content
+ r = object_map->clone(oldoid, o, &spos);
+ if (r == -ENOENT)
+ r = 0;
+
+ _inject_failure();
+
+ // close guard on object so we don't do this again
+ if (r == 0) {
+ _close_replay_guard(**fd, spos);
+ }
+ lfn_close(fd);
+ }
+
+ out_rm_src:
+ // remove source
+ if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
+ r = lfn_unlink(oldcid, oldoid, spos, true);
+ }
+
+ dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid
+ << " = " << r << dendl;
+ return r;
+}
+
void FileStore::_inject_failure()
{
if (m_filestore_kill_at.read()) {
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index c603949b399..4f58df4d698 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -298,8 +298,9 @@ public:
IndexedPath *path = 0,
Index *index = 0);
void lfn_close(FDRef fd);
- int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ;
- int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos);
+ int lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid) ;
+ int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos,
+ bool force_clear_omap=false);
public:
FileStore(const std::string &base, const std::string &jdev, const char *internal_name = "filestore", bool update_to=false);
@@ -499,6 +500,9 @@ public:
int _destroy_collection(coll_t c);
int _collection_add(coll_t c, coll_t ocid, const hobject_t& o,
const SequencerPosition& spos);
+ int _collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+ coll_t c, const hobject_t& o,
+ const SequencerPosition& spos);
void dump_start(const std::string& file);
void dump_stop();
void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr);
diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h
index 655afee004f..7e8f6ce43bf 100644
--- a/src/os/ObjectStore.h
+++ b/src/os/ObjectStore.h
@@ -159,6 +159,7 @@ public:
OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination
doesn't create the destination */
OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey
+ OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid
};
private:
@@ -554,6 +555,15 @@ public:
collection_remove(oldcid, oid);
return;
}
+ void collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+ coll_t cid, const hobject_t& oid) {
+ __u32 op = OP_COLL_MOVE_RENAME;
+ ::encode(op, tbl);
+ ::encode(oldcid, tbl);
+ ::encode(oldoid, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ }
void collection_setattr(coll_t cid, const char* name, bufferlist& val) {
string n(name);
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 76bdb02c0ad..ff1276969d8 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -134,8 +134,6 @@ static ostream& _prefix(std::ostream* _dout, int whoami, OSDMapRef osdmap) {
<< " ";
}
-const coll_t coll_t::META_COLL("meta");
-
static CompatSet get_osd_compat_set() {
CompatSet::FeatureSet ceph_osd_feature_compat;
CompatSet::FeatureSet ceph_osd_feature_ro_compat;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index f5f50be4af8..c2f45196870 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -179,8 +179,6 @@ class HistoricOpsSocketHook;
class TestOpsSocketHook;
struct C_CompleteSplits;
-extern const coll_t meta_coll;
-
typedef std::tr1::shared_ptr<ObjectStore::Sequencer> SequencerRef;
class DeletingState {
diff --git a/src/osd/PG.h b/src/osd/PG.h
index cbafd0f43d9..cdbe827a4a9 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -523,7 +523,8 @@ protected:
list<OpRequestRef> waiting_for_active;
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
- waiting_for_degraded_object;
+ waiting_for_degraded_object,
+ waiting_for_blocked_object;
// Callbacks should assume pg (and nothing else) is locked
map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 3cd5a7ef865..a92403ae370 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -50,6 +50,7 @@
#include "include/compat.h"
#include "common/cmdparse.h"
+#include "mon/MonClient.h"
#include "osdc/Objecter.h"
#include "json_spirit/json_spirit_value.h"
@@ -191,6 +192,13 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
op->mark_delayed("waiting for degraded object");
}
+void ReplicatedPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef op)
+{
+ dout(10) << __func__ << " " << soid << " " << op << dendl;
+ waiting_for_blocked_object[soid].push_back(op);
+ op->mark_delayed("waiting for blocked object");
+}
+
void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op)
{
waiting_for_backfill_pos.push_back(op);
@@ -622,7 +630,9 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
PG(o, curmap, _pool, p, oid, ioid),
snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
temp_created(false),
- temp_coll(coll_t::make_temp_coll(p)), snap_trimmer_machine(this)
+ temp_coll(coll_t::make_temp_coll(p)),
+ temp_seq(0),
+ snap_trimmer_machine(this)
{
snap_trimmer_machine.initiate();
}
@@ -737,7 +747,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
osd->reply_op_error(op, r);
return;
}
-
+
// make sure locator is consistent
object_locator_t oloc(obc->obs.oi.soid);
if (m->get_object_locator() != oloc) {
@@ -748,6 +758,12 @@ void ReplicatedPG::do_op(OpRequestRef op)
<< " op " << *m << "\n";
}
+ // io blocked on obc?
+ if (obc->is_blocked()) {
+ wait_for_blocked_object(obc->obs.oi.soid, op);
+ return;
+ }
+
if ((op->may_read()) && (obc->obs.oi.lost)) {
// This object is lost. Reading from it returns an error.
dout(20) << __func__ << ": object " << obc->obs.oi.soid
@@ -1028,12 +1044,6 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
// note my stats
utime_t now = ceph_clock_now(cct);
- // note some basic context for op replication that prepare_transaction may clobber
- eversion_t old_last_update = pg_log.get_head();
- bool old_exists = obc->obs.exists;
- uint64_t old_size = obc->obs.oi.size;
- eversion_t old_version = obc->obs.oi.version;
-
if (op->may_read()) {
dout(10) << " taking ondisk_read_lock" << dendl;
obc->ondisk_read_lock();
@@ -1152,7 +1162,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
repop->src_obc.swap(src_obc); // and src_obc.
- issue_repop(repop, now, old_last_update, old_exists, old_size, old_version);
+ issue_repop(repop, now);
eval_repop(repop);
repop->put();
@@ -3490,7 +3500,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
::encode(out_omap, osd_op.outdata);
- dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
+ dout(20) << " cursor.is_complete=" << cursor.is_complete()
+ << " " << out_attrs.size() << " attrs"
+ << " " << bl.length() << " bytes"
+ << " " << out_omap.size() << " keys"
+ << dendl;
::encode(cursor, osd_op.outdata);
result = 0;
}
@@ -3511,44 +3525,20 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
result = -EINVAL;
goto fail;
}
- pg_t raw_pg;
- get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
- hobject_t src(src_name, src_oloc.key, src_snapid,
- raw_pg.ps(), raw_pg.pool(),
- src_oloc.nspace);
if (!ctx->copy_op) {
// start
+ pg_t raw_pg;
+ get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
+ hobject_t src(src_name, src_oloc.key, src_snapid,
+ raw_pg.ps(), raw_pg.pool(),
+ src_oloc.nspace);
result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op);
if (result < 0)
goto fail;
result = -EINPROGRESS;
} else {
// finish
- CopyOpRef cop = ctx->copy_op;
-
- if (!obs.exists) {
- ctx->delta_stats.num_objects++;
- obs.exists = true;
- } else {
- t.remove(coll, soid);
- }
- t.write(coll, soid, 0, cop->data.length(), cop->data);
- for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
- t.setattr(coll, soid, string("_") + p->first, p->second);
- t.omap_setkeys(coll, soid, cop->omap);
-
- interval_set<uint64_t> ch;
- if (oi.size > 0)
- ch.insert(0, oi.size);
- ctx->modified_ranges.union_of(ch);
-
- if (cop->data.length() != oi.size) {
- ctx->delta_stats.num_bytes -= oi.size;
- oi.size = cop->data.length();
- ctx->delta_stats.num_bytes += oi.size;
- }
- ctx->delta_stats.num_wr++;
- ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
+ result = finish_copy(ctx);
}
}
break;
@@ -3987,6 +3977,15 @@ coll_t ReplicatedPG::get_temp_coll(ObjectStore::Transaction *t)
return temp_coll;
}
+hobject_t ReplicatedPG::generate_temp_object()
+{
+ ostringstream ss;
+ ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq);
+ hobject_t hoid(object_t(ss.str()), "", CEPH_NOSNAP, 0, -1, "");
+ dout(20) << __func__ << " " << hoid << dendl;
+ return hoid;
+}
+
int ReplicatedPG::prepare_transaction(OpContext *ctx)
{
assert(!ctx->ops.empty());
@@ -4185,7 +4184,13 @@ void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
{
dout(10) << __func__ << " " << ctx << " " << cop << dendl;
ObjectOperation op;
- op.assert_version(cop->version);
+ if (cop->version) {
+ op.assert_version(cop->version);
+ } else {
+ // we should learn the version after the first chunk, if we didn't know
+ // it already!
+ assert(cop->cursor.is_initial());
+ }
op.copy_get(&cop->cursor, cct->_conf->osd_copyfrom_max_chunk,
&cop->size, &cop->mtime, &cop->attrs,
&cop->data, &cop->omap,
@@ -4195,10 +4200,11 @@ void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
get_last_peering_reset());
osd->objecter_lock.Lock();
tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
- cop->src.snap, NULL, 0,
- new C_OnFinisher(fin,
- &osd->objecter_finisher),
- NULL);
+ cop->src.snap, NULL, 0,
+ new C_OnFinisher(fin,
+ &osd->objecter_finisher),
+ // discover the object version if we don't know it yet
+ cop->version ? NULL : &cop->version);
fin->tid = tid;
cop->objecter_tid = tid;
osd->objecter_lock.Unlock();
@@ -4223,14 +4229,33 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
if (r < 0) {
copy_ops.erase(ctx->obc->obs.oi.soid);
--ctx->obc->copyfrom_readside;
+ kick_object_context_blocked(ctx->obc);
reply_ctx(ctx, r);
return;
}
assert(cop->rval >= 0);
- // FIXME: this is accumulating the entire object in memory.
-
if (!cop->cursor.is_complete()) {
+ // write out what we have so far
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this);
+ tctx->mtime = ceph_clock_now(g_ceph_context);
+ RepGather *repop = new_repop(tctx, ctx->obc, rep_tid);
+
+ if (cop->temp_cursor.is_initial()) {
+ cop->temp_coll = get_temp_coll(&tctx->local_t);
+ cop->temp_oid = generate_temp_object();
+ temp_contents.insert(cop->temp_oid);
+ repop->ctx->new_temp_oid = cop->temp_oid;
+ }
+
+ _write_copy_chunk(cop, &tctx->op_t);
+
+ issue_repop(repop, repop->ctx->mtime);
+ eval_repop(repop);
+
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(ctx, cop);
return;
@@ -4242,6 +4267,73 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
copy_ops.erase(ctx->obc->obs.oi.soid);
--ctx->obc->copyfrom_readside;
ctx->copy_op.reset();
+ kick_object_context_blocked(ctx->obc);
+}
+
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
+{
+ dout(20) << __func__ << " " << cop
+ << " " << cop->attrs.size() << " attrs"
+ << " " << cop->data.length() << " bytes"
+ << " " << cop->omap.size() << " keys"
+ << dendl;
+ if (!cop->temp_cursor.attr_complete) {
+ t->touch(cop->temp_coll, cop->temp_oid);
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+ t->setattr(cop->temp_coll, cop->temp_oid, string("_") + p->first, p->second);
+ cop->attrs.clear();
+ }
+ if (!cop->temp_cursor.data_complete) {
+ t->write(cop->temp_coll, cop->temp_oid, cop->temp_cursor.data_offset, cop->data.length(), cop->data);
+ cop->data.clear();
+ }
+ if (!cop->temp_cursor.omap_complete) {
+ t->omap_setkeys(cop->temp_coll, cop->temp_oid, cop->omap);
+ cop->omap.clear();
+ }
+ cop->temp_cursor = cop->cursor;
+}
+
+int ReplicatedPG::finish_copy(OpContext *ctx)
+{
+ CopyOpRef cop = ctx->copy_op;
+ ObjectState& obs = ctx->new_obs;
+ ObjectStore::Transaction& t = ctx->op_t;
+
+ if (!obs.exists) {
+ ctx->delta_stats.num_objects++;
+ obs.exists = true;
+ } else {
+ t.remove(coll, obs.oi.soid);
+ }
+
+ if (cop->temp_cursor.is_initial()) {
+ // write directly to final object
+ cop->temp_coll = coll;
+ cop->temp_oid = obs.oi.soid;
+ _write_copy_chunk(cop, &t);
+ } else {
+ // finish writing to temp object, then move into place
+ _write_copy_chunk(cop, &t);
+ t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, obs.oi.soid);
+ temp_contents.erase(cop->temp_oid);
+ ctx->discard_temp_oid = cop->temp_oid;
+ }
+
+ interval_set<uint64_t> ch;
+ if (obs.oi.size > 0)
+ ch.insert(0, obs.oi.size);
+ ctx->modified_ranges.union_of(ch);
+
+ if (cop->cursor.data_offset != obs.oi.size) {
+ ctx->delta_stats.num_bytes -= obs.oi.size;
+ ctx->delta_stats.num_bytes += obs.oi.size;
+ obs.oi.size = cop->cursor.data_offset;
+ }
+ ctx->delta_stats.num_wr++;
+ ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10);
+
+ return 0;
}
void ReplicatedPG::cancel_copy(CopyOpRef cop)
@@ -4261,6 +4353,8 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop)
--ctx->obc->copyfrom_readside;
ctx->copy_op.reset();
+ kick_object_context_blocked(ctx->obc);
+
delete ctx;
}
@@ -4366,9 +4460,11 @@ void ReplicatedPG::op_applied(RepGather *repop)
repop->waitfor_disk.count(whoami) == 0); // commit before ondisk
repop->waitfor_ack.erase(whoami);
- assert(info.last_update >= repop->v);
- assert(last_update_applied < repop->v);
- last_update_applied = repop->v;
+ if (repop->v != eversion_t()) {
+ assert(info.last_update >= repop->v);
+ assert(last_update_applied < repop->v);
+ last_update_applied = repop->v;
+ }
// chunky scrub
if (scrubber.active && scrubber.is_chunky) {
@@ -4415,9 +4511,10 @@ void ReplicatedPG::op_commit(RepGather *repop)
// is no separate reply sent.
repop->waitfor_ack.erase(whoami);
- last_update_ondisk = repop->v;
-
- last_complete_ondisk = repop->pg_local_last_complete;
+ if (repop->v != eversion_t()) {
+ last_update_ondisk = repop->v;
+ last_complete_ondisk = repop->pg_local_last_complete;
+ }
eval_repop(repop);
}
@@ -4568,8 +4665,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
}
}
-void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
- eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version)
+void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
{
OpContext *ctx = repop->ctx;
const hobject_t& soid = ctx->obs->oi.soid;
@@ -4607,34 +4703,31 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
((static_cast<MOSDOp *>(ctx->op->request))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
// replicate original op for parallel execution on replica
assert(0 == "broken implementation, do not use");
- wr->oloc = object_locator_t(repop->ctx->obs->oi.soid);
- wr->ops = repop->ctx->ops;
- wr->mtime = repop->ctx->mtime;
- wr->old_exists = old_exists;
- wr->old_size = old_size;
- wr->old_version = old_version;
- wr->snapset = repop->obc->ssc->snapset;
- wr->snapc = repop->ctx->snapc;
- wr->set_data(repop->ctx->op->request->get_data()); // _copy_ bufferlist
- } else {
- // ship resulting transaction, log entries, and pg_stats
- if (peer == backfill_target && soid >= backfill_pos) {
- dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
- << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
- ObjectStore::Transaction t;
- ::encode(t, wr->get_data());
- } else {
- ::encode(repop->ctx->op_t, wr->get_data());
- }
- ::encode(repop->ctx->log, wr->logbl);
+ }
- if (backfill_target >= 0 && backfill_target == peer)
- wr->pg_stats = pinfo.stats; // reflects backfill progress
- else
- wr->pg_stats = info.stats;
+ // ship resulting transaction, log entries, and pg_stats
+ if (peer == backfill_target && soid >= backfill_pos &&
+ soid.pool == (int64_t)info.pgid.pool()) { // only skip normal (not temp pool=-1) objects
+ dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
+ << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
+ ObjectStore::Transaction t;
+ ::encode(t, wr->get_data());
+ } else {
+ ::encode(repop->ctx->op_t, wr->get_data());
}
+
+ ::encode(repop->ctx->log, wr->logbl);
+
+ if (backfill_target >= 0 && backfill_target == peer)
+ wr->pg_stats = pinfo.stats; // reflects backfill progress
+ else
+ wr->pg_stats = info.stats;
wr->pg_trim_to = pg_trim_to;
+
+ wr->new_temp_oid = repop->ctx->new_temp_oid;
+ wr->discard_temp_oid = repop->ctx->discard_temp_oid;
+
osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
// keep peer_info up to date
@@ -4870,12 +4963,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
0,
osd_reqid_t(), ctx->mtime));
- eversion_t old_last_update = pg_log.get_head();
- bool old_exists = repop->obc->obs.exists;
- uint64_t old_size = repop->obc->obs.oi.size;
- eversion_t old_version = repop->obc->obs.oi.version;
-
- obc->obs.oi.prior_version = old_version;
+ obc->obs.oi.prior_version = repop->obc->obs.oi.version;
obc->obs.oi.version = ctx->at_version;
bufferlist bl;
::encode(obc->obs.oi, bl);
@@ -4884,8 +4972,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
// obc ref swallowed by repop!
- issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists,
- old_size, old_version);
+ issue_repop(repop, repop->ctx->mtime);
eval_repop(repop);
}
@@ -5129,6 +5216,24 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t
pgstat->stats.cat_sum[oi.category].add(stat);
}
+void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc)
+{
+ const hobject_t& soid = obc->obs.oi.soid;
+ map<hobject_t, list<OpRequestRef> >::iterator p = waiting_for_blocked_object.find(soid);
+ if (p == waiting_for_blocked_object.end())
+ return;
+
+ if (obc->is_blocked()) {
+ dout(10) << __func__ << " " << soid << " still blocked" << dendl;
+ return;
+ }
+
+ list<OpRequestRef>& ls = waiting_for_blocked_object[soid];
+ dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl;
+ requeue_ops(ls);
+ waiting_for_blocked_object.erase(soid);
+}
+
SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
{
Mutex::Locker l(snapset_contexts_lock);
@@ -5238,6 +5343,16 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
bufferlist::iterator p = m->get_data().begin();
+ if (m->new_temp_oid != hobject_t()) {
+ dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
+ temp_contents.insert(m->new_temp_oid);
+ get_temp_coll(&rm->localt);
+ }
+ if (m->discard_temp_oid != hobject_t()) {
+ dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
+ temp_contents.erase(m->discard_temp_oid);
+ }
+
::decode(rm->opt, p);
if (!(m->get_connection()->get_features() & CEPH_FEATURE_OSD_SNAPMAPPER))
rm->opt.set_tolerate_collection_add_enoent();
@@ -5309,9 +5424,11 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
}
- assert(info.last_update >= m->version);
- assert(last_update_applied < m->version);
- last_update_applied = m->version;
+ if (m->version != eversion_t()) {
+ assert(info.last_update >= m->version);
+ assert(last_update_applied < m->version);
+ last_update_applied = m->version;
+ }
if (scrubber.active_rep_scrub) {
if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
@@ -7072,6 +7189,14 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
p->second.clear();
finish_degraded_object(p->first);
}
+ for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_blocked_object.begin();
+ p != waiting_for_blocked_object.end();
+ waiting_for_blocked_object.erase(p++)) {
+ if (is_primary())
+ requeue_ops(p->second);
+ else
+ p->second.clear();
+ }
if (is_primary())
requeue_ops(waiting_for_all_missing);
@@ -8203,15 +8328,10 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
dout(10) << "TrimmingObjects react trimming " << pos << dendl;
RepGather *repop = pg->trim_object(pos);
assert(repop);
-
repop->queue_snap_trimmer = true;
- eversion_t old_last_update = pg->pg_log.get_head();
- bool old_exists = repop->obc->obs.exists;
- uint64_t old_size = repop->obc->obs.oi.size;
- eversion_t old_version = repop->obc->obs.oi.version;
pg->append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
- pg->issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists, old_size, old_version);
+ pg->issue_repop(repop, repop->ctx->mtime);
pg->eval_repop(repop);
repops.insert(repop);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index fef3814d93a..e880bdecade 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -109,6 +109,10 @@ public:
map<string,bufferlist> omap;
int rval;
+ coll_t temp_coll;
+ hobject_t temp_oid;
+ object_copy_cursor_t temp_cursor;
+
CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
: ctx(c), src(s), oloc(l), version(v),
objecter_tid(0),
@@ -180,6 +184,8 @@ public:
CopyOpRef copy_op;
+ hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
@@ -279,8 +285,7 @@ protected:
void op_applied(RepGather *repop);
void op_commit(RepGather *repop);
void eval_repop(RepGather*);
- void issue_repop(RepGather *repop, utime_t now,
- eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version);
+ void issue_repop(RepGather *repop, utime_t now);
RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid);
void remove_repop(RepGather *repop);
void repop_ack(RepGather *repop,
@@ -419,9 +424,6 @@ protected:
};
map<hobject_t, PullInfo> pulling;
- // Track contents of temp collection, clear on reset
- set<hobject_t> temp_contents;
-
ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info);
static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
const interval_set<uint64_t> &intervals_received,
@@ -793,7 +795,9 @@ protected:
int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
CopyOpRef *pcop);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+ void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
void _copy_some(OpContext *ctx, CopyOpRef cop);
+ int finish_copy(OpContext *ctx);
void cancel_copy(CopyOpRef cop);
void requeue_cancel_copy_ops(bool requeue=true);
@@ -855,7 +859,10 @@ public:
private:
bool temp_created;
coll_t temp_coll;
+ set<hobject_t> temp_contents; ///< contents of temp collection, clear on reset
+ uint64_t temp_seq; ///< last id for naming temp objects
coll_t get_temp_coll(ObjectStore::Transaction *t);
+ hobject_t generate_temp_object(); ///< generate a new temp object name
public:
bool have_temp_coll();
coll_t get_temp_coll() {
@@ -932,6 +939,9 @@ public:
bool is_degraded_object(const hobject_t& oid);
void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
+ void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
+ void kick_object_context_blocked(ObjectContextRef obc);
+
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);
ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index e94fd02a5ad..aa20dc592fa 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -393,6 +393,8 @@ ostream& operator<<(ostream& out, const pg_t &pg)
// -- coll_t --
+const coll_t coll_t::META_COLL("meta");
+
bool coll_t::is_temp(pg_t& pgid) const
{
const char *cstr(str.c_str());
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 901d9dbb488..091b2b95e8f 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2197,6 +2197,11 @@ public:
if (destructor_callback)
destructor_callback->complete(0);
}
+
+ bool is_blocked() const {
+ return copyfrom_readside > 0;
+ }
+
// do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy.
void ondisk_write_lock() {
lock.Lock();
diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc
index 80c775052ec..92104960127 100644
--- a/src/test/filestore/store_test.cc
+++ b/src/test/filestore/store_test.cc
@@ -898,6 +898,65 @@ TEST_F(StoreTest, TwoHash) {
ASSERT_EQ(r, 0);
}
+TEST_F(StoreTest, MoveRename) {
+ coll_t temp_cid("mytemp");
+ hobject_t temp_oid("tmp_oid", "", CEPH_NOSNAP, 0, 0, "");
+ coll_t cid("dest");
+ hobject_t oid("dest_oid", "", CEPH_NOSNAP, 0, 0, "");
+ int r;
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(cid);
+ t.touch(cid, oid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ASSERT_TRUE(store->exists(cid, oid));
+ bufferlist data, attr;
+ map<string, bufferlist> omap;
+ data.append("data payload");
+ attr.append("attr value");
+ omap["omap_key"].append("omap value");
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(temp_cid);
+ t.touch(temp_cid, temp_oid);
+ t.write(temp_cid, temp_oid, 0, data.length(), data);
+ t.setattr(temp_cid, temp_oid, "attr", attr);
+ t.omap_setkeys(temp_cid, temp_oid, omap);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ASSERT_TRUE(store->exists(temp_cid, temp_oid));
+ {
+ ObjectStore::Transaction t;
+ t.remove(cid, oid);
+ t.collection_move_rename(temp_cid, temp_oid, cid, oid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ASSERT_TRUE(store->exists(cid, oid));
+ ASSERT_FALSE(store->exists(temp_cid, temp_oid));
+ {
+ bufferlist newdata;
+ r = store->read(cid, oid, 0, 1000, newdata);
+ ASSERT_GE(r, 0);
+ ASSERT_TRUE(newdata.contents_equal(data));
+ bufferlist newattr;
+ r = store->getattr(cid, oid, "attr", newattr);
+ ASSERT_GE(r, 0);
+ ASSERT_TRUE(newattr.contents_equal(attr));
+ set<string> keys;
+ keys.insert("omap_key");
+ map<string, bufferlist> newomap;
+ r = store->omap_get_values(cid, oid, keys, &newomap);
+ ASSERT_GE(r, 0);
+ ASSERT_EQ(1u, newomap.size());
+ ASSERT_TRUE(newomap.count("omap_key"));
+ ASSERT_TRUE(newomap["omap_key"].contents_equal(omap["omap_key"]));
+ }
+}
+
//
// support tests for qa/workunits/filestore/filestore.sh
//
diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc
index af17847aeab..803c8b1cc77 100644
--- a/src/test/librados/misc.cc
+++ b/src/test/librados/misc.cc
@@ -581,18 +581,33 @@ TEST(LibRadosMisc, CopyPP) {
ASSERT_EQ(0, ioctx.write_full("foo", blc));
ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc));
- ObjectWriteOperation op;
- op.copy_from("foo", ioctx, ioctx.get_last_version());
- ASSERT_EQ(0, ioctx.operate("foo.copy", &op));
+ {
+ ObjectWriteOperation op;
+ op.copy_from("foo", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("foo.copy", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+ }
- bufferlist bl2, x2;
- ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0));
- ASSERT_TRUE(bl.contents_equal(bl2));
- ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
- ASSERT_TRUE(x.contents_equal(x2));
+ // small object without a version
+ {
+ ObjectWriteOperation op;
+ op.copy_from("foo", ioctx, 0);
+ ASSERT_EQ(0, ioctx.operate("foo.copy2", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy2", bl2, 10000, 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy2", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+ }
// do a big object
- bl.append(buffer::create(8000000));
+ bl.append(buffer::create(g_conf->osd_copyfrom_max_chunk * 3));
bl.zero();
bl.append("tail");
blc = bl;
@@ -600,15 +615,29 @@ TEST(LibRadosMisc, CopyPP) {
ASSERT_EQ(0, ioctx.write_full("big", blc));
ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc));
- ObjectWriteOperation op2;
- op.copy_from("big", ioctx, ioctx.get_last_version());
- ASSERT_EQ(0, ioctx.operate("big.copy", &op));
+ {
+ ObjectWriteOperation op;
+ op.copy_from("big", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("big.copy", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+ }
- bl2.clear();
- ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0));
- ASSERT_TRUE(bl.contents_equal(bl2));
- ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
- ASSERT_TRUE(x.contents_equal(x2));
+ {
+ ObjectWriteOperation op;
+ op.copy_from("big", ioctx, 0);
+ ASSERT_EQ(0, ioctx.operate("big.copy2", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("big.copy2", bl2, bl.length(), 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy2", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+ }
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
diff --git a/src/test/osd/RadosModel.h b/src/test/osd/RadosModel.h
index 4f1aa4c3582..3a73ac33faf 100644
--- a/src/test/osd/RadosModel.h
+++ b/src/test/osd/RadosModel.h
@@ -83,14 +83,16 @@ public:
class TestOp {
public:
+ int num;
RadosTestContext *context;
TestOpStat *stat;
bool done;
- TestOp(RadosTestContext *context,
- TestOpStat *stat = 0) :
- context(context),
- stat(stat),
- done(0)
+ TestOp(int n, RadosTestContext *context,
+ TestOpStat *stat = 0)
+ : num(n),
+ context(context),
+ stat(stat),
+ done(0)
{}
virtual ~TestOp() {};
@@ -230,6 +232,7 @@ public:
for (list<TestOp*>::iterator i = inflight.begin();
i != inflight.end();) {
if ((*i)->finished()) {
+ cout << (*i)->num << ": done (" << (inflight.size()-1) << " left)" << std::endl;
delete *i;
inflight.erase(i++);
} else {
@@ -238,7 +241,7 @@ public:
}
if (inflight.size() >= (unsigned) max_in_flight || (!next && !inflight.empty())) {
- cout << "Waiting on " << inflight.size() << std::endl;
+ cout << " waiting on " << inflight.size() << std::endl;
wait();
} else {
break;
@@ -488,11 +491,11 @@ public:
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
bool done;
- RemoveAttrsOp(RadosTestContext *context,
+ RemoveAttrsOp(int n, RadosTestContext *context,
const string &oid,
- TestOpStat *stat) :
- TestOp(context, stat), oid(oid), comp(NULL), done(false)
- {}
+ TestOpStat *stat)
+ : TestOp(n, context, stat), oid(oid), comp(NULL), done(false)
+ {}
void _begin()
{
@@ -577,11 +580,12 @@ public:
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
bool done;
- TmapPutOp(RadosTestContext *context,
- const string &oid,
- TestOpStat *stat) :
- TestOp(context, stat), oid(oid), comp(NULL), done(false)
- {}
+ TmapPutOp(int n,
+ RadosTestContext *context,
+ const string &oid,
+ TestOpStat *stat)
+ : TestOp(n, context, stat), oid(oid), comp(NULL), done(false)
+ {}
void _begin()
{
@@ -647,7 +651,7 @@ public:
assert(0);
}
done = true;
- context->update_object_version(oid, comp->get_version());
+ context->update_object_version(oid, comp->get_version64());
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
@@ -670,11 +674,13 @@ public:
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
bool done;
- SetAttrsOp(RadosTestContext *context,
- const string &oid,
- TestOpStat *stat) :
- TestOp(context, stat), oid(oid), comp(NULL), done(false)
- {}
+ SetAttrsOp(int n,
+ RadosTestContext *context,
+ const string &oid,
+ TestOpStat *stat)
+ : TestOp(n, context, stat),
+ oid(oid), comp(NULL), done(false)
+ {}
void _begin()
{
@@ -738,7 +744,7 @@ public:
assert(0);
}
done = true;
- context->update_object_version(oid, comp->get_version());
+ context->update_object_version(oid, comp->get_version64());
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
@@ -763,11 +769,12 @@ public:
uint64_t waiting_on;
uint64_t last_acked_tid;
- WriteOp(RadosTestContext *context,
+ WriteOp(int n,
+ RadosTestContext *context,
const string &oid,
- TestOpStat *stat = 0) :
- TestOp(context, stat),
- oid(oid), waiting_on(0), last_acked_tid(0)
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat),
+ oid(oid), waiting_on(0), last_acked_tid(0)
{}
void _begin()
@@ -796,16 +803,17 @@ public:
}
interval_set<uint64_t> ranges;
context->cont_gen.get_ranges(cont, ranges);
+ std::cout << num << ": seq_num " << context->seq_num << " ranges " << ranges << std::endl;
context->state_lock.Unlock();
int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
if (r) {
- cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
+ cerr << " r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
assert(0);
}
waiting_on = ranges.num_intervals();
- cout << "waiting_on = " << waiting_on << std::endl;
+ //cout << " waiting_on = " << waiting_on << std::endl;
ContentsGenerator::iterator gen_pos = context->cont_gen.get_iterator(cont);
uint64_t tid = 1;
for (interval_set<uint64_t>::iterator i = ranges.begin();
@@ -818,9 +826,8 @@ public:
}
assert(to_write.length() == i.get_len());
assert(to_write.length() > 0);
- std::cout << "Writing " << context->prefix+oid << " from " << i.get_start()
- << " to " << i.get_len() + i.get_start() << " tid " << tid
- << " ranges are " << ranges << std::endl;
+ std::cout << num << ": writing " << context->prefix+oid << " from " << i.get_start()
+ << " to " << i.get_len() + i.get_start() << " tid " << tid << std::endl;
pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
new pair<TestOp*, TestOp::CallbackInfo*>(this,
new TestOp::CallbackInfo(tid));
@@ -838,7 +845,7 @@ public:
context->state_lock.Lock();
uint64_t tid = info->id;
- cout << "finishing write tid " << tid << " to " << context->prefix + oid << std::endl;
+ cout << num << ": finishing write tid " << tid << " to " << context->prefix + oid << std::endl;
if (tid <= last_acked_tid) {
cerr << "Error: finished tid " << tid
@@ -889,10 +896,11 @@ class DeleteOp : public TestOp {
public:
string oid;
- DeleteOp(RadosTestContext *context,
+ DeleteOp(int n,
+ RadosTestContext *context,
const string &oid,
- TestOpStat *stat = 0) :
- TestOp(context, stat), oid(oid)
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat), oid(oid)
{}
void _begin()
@@ -970,16 +978,17 @@ public:
bufferlist header;
map<string, bufferlist> xattrs;
- ReadOp(RadosTestContext *context,
+ ReadOp(int n,
+ RadosTestContext *context,
const string &oid,
- TestOpStat *stat = 0) :
- TestOp(context, stat),
- completion(NULL),
- oid(oid),
- old_value(&context->cont_gen),
- snap(0),
- retval(0),
- attrretval(0)
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat),
+ completion(NULL),
+ oid(oid),
+ old_value(&context->cont_gen),
+ snap(0),
+ retval(0),
+ attrretval(0)
{}
void _begin()
@@ -1002,9 +1011,9 @@ public:
if (ctx) {
assert(old_value.exists);
TestAlarm alarm;
- std::cerr << "about to start" << std::endl;
+ std::cerr << num << ": about to start" << std::endl;
ctx->start();
- std::cerr << "started" << std::endl;
+ std::cerr << num << ": started" << std::endl;
bufferlist bl;
context->io_ctx.set_notify_timeout(600);
int r = context->io_ctx.notify(context->prefix+oid, 0, bl);
@@ -1012,7 +1021,7 @@ public:
std::cerr << "r is " << r << std::endl;
assert(0);
}
- std::cerr << "notified, waiting" << std::endl;
+ std::cerr << num << ": notified, waiting" << std::endl;
ctx->wait();
}
if (snap >= 0) {
@@ -1059,7 +1068,7 @@ public:
uint64_t version = completion->get_version64();
if (int err = completion->get_return_value()) {
if (!(err == -ENOENT && old_value.deleted())) {
- cerr << "Error: oid " << oid << " read returned error code "
+ cerr << num << ": Error: oid " << oid << " read returned error code "
<< err << std::endl;
}
} else {
@@ -1068,16 +1077,16 @@ public:
ContDesc to_check;
bufferlist::iterator p = result.begin();
if (!context->cont_gen.read_header(p, to_check)) {
- cerr << "Unable to decode oid " << oid << " at snap " << context->current_snap << std::endl;
+ cerr << num << ": Unable to decode oid " << oid << " at snap " << context->current_snap << std::endl;
context->errors++;
}
if (to_check != old_value.most_recent()) {
- cerr << "Found incorrect object contents " << to_check
+ cerr << num << ": Found incorrect object contents " << to_check
<< ", expected " << old_value.most_recent() << " oid " << oid << std::endl;
context->errors++;
}
if (!old_value.check(result)) {
- cerr << "Object " << oid << " contents " << to_check << " corrupt" << std::endl;
+ cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
context->errors++;
}
if (context->errors) assert(0);
@@ -1085,28 +1094,28 @@ public:
// Attributes
if (!(old_value.header == header)) {
- cerr << "oid: " << oid << " header does not match, old size: "
+ cerr << num << ": oid " << oid << " header does not match, old size: "
<< old_value.header.length() << " new size " << header.length()
<< std::endl;
assert(old_value.header == header);
}
if (omap.size() != old_value.attrs.size()) {
- cerr << "oid: " << oid << " tmap.size() is " << omap.size()
+ cerr << num << ": oid " << oid << " tmap.size() is " << omap.size()
<< " and old is " << old_value.attrs.size() << std::endl;
assert(omap.size() == old_value.attrs.size());
}
if (omap_keys.size() != old_value.attrs.size()) {
- cerr << "oid: " << oid << " tmap.size() is " << omap_keys.size()
+ cerr << num << ": oid " << oid << " tmap.size() is " << omap_keys.size()
<< " and old is " << old_value.attrs.size() << std::endl;
assert(omap_keys.size() == old_value.attrs.size());
}
if (xattrs.size() != old_value.attrs.size()) {
- cerr << "oid: " << oid << " xattrs.size() is " << xattrs.size()
+ cerr << num << ": oid " << oid << " xattrs.size() is " << xattrs.size()
<< " and old is " << old_value.attrs.size() << std::endl;
assert(xattrs.size() == old_value.attrs.size());
}
if (version != old_value.version) {
- cerr << "oid: " << oid << " version is " << version
+ cerr << num << ": oid " << oid << " version is " << version
<< " and expected " << old_value.version << std::endl;
assert(version == old_value.version);
}
@@ -1163,9 +1172,10 @@ public:
class SnapCreateOp : public TestOp {
public:
- SnapCreateOp(RadosTestContext *context,
- TestOpStat *stat = 0) :
- TestOp(context, stat)
+ SnapCreateOp(int n,
+ RadosTestContext *context,
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat)
{}
void _begin()
@@ -1201,11 +1211,11 @@ public:
class SnapRemoveOp : public TestOp {
public:
int to_remove;
- SnapRemoveOp(RadosTestContext *context,
+ SnapRemoveOp(int n, RadosTestContext *context,
int snap,
- TestOpStat *stat = 0) :
- TestOp(context, stat),
- to_remove(snap)
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat),
+ to_remove(snap)
{}
void _begin()
@@ -1241,11 +1251,12 @@ public:
class WatchOp : public TestOp {
string oid;
public:
- WatchOp(RadosTestContext *context,
- const string &_oid,
- TestOpStat *stat = 0) :
- TestOp(context, stat),
- oid(_oid)
+ WatchOp(int n,
+ RadosTestContext *context,
+ const string &_oid,
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat),
+ oid(_oid)
{}
void _begin()
@@ -1318,13 +1329,14 @@ public:
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
- RollbackOp(RadosTestContext *context,
+ RollbackOp(int n,
+ RadosTestContext *context,
const string &_oid,
int snap,
- TestOpStat *stat = 0) :
- TestOp(context, stat),
- oid(_oid),
- roll_back_to(snap), done(false)
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat),
+ oid(_oid),
+ roll_back_to(snap), done(false)
{}
void _begin()
@@ -1369,7 +1381,7 @@ public:
assert(0);
}
done = true;
- context->update_object_version(oid, comp->get_version());
+ context->update_object_version(oid, comp->get_version64());
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
@@ -1392,16 +1404,20 @@ public:
ObjectDesc src_value;
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
+ librados::AioCompletion *comp_racing_read;
int snap;
- bool done;
- tid_t tid;
- CopyFromOp(RadosTestContext *context,
+ int done;
+ uint64_t version;
+ int r;
+ CopyFromOp(int n,
+ RadosTestContext *context,
const string &oid,
const string &oid_src,
TestOpStat *stat)
- : TestOp(context, stat), oid(oid), oid_src(oid_src),
+ : TestOp(n, context, stat),
+ oid(oid), oid_src(oid_src),
src_value(&context->cont_gen),
- comp(NULL), done(false), tid(0)
+ comp(NULL), done(0), version(0), r(0)
{}
void _begin()
@@ -1433,35 +1449,67 @@ public:
new TestOp::CallbackInfo(0));
comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback,
NULL);
- tid = context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+ context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+
+ // queue up a racing read, too.
+ pair<TestOp*, TestOp::CallbackInfo*> *read_cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(this,
+ new TestOp::CallbackInfo(1));
+ comp_racing_read = context->rados.aio_create_completion((void*) read_cb_arg, &write_callback,
+ NULL);
+ context->io_ctx.aio_stat(context->prefix+oid, comp_racing_read, NULL, NULL);
}
void _finish(CallbackInfo *info)
{
Mutex::Locker l(context->state_lock);
- done = true;
- int r;
- assert(comp->is_complete());
- cout << "finishing copy_from tid " << tid << " to " << context->prefix + oid << std::endl;
- if ((r = comp->get_return_value())) {
- if (!(r == -ENOENT && src_value.deleted())) {
- cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
- << r << std::endl;
+
+ // note that the read can (and atm will) come back before the
+ // write reply, but will reflect the update and the versions will
+ // match.
+
+ if (info->id == 0) {
+ // copy_from
+ assert(comp->is_complete());
+ cout << num << ": finishing copy_from to " << context->prefix + oid << std::endl;
+ if ((r = comp->get_return_value())) {
+ if (!(r == -ENOENT && src_value.deleted())) {
+ cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
+ << r << std::endl;
+ }
+ } else {
+ assert(!version || comp->get_version64() == version);
+ version = comp->get_version64();
+ context->update_object_full(oid, src_value);
+ context->update_object_version(oid, comp->get_version64());
}
- } else {
- context->update_object_full(oid, src_value);
- context->update_object_version(oid, comp->get_version());
+ context->oid_in_use.erase(oid_src);
+ context->oid_not_in_use.insert(oid_src);
+ context->kick();
+ } else if (info->id == 1) {
+ // racing read
+ assert(comp_racing_read->is_complete());
+ cout << num << ": finishing copy_from racing read to " << context->prefix + oid << std::endl;
+ if ((r = comp_racing_read->get_return_value())) {
+ if (!(r == -ENOENT && src_value.deleted())) {
+ cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
+ << r << std::endl;
+ }
+ } else {
+ assert(comp_racing_read->get_return_value() == 0);
+ assert(!version || comp_racing_read->get_version64() == version);
+ version = comp_racing_read->get_version64();
+ }
+ context->oid_in_use.erase(oid);
+ context->oid_not_in_use.insert(oid);
+ context->kick();
}
- context->oid_in_use.erase(oid);
- context->oid_not_in_use.insert(oid);
- context->oid_in_use.erase(oid_src);
- context->oid_not_in_use.insert(oid_src);
- context->kick();
+ ++done;
}
bool finished()
{
- return done;
+ return done == 2;
}
string getType()
diff --git a/src/test/osd/TestRados.cc b/src/test/osd/TestRados.cc
index 1deee23aa2c..1b6bd073a12 100644
--- a/src/test/osd/TestRados.cc
+++ b/src/test/osd/TestRados.cc
@@ -48,8 +48,8 @@ public:
if (m_op <= m_objects) {
stringstream oid;
oid << m_op;
- cout << m_op << ": Writing initial " << oid.str() << std::endl;
- return new WriteOp(&context, oid.str());
+ cout << m_op << ": write initial oid " << oid.str() << std::endl;
+ return new WriteOp(m_op, &context, oid.str());
} else if (m_op >= m_ops) {
return NULL;
}
@@ -71,7 +71,6 @@ public:
it != m_weight_sums.end();
++it) {
if (rand_val < it->second) {
- cout << m_op << ": ";
retval = gen_op(context, it->first);
break;
}
@@ -85,37 +84,39 @@ private:
TestOp *gen_op(RadosTestContext &context, TestOpType type)
{
string oid, oid2;
- cout << "oids not in use " << context.oid_not_in_use.size() << std::endl;
+ //cout << "oids not in use " << context.oid_not_in_use.size() << std::endl;
assert(context.oid_not_in_use.size());
+
+ cout << m_op << ": ";
switch (type) {
case TEST_OP_READ:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Reading " << oid << std::endl;
- return new ReadOp(&context, oid, m_stats);
+ cout << "read oid " << oid << std::endl;
+ return new ReadOp(m_op, &context, oid, m_stats);
case TEST_OP_WRITE:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Writing " << oid << " current snap is "
+ cout << "write oid " << oid << " current snap is "
<< context.current_snap << std::endl;
- return new WriteOp(&context, oid, m_stats);
+ return new WriteOp(m_op, &context, oid, m_stats);
case TEST_OP_DELETE:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Deleting " << oid << " current snap is "
+ cout << "delete oid " << oid << " current snap is "
<< context.current_snap << std::endl;
- return new DeleteOp(&context, oid, m_stats);
+ return new DeleteOp(m_op, &context, oid, m_stats);
case TEST_OP_SNAP_CREATE:
- cout << "Snapping" << std::endl;
- return new SnapCreateOp(&context, m_stats);
+ cout << "snap_create" << std::endl;
+ return new SnapCreateOp(m_op, &context, m_stats);
case TEST_OP_SNAP_REMOVE:
if (context.snaps.empty()) {
return NULL;
} else {
int snap = rand_choose(context.snaps)->first;
- cout << "RemovingSnap " << snap << std::endl;
- return new SnapRemoveOp(&context, snap, m_stats);
+ cout << "snap_remove snap " << snap << std::endl;
+ return new SnapRemoveOp(m_op, &context, snap, m_stats);
}
case TEST_OP_ROLLBACK:
@@ -124,40 +125,40 @@ private:
} else {
int snap = rand_choose(context.snaps)->first;
string oid = *(rand_choose(context.oid_not_in_use));
- cout << "RollingBack " << oid << " to " << snap << std::endl;
- return new RollbackOp(&context, oid, snap);
+ cout << "rollback oid " << oid << " to " << snap << std::endl;
+ return new RollbackOp(m_op, &context, oid, snap);
}
case TEST_OP_SETATTR:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Setting attrs on " << oid
+ cout << "setattr oid " << oid
<< " current snap is " << context.current_snap << std::endl;
- return new SetAttrsOp(&context, oid, m_stats);
+ return new SetAttrsOp(m_op, &context, oid, m_stats);
case TEST_OP_RMATTR:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Removing attrs on " << oid
+ cout << "rmattr oid " << oid
<< " current snap is " << context.current_snap << std::endl;
- return new RemoveAttrsOp(&context, oid, m_stats);
+ return new RemoveAttrsOp(m_op, &context, oid, m_stats);
case TEST_OP_TMAPPUT:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Setting tmap on " << oid
+ cout << "tmapput oid " << oid
<< " current snap is " << context.current_snap << std::endl;
- return new TmapPutOp(&context, oid, m_stats);
+ return new TmapPutOp(m_op, &context, oid, m_stats);
case TEST_OP_WATCH:
oid = *(rand_choose(context.oid_not_in_use));
- cout << "Watching " << oid
+ cout << "watch oid " << oid
<< " current snap is " << context.current_snap << std::endl;
- return new WatchOp(&context, oid, m_stats);
+ return new WatchOp(m_op, &context, oid, m_stats);
case TEST_OP_COPY_FROM:
oid = *(rand_choose(context.oid_not_in_use));
oid2 = *(rand_choose(context.oid_not_in_use));
- cout << "copy_from " << oid << " from " << oid2
+ cout << "copy_from oid " << oid << " from oid " << oid2
<< " current snap is " << context.current_snap << std::endl;
- return new CopyFromOp(&context, oid, oid2, m_stats);
+ return new CopyFromOp(m_op, &context, oid, oid2, m_stats);
default:
cerr << "Invalid op type " << type << std::endl;