From 2b216c3ae2f66ce7e3d62e1c7b13c96c9f7bd687 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 3 Oct 2013 23:20:07 -0700 Subject: ReplicatedPG: block reads on an object until the write is committed Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 50 ++++++++------ src/osd/ReplicatedPG.h | 175 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 23 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index f466eb8ccdc..781ba33ec35 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -988,21 +988,8 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - if ((op->may_read()) && (obc->obs.oi.is_lost())) { - // This object is lost. Reading from it returns an error. - dout(20) << __func__ << ": object " << obc->obs.oi.soid - << " is lost" << dendl; - osd->reply_op_error(op, -ENFILE); - return; - } dout(25) << __func__ << ": object " << obc->obs.oi.soid << " has oi of " << obc->obs.oi << dendl; - - if (!op->may_write() && (!obc->obs.exists || - obc->obs.oi.is_whiteout())) { - osd->reply_op_error(op, -ENOENT); - return; - } // are writes blocked by another object? if (obc->blocked_by) { @@ -1126,11 +1113,31 @@ void ReplicatedPG::do_op(OpRequestRef op) } } - op->mark_started(); - OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, this); + if (!get_rw_locks(ctx)) { + op->mark_delayed("waiting for rw locks"); + close_op_ctx(ctx); + return; + } + + if ((op->may_read()) && (obc->obs.oi.is_lost())) { + // This object is lost. Reading from it returns an error. 
+ dout(20) << __func__ << ": object " << obc->obs.oi.soid + << " is lost" << dendl; + close_op_ctx(ctx); + osd->reply_op_error(op, -ENFILE); + return; + } + if (!op->may_write() && (!obc->obs.exists || + obc->obs.oi.is_whiteout())) { + close_op_ctx(ctx); + osd->reply_op_error(op, -ENOENT); + return; + } + + op->mark_started(); ctx->obc = obc; ctx->src_obc = src_obc; @@ -1207,7 +1214,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) if (already_complete(oldv)) { reply_ctx(ctx, 0, oldv, entry->user_version); } else { - delete ctx; + close_op_ctx(ctx); if (m->wants_ack()) { if (already_ack(oldv)) { @@ -1300,7 +1307,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) if (result == -EAGAIN) { // clean up after the ctx - delete ctx; + close_op_ctx(ctx); return; } @@ -1352,7 +1359,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); osd->send_message_osd_client(reply, m->get_connection()); - delete ctx; + close_op_ctx(ctx); return; } @@ -1400,13 +1407,13 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) void ReplicatedPG::reply_ctx(OpContext *ctx, int r) { osd->reply_op_error(ctx->op, r); - delete ctx; + close_op_ctx(ctx); } void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv) { osd->reply_op_error(ctx->op, r, v, uv); - delete ctx; + close_op_ctx(ctx); } void ReplicatedPG::log_op_stats(OpContext *ctx) @@ -4724,6 +4731,8 @@ void ReplicatedPG::eval_repop(RepGather *repop) // ondisk? 
if (repop->waitfor_disk.empty()) { + release_op_ctx_locks(repop->ctx); + log_op_stats(repop->ctx); publish_stats_to_osd(); @@ -4929,6 +4938,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe void ReplicatedPG::remove_repop(RepGather *repop) { + release_op_ctx_locks(repop->ctx); repop_map.erase(repop->rep_tid); repop->put(); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 27c9d1bb605..21cca471538 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -183,7 +183,7 @@ public: if (r != -ECANCELED) { // on cancel just toss it out; client resends ctx->pg->osd->reply_op_error(ctx->op, r); } - delete ctx; + ctx->pg->close_op_ctx(ctx); } } @@ -374,6 +374,8 @@ public: hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking + enum { W_LOCK, R_LOCK, NONE } lock_to_release; + OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); @@ -388,7 +390,8 @@ public: data_off(0), reply(NULL), pg(_pg), num_read(0), num_write(0), - copy_cb(NULL) { + copy_cb(NULL), + lock_to_release(NONE) { if (_ssc) { new_snapset = _ssc->snapset; snapset = &_ssc->snapset; @@ -396,6 +399,7 @@ public: } ~OpContext() { assert(!clone_obc); + assert(lock_to_release == NONE); if (reply) reply->put(); } @@ -454,7 +458,7 @@ public: if (--nref == 0) { assert(!obc); assert(src_obc.empty()); - delete ctx; + delete ctx; // must already be unlocked delete this; //generic_dout(0) << "deleting " << this << dendl; } @@ -465,6 +469,171 @@ public: protected: + /// Tracks pending readers or writers on an object + class RWTracker { + struct ObjState { + enum State { + NONE, + READ, + WRITE + }; + State state; /// rw state + uint64_t count; /// number of readers or writers + list waiters; /// ops waiting on state change + + ObjState() : state(NONE), count(0) {} + bool get_read(OpRequestRef op) { + // don't starve! 
+ if (!waiters.empty()) { + waiters.push_back(op); + return false; + } + switch (state) { + case NONE: + assert(count == 0); + state = READ; + // fall through + case READ: + count++; + return true; + case WRITE: + waiters.push_back(op); + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + bool get_write(OpRequestRef op) { + if (!waiters.empty()) { + // don't starve! + waiters.push_back(op); + return false; + } + switch (state) { + case NONE: + assert(count == 0); + state = WRITE; + // fall through + case WRITE: + count++; + return true; + case READ: + waiters.push_back(op); + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + void dec(list *requeue) { + assert(count > 0); + assert(requeue); + assert(requeue->empty()); + count--; + if (count == 0) { + state = NONE; + requeue->swap(waiters); + } + } + void put_read(list *requeue) { + assert(state == READ); + dec(requeue); + } + void put_write(list *requeue) { + assert(state == WRITE); + dec(requeue); + } + void clear(list *requeue) { + state = NONE; + count = 0; + assert(requeue); + assert(requeue->empty()); + requeue->swap(waiters); + } + bool empty() const { return state == NONE; } + }; + map obj_state; + public: + bool get_read(const hobject_t &hoid, OpRequestRef op) { + return obj_state[hoid].get_read(op); + } + bool get_write(const hobject_t &hoid, OpRequestRef op) { + return obj_state[hoid].get_write(op); + } + void put_read(const hobject_t &hoid, list *to_wake) { + obj_state[hoid].put_read(to_wake); + if (obj_state[hoid].empty()) { + obj_state.erase(hoid); + } + } + void put_write(const hobject_t &hoid, list *to_wake) { + obj_state[hoid].put_write(to_wake); + if (obj_state[hoid].empty()) { + obj_state.erase(hoid); + } + } + } rw_manager; + + /** + * Grabs locks for OpContext, should be cleaned up in close_op_ctx + * + * @param ctx [in,out] ctx to get locks for + * @return true on success, false if we are queued + */ + bool get_rw_locks(OpContext 
*ctx) { + if (ctx->op->may_write()) { + if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) { + ctx->lock_to_release = OpContext::W_LOCK; + return true; + } else { + assert(0 == "Currently there cannot be a read in flight here"); + return false; + } + } else { + assert(ctx->op->may_read()); + if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) { + ctx->lock_to_release = OpContext::R_LOCK; + return true; + } else { + return false; + } + } + } + + /** + * Cleans up OpContext + * + * @param ctx [in] ctx to clean up + */ + void close_op_ctx(OpContext *ctx) { + release_op_ctx_locks(ctx); + delete ctx; + } + + /** + * Releases ctx locks + * + * @param ctx [in] ctx to clean up + */ + void release_op_ctx_locks(OpContext *ctx) { + list to_req; + switch (ctx->lock_to_release) { + case OpContext::W_LOCK: + rw_manager.put_write(ctx->obs->oi.soid, &to_req); + break; + case OpContext::R_LOCK: + rw_manager.put_read(ctx->obs->oi.soid, &to_req); + break; + case OpContext::NONE: + break; + default: + assert(0); + }; + ctx->lock_to_release = OpContext::NONE; + requeue_ops(to_req); + } + // replica ops // [primary|tail] xlist repop_queue; -- cgit v1.2.1 From db6623fa3ec0c3d693af9d6770f29351e1ef352c Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 7 Oct 2013 19:03:23 -0700 Subject: RadosModel: send racing read on write Signed-off-by: Samuel Just --- src/test/osd/RadosModel.h | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/test/osd/RadosModel.h b/src/test/osd/RadosModel.h index aba6a531c6f..ac2f336f110 100644 --- a/src/test/osd/RadosModel.h +++ b/src/test/osd/RadosModel.h @@ -767,9 +767,13 @@ public: string oid; ContDesc cont; set waiting; + librados::AioCompletion *rcompletion; uint64_t waiting_on; uint64_t last_acked_tid; + librados::ObjectReadOperation read_op; + bufferlist rbuffer; + WriteOp(int n, RadosTestContext *context, const string &oid, @@ -824,6 +828,21 @@ public: context->io_ctx.aio_write(context->prefix+oid, completion, to_write, 
i.get_len(), i.get_start()); } + + pair *cb_arg = + new pair( + this, + new TestOp::CallbackInfo(tid)); + rcompletion = context->rados.aio_create_completion( + (void*) cb_arg, &write_callback, NULL); + waiting_on++; + read_op.read(0, 1, &rbuffer, 0); + context->io_ctx.aio_operate( + context->prefix+oid, rcompletion, + &read_op, + librados::SNAP_HEAD, + librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update + 0); } void _finish(CallbackInfo *info) @@ -860,6 +879,13 @@ public: } context->update_object_version(oid, version); + if (rcompletion->get_version64() != version) { + cerr << "Error: racing read on " << oid << " returned version " + << rcompletion->get_version64() << " rather than version " + << version << std::endl; + assert(0 == "racing read got wrong version"); + } + rcompletion->release(); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); -- cgit v1.2.1 From 1f50750d0fd94ffddc14f3d7a1e95fa4449aa1b8 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 3 Oct 2013 22:00:46 -0700 Subject: ReplicatedPG: remove the other backfill related flushes Signed-off-by: Samuel Just --- src/osd/PG.h | 4 +--- src/osd/ReplicatedPG.cc | 38 ++++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/osd/PG.h b/src/osd/PG.h index 275d30c7658..9b42ff4272b 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -449,9 +449,7 @@ protected: /// clear content void clear() { - objects.clear(); - begin = end = hobject_t(); - version = eversion_t(); + *this = BackfillInterval(); } void reset(hobject_t start) { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 781ba33ec35..c4dccf68442 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1549,11 +1549,14 @@ void ReplicatedPG::do_scan( } BackfillInterval bi; - osr->flush(); bi.begin = m->begin; + // No need to flush, there won't be any in progress writes occurring + // past m->begin scan_range( 
cct->_conf->osd_backfill_scan_min, - cct->_conf->osd_backfill_scan_max, &bi, handle); + cct->_conf->osd_backfill_scan_max, + &bi, + handle); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, info.pgid, bi.begin, bi.end); @@ -7930,9 +7933,6 @@ int ReplicatedPG::recover_backfill( << " interval " << pbi.begin << "-" << pbi.end << " " << pbi.objects.size() << " objects" << dendl; - int local_min = cct->_conf->osd_backfill_scan_min; - int local_max = cct->_conf->osd_backfill_scan_max; - // update our local interval to cope with recent changes backfill_info.begin = backfill_pos; update_range(&backfill_info, handle); @@ -7948,10 +7948,11 @@ int ReplicatedPG::recover_backfill( while (ops < max) { if (backfill_info.begin <= pbi.begin && !backfill_info.extends_to_end() && backfill_info.empty()) { - osr->flush(); - backfill_info.begin = backfill_info.end; - scan_range(local_min, local_max, &backfill_info, - handle); + hobject_t next = backfill_info.end; + backfill_info.clear(); + backfill_info.begin = next; + backfill_info.end = hobject_t::get_max(); + update_range(&backfill_info, handle); backfill_info.trim(); } backfill_pos = backfill_info.begin > pbi.begin ? 
pbi.begin : backfill_info.begin; @@ -8128,6 +8129,19 @@ void ReplicatedPG::update_range( { int local_min = cct->_conf->osd_backfill_scan_min; int local_max = cct->_conf->osd_backfill_scan_max; + + if (bi->version < info.log_tail) { + dout(10) << __func__<< ": bi is old, rescanning local backfill_info" + << dendl; + if (last_update_applied >= info.log_tail) { + bi->version = last_update_applied; + } else { + osr->flush(); + bi->version = info.last_update; + } + scan_range(local_min, local_max, bi, handle); + } + if (bi->version >= info.last_update) { dout(10) << __func__<< ": bi is current " << dendl; assert(bi->version == info.last_update); @@ -8167,10 +8181,7 @@ void ReplicatedPG::update_range( } bi->version = info.last_update; } else { - dout(10) << __func__<< ": bi is old, rescanning local backfill_info" - << dendl; - osr->flush(); - scan_range(local_min, local_max, &backfill_info, handle); + assert(0 == "scan_range should have raised bi->version past log_tail"); } } @@ -8180,7 +8191,6 @@ void ReplicatedPG::scan_range( { assert(is_locked()); dout(10) << "scan_range from " << bi->begin << dendl; - bi->version = info.last_update; bi->objects.clear(); // for good measure vector ls; -- cgit v1.2.1 From d8faa82fe4ed4386f48fe760e269e848052d17a1 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 10 Oct 2013 16:23:02 -0700 Subject: ReplicatedPG: remove unused RWTracker::ObjState::clear Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.h | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 21cca471538..015c6d3c38a 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -544,13 +544,6 @@ protected: assert(state == WRITE); dec(requeue); } - void clear(list *requeue) { - state = NONE; - count = 0; - assert(requeue); - assert(requeue->empty()); - requeue->swap(waiters); - } bool empty() const { return state == NONE; } }; map obj_state; -- cgit v1.2.1 From b0f49e0fba34faf37e8fe6fc2c8f2350b1ab1d23 Mon 
Sep 17 00:00:00 2001 From: Samuel Just Date: Sun, 13 Oct 2013 09:47:58 -0700 Subject: ReplicatedPG.h: while there cannot be a read in progress, there may be a read blocked Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 015c6d3c38a..1292780d044 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -580,7 +580,6 @@ protected: ctx->lock_to_release = OpContext::W_LOCK; return true; } else { - assert(0 == "Currently there cannot be a read in flight here"); return false; } } else { -- cgit v1.2.1