summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-10-15 10:27:56 -0700
committerSage Weil <sage@inktank.com>2013-10-15 10:27:56 -0700
commit7f7f760890867648499827e412acae61efa35909 (patch)
tree168097b426a26ed4f77be35b7d6059b670d722f9
parent0ad33423f0e6e6823c805d90a531535e04386519 (diff)
parentb0f49e0fba34faf37e8fe6fc2c8f2350b1ab1d23 (diff)
downloadceph-7f7f760890867648499827e412acae61efa35909.tar.gz
Merge pull request #706 from ceph/wip-6059
Wip 6059 Partly-reviewed-by: Greg Farnum <greg@inktank.com> Partly-reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/osd/PG.h4
-rw-r--r--src/osd/ReplicatedPG.cc88
-rw-r--r--src/osd/ReplicatedPG.h167
-rw-r--r--src/test/osd/RadosModel.h26
4 files changed, 245 insertions, 40 deletions
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 275d30c7658..9b42ff4272b 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -449,9 +449,7 @@ protected:
/// clear content
void clear() {
- objects.clear();
- begin = end = hobject_t();
- version = eversion_t();
+ *this = BackfillInterval();
}
void reset(hobject_t start) {
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index f466eb8ccdc..c4dccf68442 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -988,21 +988,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
return;
}
- if ((op->may_read()) && (obc->obs.oi.is_lost())) {
- // This object is lost. Reading from it returns an error.
- dout(20) << __func__ << ": object " << obc->obs.oi.soid
- << " is lost" << dendl;
- osd->reply_op_error(op, -ENFILE);
- return;
- }
dout(25) << __func__ << ": object " << obc->obs.oi.soid
<< " has oi of " << obc->obs.oi << dendl;
-
- if (!op->may_write() && (!obc->obs.exists ||
- obc->obs.oi.is_whiteout())) {
- osd->reply_op_error(op, -ENOENT);
- return;
- }
// are writes blocked by another object?
if (obc->blocked_by) {
@@ -1126,11 +1113,31 @@ void ReplicatedPG::do_op(OpRequestRef op)
}
}
- op->mark_started();
-
OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
&obc->obs, obc->ssc,
this);
+ if (!get_rw_locks(ctx)) {
+ op->mark_delayed("waiting for rw locks");
+ close_op_ctx(ctx);
+ return;
+ }
+
+ if ((op->may_read()) && (obc->obs.oi.is_lost())) {
+ // This object is lost. Reading from it returns an error.
+ dout(20) << __func__ << ": object " << obc->obs.oi.soid
+ << " is lost" << dendl;
+ close_op_ctx(ctx);
+ osd->reply_op_error(op, -ENFILE);
+ return;
+ }
+ if (!op->may_write() && (!obc->obs.exists ||
+ obc->obs.oi.is_whiteout())) {
+ close_op_ctx(ctx);
+ osd->reply_op_error(op, -ENOENT);
+ return;
+ }
+
+ op->mark_started();
ctx->obc = obc;
ctx->src_obc = src_obc;
@@ -1207,7 +1214,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
if (already_complete(oldv)) {
reply_ctx(ctx, 0, oldv, entry->user_version);
} else {
- delete ctx;
+ close_op_ctx(ctx);
if (m->wants_ack()) {
if (already_ack(oldv)) {
@@ -1300,7 +1307,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
if (result == -EAGAIN) {
// clean up after the ctx
- delete ctx;
+ close_op_ctx(ctx);
return;
}
@@ -1352,7 +1359,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->send_message_osd_client(reply, m->get_connection());
- delete ctx;
+ close_op_ctx(ctx);
return;
}
@@ -1400,13 +1407,13 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
void ReplicatedPG::reply_ctx(OpContext *ctx, int r)
{
osd->reply_op_error(ctx->op, r);
- delete ctx;
+ close_op_ctx(ctx);
}
void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
{
osd->reply_op_error(ctx->op, r, v, uv);
- delete ctx;
+ close_op_ctx(ctx);
}
void ReplicatedPG::log_op_stats(OpContext *ctx)
@@ -1542,11 +1549,14 @@ void ReplicatedPG::do_scan(
}
BackfillInterval bi;
- osr->flush();
bi.begin = m->begin;
+ // No need to flush, there won't be any in progress writes occuring
+ // past m->begin
scan_range(
cct->_conf->osd_backfill_scan_min,
- cct->_conf->osd_backfill_scan_max, &bi, handle);
+ cct->_conf->osd_backfill_scan_max,
+ &bi,
+ handle);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid, bi.begin, bi.end);
@@ -4724,6 +4734,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
// ondisk?
if (repop->waitfor_disk.empty()) {
+ release_op_ctx_locks(repop->ctx);
+
log_op_stats(repop->ctx);
publish_stats_to_osd();
@@ -4929,6 +4941,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe
void ReplicatedPG::remove_repop(RepGather *repop)
{
+ release_op_ctx_locks(repop->ctx);
repop_map.erase(repop->rep_tid);
repop->put();
@@ -7920,9 +7933,6 @@ int ReplicatedPG::recover_backfill(
<< " interval " << pbi.begin << "-" << pbi.end
<< " " << pbi.objects.size() << " objects" << dendl;
- int local_min = cct->_conf->osd_backfill_scan_min;
- int local_max = cct->_conf->osd_backfill_scan_max;
-
// update our local interval to cope with recent changes
backfill_info.begin = backfill_pos;
update_range(&backfill_info, handle);
@@ -7938,10 +7948,11 @@ int ReplicatedPG::recover_backfill(
while (ops < max) {
if (backfill_info.begin <= pbi.begin &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
- osr->flush();
- backfill_info.begin = backfill_info.end;
- scan_range(local_min, local_max, &backfill_info,
- handle);
+ hobject_t next = backfill_info.end;
+ backfill_info.clear();
+ backfill_info.begin = next;
+ backfill_info.end = hobject_t::get_max();
+ update_range(&backfill_info, handle);
backfill_info.trim();
}
backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
@@ -8118,6 +8129,19 @@ void ReplicatedPG::update_range(
{
int local_min = cct->_conf->osd_backfill_scan_min;
int local_max = cct->_conf->osd_backfill_scan_max;
+
+ if (bi->version < info.log_tail) {
+ dout(10) << __func__<< ": bi is old, rescanning local backfill_info"
+ << dendl;
+ if (last_update_applied >= info.log_tail) {
+ bi->version = last_update_applied;
+ } else {
+ osr->flush();
+ bi->version = info.last_update;
+ }
+ scan_range(local_min, local_max, bi, handle);
+ }
+
if (bi->version >= info.last_update) {
dout(10) << __func__<< ": bi is current " << dendl;
assert(bi->version == info.last_update);
@@ -8157,10 +8181,7 @@ void ReplicatedPG::update_range(
}
bi->version = info.last_update;
} else {
- dout(10) << __func__<< ": bi is old, rescanning local backfill_info"
- << dendl;
- osr->flush();
- scan_range(local_min, local_max, &backfill_info, handle);
+ assert(0 == "scan_range should have raised bi->version past log_tail");
}
}
@@ -8170,7 +8191,6 @@ void ReplicatedPG::scan_range(
{
assert(is_locked());
dout(10) << "scan_range from " << bi->begin << dendl;
- bi->version = info.last_update;
bi->objects.clear(); // for good measure
vector<hobject_t> ls;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 27c9d1bb605..1292780d044 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -183,7 +183,7 @@ public:
if (r != -ECANCELED) { // on cancel just toss it out; client resends
ctx->pg->osd->reply_op_error(ctx->op, r);
}
- delete ctx;
+ ctx->pg->close_op_ctx(ctx);
}
}
@@ -374,6 +374,8 @@ public:
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
+ enum { W_LOCK, R_LOCK, NONE } lock_to_release;
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
@@ -388,7 +390,8 @@ public:
data_off(0), reply(NULL), pg(_pg),
num_read(0),
num_write(0),
- copy_cb(NULL) {
+ copy_cb(NULL),
+ lock_to_release(NONE) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;
@@ -396,6 +399,7 @@ public:
}
~OpContext() {
assert(!clone_obc);
+ assert(lock_to_release == NONE);
if (reply)
reply->put();
}
@@ -454,7 +458,7 @@ public:
if (--nref == 0) {
assert(!obc);
assert(src_obc.empty());
- delete ctx;
+ delete ctx; // must already be unlocked
delete this;
//generic_dout(0) << "deleting " << this << dendl;
}
@@ -465,6 +469,163 @@ public:
protected:
+ /// Tracks pending readers or writers on an object
+ class RWTracker {
+ struct ObjState {
+ enum State {
+ NONE,
+ READ,
+ WRITE
+ };
+ State state; /// rw state
+ uint64_t count; /// number of readers or writers
+ list<OpRequestRef> waiters; /// ops waiting on state change
+
+ ObjState() : state(NONE), count(0) {}
+ bool get_read(OpRequestRef op) {
+ // don't starve!
+ if (!waiters.empty()) {
+ waiters.push_back(op);
+ return false;
+ }
+ switch (state) {
+ case NONE:
+ assert(count == 0);
+ state = READ;
+ // fall through
+ case READ:
+ count++;
+ return true;
+ case WRITE:
+ waiters.push_back(op);
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ bool get_write(OpRequestRef op) {
+ if (!waiters.empty()) {
+ // don't starve!
+ waiters.push_back(op);
+ return false;
+ }
+ switch (state) {
+ case NONE:
+ assert(count == 0);
+ state = WRITE;
+ // fall through
+ case WRITE:
+ count++;
+ return true;
+ case READ:
+ waiters.push_back(op);
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ void dec(list<OpRequestRef> *requeue) {
+ assert(count > 0);
+ assert(requeue);
+ assert(requeue->empty());
+ count--;
+ if (count == 0) {
+ state = NONE;
+ requeue->swap(waiters);
+ }
+ }
+ void put_read(list<OpRequestRef> *requeue) {
+ assert(state == READ);
+ dec(requeue);
+ }
+ void put_write(list<OpRequestRef> *requeue) {
+ assert(state == WRITE);
+ dec(requeue);
+ }
+ bool empty() const { return state == NONE; }
+ };
+ map<hobject_t, ObjState > obj_state;
+ public:
+ bool get_read(const hobject_t &hoid, OpRequestRef op) {
+ return obj_state[hoid].get_read(op);
+ }
+ bool get_write(const hobject_t &hoid, OpRequestRef op) {
+ return obj_state[hoid].get_write(op);
+ }
+ void put_read(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+ obj_state[hoid].put_read(to_wake);
+ if (obj_state[hoid].empty()) {
+ obj_state.erase(hoid);
+ }
+ }
+ void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+ obj_state[hoid].put_write(to_wake);
+ if (obj_state[hoid].empty()) {
+ obj_state.erase(hoid);
+ }
+ }
+ } rw_manager;
+
+ /**
+ * Grabs locks for OpContext, should be cleaned up in close_op_ctx
+ *
+ * @param ctx [in,out] ctx to get locks for
+ * @return true on success, false if we are queued
+ */
+ bool get_rw_locks(OpContext *ctx) {
+ if (ctx->op->may_write()) {
+ if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) {
+ ctx->lock_to_release = OpContext::W_LOCK;
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ assert(ctx->op->may_read());
+ if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) {
+ ctx->lock_to_release = OpContext::R_LOCK;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Cleans up OpContext
+ *
+ * @param ctx [in] ctx to clean up
+ */
+ void close_op_ctx(OpContext *ctx) {
+ release_op_ctx_locks(ctx);
+ delete ctx;
+ }
+
+ /**
+ * Releases ctx locks
+ *
+ * @param ctx [in] ctx to clean up
+ */
+ void release_op_ctx_locks(OpContext *ctx) {
+ list<OpRequestRef> to_req;
+ switch (ctx->lock_to_release) {
+ case OpContext::W_LOCK:
+ rw_manager.put_write(ctx->obs->oi.soid, &to_req);
+ break;
+ case OpContext::R_LOCK:
+ rw_manager.put_read(ctx->obs->oi.soid, &to_req);
+ break;
+ case OpContext::NONE:
+ break;
+ default:
+ assert(0);
+ };
+ ctx->lock_to_release = OpContext::NONE;
+ requeue_ops(to_req);
+ }
+
// replica ops
// [primary|tail]
xlist<RepGather*> repop_queue;
diff --git a/src/test/osd/RadosModel.h b/src/test/osd/RadosModel.h
index aba6a531c6f..ac2f336f110 100644
--- a/src/test/osd/RadosModel.h
+++ b/src/test/osd/RadosModel.h
@@ -767,9 +767,13 @@ public:
string oid;
ContDesc cont;
set<librados::AioCompletion *> waiting;
+ librados::AioCompletion *rcompletion;
uint64_t waiting_on;
uint64_t last_acked_tid;
+ librados::ObjectReadOperation read_op;
+ bufferlist rbuffer;
+
WriteOp(int n,
RadosTestContext *context,
const string &oid,
@@ -824,6 +828,21 @@ public:
context->io_ctx.aio_write(context->prefix+oid, completion,
to_write, i.get_len(), i.get_start());
}
+
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(
+ this,
+ new TestOp::CallbackInfo(tid));
+ rcompletion = context->rados.aio_create_completion(
+ (void*) cb_arg, &write_callback, NULL);
+ waiting_on++;
+ read_op.read(0, 1, &rbuffer, 0);
+ context->io_ctx.aio_operate(
+ context->prefix+oid, rcompletion,
+ &read_op,
+ librados::SNAP_HEAD,
+ librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update
+ 0);
}
void _finish(CallbackInfo *info)
@@ -860,6 +879,13 @@ public:
}
context->update_object_version(oid, version);
+ if (rcompletion->get_version64() != version) {
+ cerr << "Error: racing read on " << oid << " returned version "
+ << rcompletion->get_version64() << " rather than version "
+ << version << std::endl;
+ assert(0 == "racing read got wrong version");
+ }
+ rcompletion->release();
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();