diff options
author | athanatos <rexludorum@gmail.com> | 2013-10-04 14:04:41 -0700 |
---|---|---|
committer | athanatos <rexludorum@gmail.com> | 2013-10-04 14:04:41 -0700 |
commit | ce2d9ae6174b5f8fdb0daacf6524a31ea3f93684 (patch) | |
tree | 7684618361f489e2fa7f482fd3835f7c932a0faf | |
parent | 20043eba126559cc84dbc33d04e6ead0fe856274 (diff) | |
parent | b87bc2311aa4da065477f402a869e2edc1558e2f (diff) | |
download | ceph-ce2d9ae6174b5f8fdb0daacf6524a31ea3f93684.tar.gz |
Merge pull request #692 from ceph/wip-5992-2
Wip 5992 2
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/hobject.h | 4 | ||||
-rw-r--r-- | src/mon/PGMap.cc | 53 | ||||
-rw-r--r-- | src/mon/PGMap.h | 37 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 24 | ||||
-rw-r--r-- | src/os/FileStore.cc | 2 | ||||
-rw-r--r-- | src/osd/PG.cc | 37 | ||||
-rw-r--r-- | src/osd/PG.h | 5 | ||||
-rw-r--r-- | src/osd/PGBackend.h | 20 | ||||
-rw-r--r-- | src/osd/PGLog.cc | 4 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 72 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.h | 20 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 160 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 20 |
13 files changed, 375 insertions, 83 deletions
diff --git a/src/common/hobject.h b/src/common/hobject.h index 82eecf3bfc7..a769ad060d9 100644 --- a/src/common/hobject.h +++ b/src/common/hobject.h @@ -247,6 +247,10 @@ public: return hobj.get_filestore_key(); } + bool is_degenerate() const { + return generation == NO_GEN && shard_id == NO_SHARD; + } + // maximum sorted value. static ghobject_t get_max() { ghobject_t h(hobject_t::get_max()); diff --git a/src/mon/PGMap.cc b/src/mon/PGMap.cc index 4be39aba902..0b3a0a6506c 100644 --- a/src/mon/PGMap.cc +++ b/src/mon/PGMap.cc @@ -30,7 +30,7 @@ void PGMap::Incremental::encode(bufferlist &bl, uint64_t features) const return; } - ENCODE_START(6, 5, bl); + ENCODE_START(7, 5, bl); ::encode(version, bl); ::encode(pg_stat_updates, bl); ::encode(osd_stat_updates, bl); @@ -41,6 +41,7 @@ void PGMap::Incremental::encode(bufferlist &bl, uint64_t features) const ::encode(nearfull_ratio, bl); ::encode(pg_remove, bl); ::encode(stamp, bl); + ::encode(osd_epochs, bl); ENCODE_FINISH(bl); } @@ -89,6 +90,17 @@ void PGMap::Incremental::decode(bufferlist::iterator &bl) } if (struct_v >= 6) ::decode(stamp, bl); + if (struct_v >= 7) { + ::decode(osd_epochs, bl); + } else { + for (map<int32_t, osd_stat_t>::iterator i = osd_stat_updates.begin(); + i != osd_stat_updates.end(); + ++i) { + // This isn't accurate, but will cause trimming to behave like + // previously. + osd_epochs.insert(make_pair(i->first, osdmap_epoch)); + } + } DECODE_FINISH(bl); } @@ -195,8 +207,10 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc) } stat_pg_add(update_pg, update_stat); } - for (map<int32_t,osd_stat_t>::const_iterator p = inc.osd_stat_updates.begin(); - p != inc.osd_stat_updates.end(); + assert(osd_stat.size() == osd_epochs.size()); + for (map<int32_t,osd_stat_t>::const_iterator p = + inc.get_osd_stat_updates().begin(); + p != inc.get_osd_stat_updates().end(); ++p) { int osd = p->first; const osd_stat_t &new_stats(p->second); @@ -209,6 +223,8 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc) stat_osd_sub(t->second); t->second = new_stats; } + assert(inc.get_osd_epochs().find(osd) != inc.get_osd_epochs().end()); + osd_epochs.insert(*(inc.get_osd_epochs().find(osd))); stat_osd_add(new_stats); @@ -226,8 +242,8 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc) } } - for (set<int>::iterator p = inc.osd_stat_rm.begin(); - p != inc.osd_stat_rm.end(); + for (set<int>::iterator p = inc.get_osd_stat_rm().begin(); + p != inc.get_osd_stat_rm().end(); ++p) { hash_map<int32_t,osd_stat_t>::iterator t = osd_stat.find(*p); if (t != osd_stat.end()) { @@ -416,6 +432,14 @@ epoch_t PGMap::calc_min_last_epoch_clean() const if (lec < min) min = lec; } + // also scan osd epochs + // don't trim past the oldest reported osd epoch + for (hash_map<int32_t, epoch_t>::const_iterator i = osd_epochs.begin(); + i != osd_epochs.end(); + ++i) { + if (i->second < min) + min = i->second; + } return min; } @@ -434,7 +458,7 @@ void PGMap::encode(bufferlist &bl, uint64_t features) const return; } - ENCODE_START(5, 4, bl); + ENCODE_START(6, 4, bl); ::encode(version, bl); ::encode(pg_stat, bl); ::encode(osd_stat, bl); @@ -443,6 +467,7 @@ void PGMap::encode(bufferlist &bl, uint64_t features) const ::encode(full_ratio, bl); ::encode(nearfull_ratio, bl); ::encode(stamp, bl); + ::encode(osd_epochs, bl); ENCODE_FINISH(bl); } @@ -472,6 +497,17 @@ void PGMap::decode(bufferlist::iterator &bl) } if (struct_v >= 5) ::decode(stamp, bl); + if (struct_v >= 6) { + ::decode(osd_epochs, bl); + } else { + for (hash_map<int32_t, osd_stat_t>::iterator i = osd_stat.begin(); + i != osd_stat.end(); + ++i) { + // This isn't accurate, but will cause trimming to behave like + // previously. + osd_epochs.insert(make_pair(i->first, last_osdmap_epoch)); + } + } DECODE_FINISH(bl); calc_stats(); @@ -488,7 +524,10 @@ void PGMap::dirty_all(Incremental& inc) inc.pg_stat_updates[p->first] = p->second; } for (hash_map<int32_t, osd_stat_t>::const_iterator p = osd_stat.begin(); p != osd_stat.end(); ++p) { - inc.osd_stat_updates[p->first] = p->second; + assert(inc.get_osd_epochs().count(p->first)); + inc.update_stat(p->first, + inc.get_osd_epochs().find(p->first)->second, + p->second); } } diff --git a/src/mon/PGMap.h b/src/mon/PGMap.h index 84d89f87517..7a202fc0006 100644 --- a/src/mon/PGMap.h +++ b/src/mon/PGMap.h @@ -43,12 +43,13 @@ public: float full_ratio; float nearfull_ratio; + // mapping of osd to most recently reported osdmap epoch + hash_map<int32_t,epoch_t> osd_epochs; + class Incremental { public: version_t version; map<pg_t,pg_stat_t> pg_stat_updates; - map<int32_t,osd_stat_t> osd_stat_updates; - set<int32_t> osd_stat_rm; epoch_t osdmap_epoch; epoch_t pg_scan; // osdmap epoch set<pg_t> pg_remove; @@ -56,6 +57,38 @@ public: float nearfull_ratio; utime_t stamp; + private: + map<int32_t,osd_stat_t> osd_stat_updates; + set<int32_t> osd_stat_rm; + + // mapping of osd to most recently reported osdmap epoch + map<int32_t,epoch_t> osd_epochs; + public: + + const map<int32_t, osd_stat_t> &get_osd_stat_updates() const { + return osd_stat_updates; + } + const set<int32_t> &get_osd_stat_rm() const { + return osd_stat_rm; + } + const map<int32_t, epoch_t> &get_osd_epochs() const { + return osd_epochs; + } + + void update_stat(int32_t osd, epoch_t epoch, const osd_stat_t &stat) { + osd_stat_updates[osd] = stat; + osd_epochs[osd] = epoch; + assert(osd_epochs.size() == osd_stat_updates.size()); + } + void stat_osd_out(int32_t osd) { + // 0 the stats for the osd + osd_stat_updates[osd] = osd_stat_t(); + } + void rm_stat(int32_t osd) { + osd_stat_rm.insert(osd); + osd_epochs.erase(osd); + osd_stat_updates.erase(osd); + } void encode(bufferlist &bl, uint64_t features=-1) const; void decode(bufferlist::iterator &bl); void dump(Formatter *f) const; diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 0f495052747..0644922ddb4 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -494,15 +494,19 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) { bufferlist dirty; string prefix = pgmap_osd_prefix; - for (map<int32_t,osd_stat_t>::const_iterator p = pending_inc.osd_stat_updates.begin(); - p != pending_inc.osd_stat_updates.end(); + for (map<int32_t,osd_stat_t>::const_iterator p = + pending_inc.get_osd_stat_updates().begin(); + p != pending_inc.get_osd_stat_updates().end(); ++p) { ::encode(p->first, dirty); bufferlist bl; ::encode(p->second, bl, features); t->put(prefix, stringify(p->first), bl); } - for (set<int32_t>::const_iterator p = pending_inc.osd_stat_rm.begin(); p != pending_inc.osd_stat_rm.end(); ++p) { + for (set<int32_t>::const_iterator p = + pending_inc.get_osd_stat_rm().begin(); + p != pending_inc.get_osd_stat_rm().end(); + ++p) { ::encode(*p, dirty); t->erase(prefix, stringify(*p)); } @@ -725,7 +729,11 @@ bool PGMonitor::prepare_pg_stats(MPGStats *stats) } // osd stat - pending_inc.osd_stat_updates[from] = stats->osd_stat; + if (mon->osdmon()->osdmap.is_in(from)) { + pending_inc.update_stat(from, stats->epoch, stats->osd_stat); + } else { + pending_inc.update_stat(from, stats->epoch, osd_stat_t()); + } if (pg_map.osd_stat.count(from)) dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl; @@ -842,11 +850,7 @@ void PGMonitor::check_osd_map(epoch_t epoch) ++p) if (p->second == CEPH_OSD_OUT) { dout(10) << "check_osd_map osd." << p->first << " went OUT" << dendl; - pending_inc.osd_stat_rm.insert(p->first); - } else { - dout(10) << "check_osd_map osd." << p->first << " is IN" << dendl; - pending_inc.osd_stat_rm.erase(p->first); - pending_inc.osd_stat_updates[p->first]; + pending_inc.stat_osd_out(p->first); } // this is conservative: we want to know if any osds (maybe) got marked down. @@ -867,7 +871,7 @@ void PGMonitor::check_osd_map(epoch_t epoch) // whether it was created *or* destroyed, we can safely drop // it's osd_stat_t record. dout(10) << "check_osd_map osd." << p->first << " created or destroyed" << dendl; - pending_inc.osd_stat_rm.insert(p->first); + pending_inc.rm_stat(p->first); // and adjust full, nearfull set pg_map.nearfull_osds.erase(p->first); diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index cd8a8e50658..43e4a288c4f 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -3555,6 +3555,8 @@ int FileStore::_setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr dout(10) << __func__ << " could not remove_xattrs r = " << r << dendl; assert(!m_filestore_fail_eio || r != -EIO); goto out_close; + } else { + r = 0; // don't confuse the debug output } } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index f1985bf961b..1d9ed5f6a31 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1,4 +1,3 @@ - // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* @@ -1997,8 +1996,7 @@ void PG::upgrade(ObjectStore *store, const interval_set<snapid_t> &snapcolls) hobject_t cur; vector<hobject_t> objects; while (1) { - int r = store->collection_list_partial( - cid, + int r = get_pgbackend()->objects_list_partial( cur, store->get_ideal_list_min(), store->get_ideal_list_max(), @@ -2046,8 +2044,7 @@ void PG::upgrade(ObjectStore *store, const interval_set<snapid_t> &snapcolls) while (1) { dout(1) << "Updating snap_mapper from main collection, " << done << " objects done" << dendl; - int r = store->collection_list_partial( - cid, + int r = get_pgbackend()->objects_list_partial( cur, store->get_ideal_list_min(), store->get_ideal_list_max(), @@ -2070,19 +2067,16 @@ void PG::upgrade(ObjectStore *store, const interval_set<snapid_t> &snapcolls) ++j) { if (j->snap < CEPH_MAXSNAP) { OSDriver::OSTransaction _t(osdriver.get_transaction(&t)); - bufferptr bp; - r = store->getattr( - cid, + bufferlist bl; + r = get_pgbackend()->objects_get_attr( *j, OI_ATTR, - bp); + &bl); if (r < 0) { derr << __func__ << ": getattr returned " << cpp_strerror(r) << dendl; assert(0); } - bufferlist bl; - bl.push_back(bp); object_info_t oi(bl); set<snapid_t> oi_snaps(oi.snaps.begin(), oi.snaps.end()); set<snapid_t> cur_snaps; @@ -2412,9 +2406,8 @@ void PG::log_weirdness() << " log bound mismatch, empty but (" << pg_log.get_tail() << "," << pg_log.get_head() << "]\n"; } else { - if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()) || // sloppy check - (pg_log.get_log().log.rbegin()->version != pg_log.get_head() && - !(pg_log.get_head() == pg_log.get_tail()))) + // sloppy check + if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail())) osd->clog.error() << info.pgid << " log bound mismatch, info (" << pg_log.get_tail() << "," << pg_log.get_head() << "]" @@ -3039,9 +3032,9 @@ int PG::build_scrub_map_chunk( // objects vector<hobject_t> ls; - int ret = osd->store->collection_list_range(coll, start, end, 0, &ls); + int ret = get_pgbackend()->objects_list_range(start, end, 0, &ls); if (ret < 0) { - dout(5) << "collection_list_range error: " << ret << dendl; + dout(5) << "objects_list_range error: " << ret << dendl; return ret; } @@ -3561,11 +3554,13 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) hobject_t start = scrubber.start; while (!boundary_found) { vector<hobject_t> objects; - ret = osd->store->collection_list_partial(coll, start, - cct->_conf->osd_scrub_chunk_min, - cct->_conf->osd_scrub_chunk_max, - 0, - &objects, &scrubber.end); + ret = get_pgbackend()->objects_list_partial( + start, + cct->_conf->osd_scrub_chunk_min, + cct->_conf->osd_scrub_chunk_max, + 0, + &objects, + &scrubber.end); assert(ret >= 0); // in case we don't find a boundary: start again at the end diff --git a/src/osd/PG.h b/src/osd/PG.h index 74809eea268..275d30c7658 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -48,6 +48,7 @@ #include "common/WorkQueue.h" #include "common/ceph_context.h" #include "include/str_list.h" +#include "PGBackend.h" #include <list> #include <memory> @@ -193,6 +194,8 @@ protected: CephContext *cct; OSDriver osdriver; SnapMapper snap_mapper; + + virtual PGBackend *get_pgbackend() = 0; public: void update_snap_mapper_bits(uint32_t bits) { snap_mapper.update_bits(bits); @@ -439,6 +442,7 @@ protected: */ struct BackfillInterval { // info about a backfill interval on a peer + eversion_t version; /// version at which the scan occurred map<hobject_t,eversion_t> objects; hobject_t begin; hobject_t end; @@ -447,6 +451,7 @@ protected: void clear() { objects.clear(); begin = end = hobject_t(); + version = eversion_t(); } void reset(hobject_t start) { diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index e3cc05bf345..408c589a08a 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -205,6 +205,26 @@ virtual void clear_temp_obj(const hobject_t &oid) = 0; virtual ~PGBackend() {} + + /// List objects in collection + virtual int objects_list_partial( + const hobject_t &begin, + int min, + int max, + snapid_t seq, + vector<hobject_t> *ls, + hobject_t *next) = 0; + + virtual int objects_list_range( + const hobject_t &start, + const hobject_t &end, + snapid_t seq, + vector<hobject_t> *ls) = 0; + + virtual int objects_get_attr( + const hobject_t &hoid, + const string &attr, + bufferlist *out) = 0; }; #endif diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc index 6e025f289bc..1949c96fd57 100644 --- a/src/osd/PGLog.cc +++ b/src/osd/PGLog.cc @@ -52,13 +52,9 @@ void PGLog::IndexedLog::split_into( if (log.empty()) tail = head; - else - head = log.rbegin()->version; if (olog->empty()) olog->tail = olog->head; - else - olog->head = olog->log.rbegin()->version; olog->index(); index(); diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 9868e7af2c8..ddc39d70372 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -194,3 +194,75 @@ void ReplicatedBackend::on_flushed() assert(0 == "found garbage in the temp collection"); } } + + +int ReplicatedBackend::objects_list_partial( + const hobject_t &begin, + int min, + int max, + snapid_t seq, + vector<hobject_t> *ls, + hobject_t *next) +{ + vector<ghobject_t> objects; + ghobject_t _next; + int r = osd->store->collection_list_partial( + coll, + begin, + min, + max, + seq, + &objects, + &_next); + ls->reserve(objects.size()); + for (vector<ghobject_t>::iterator i = objects.begin(); + i != objects.end(); + ++i) { + assert(i->is_degenerate()); + ls->push_back(i->hobj); + } + assert(_next.is_degenerate()); + *next = _next.hobj; + return r; +} + +int ReplicatedBackend::objects_list_range( + const hobject_t &start, + const hobject_t &end, + snapid_t seq, + vector<hobject_t> *ls) +{ + vector<ghobject_t> objects; + int r = osd->store->collection_list_range( + coll, + start, + end, + seq, + &objects); + ls->reserve(objects.size()); + for (vector<ghobject_t>::iterator i = objects.begin(); + i != objects.end(); + ++i) { + assert(i->is_degenerate()); + ls->push_back(i->hobj); + } + return r; +} + +int ReplicatedBackend::objects_get_attr( + const hobject_t &hoid, + const string &attr, + bufferlist *out) +{ + bufferptr bp; + int r = osd->store->getattr( + coll, + hoid, + attr.c_str(), + bp); + if (r >= 0 && out) { + out->clear(); + out->push_back(bp); + } + return r; +} diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index e34e55a618e..cc5f060e136 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -148,6 +148,26 @@ public: f->close_section(); } } + + /// List objects in collection + int objects_list_partial( + const hobject_t &begin, + int min, + int max, + snapid_t seq, + vector<hobject_t> *ls, + hobject_t *next); + + int objects_list_range( + const hobject_t &start, + const hobject_t &end, + snapid_t seq, + vector<hobject_t> *ls); + + int objects_get_attr( + const hobject_t &hoid, + const string &attr, + bufferlist *out); private: // push struct PushInfo { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index c4316196178..9df0495271b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -398,8 +398,10 @@ bool PGLSPlainFilter::filter(bufferlist& xattr_data, bufferlist& outdata) bool ReplicatedPG::pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata) { bufferlist bl; - - int ret = osd->store->getattr(coll_t(info.pgid), sobj, filter->get_xattr().c_str(), bl); + int ret = pgbackend->objects_get_attr( + sobj, + filter->get_xattr(), + &bl); dout(0) << "getattr (sobj=" << sobj << ", attr=" << filter->get_xattr() << ") returned " << ret << dendl; if (ret < 0) return false; @@ -639,12 +641,13 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) hobject_t next; hobject_t current = response.handle; osr->flush(); - int r = osd->store->collection_list_partial(coll, current, - list_size, - list_size, - snapid, - &sentries, - &next); + int r = pgbackend->objects_list_partial( + current, + list_size, + list_size, + snapid, + &sentries, + &next); if (r != 0) { result = -EINVAL; break; @@ -682,13 +685,17 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) if (snapid != CEPH_NOSNAP) { bufferlist bl; if (candidate.snap == CEPH_NOSNAP) { - osd->store->getattr(coll, candidate, SS_ATTR, bl); + pgbackend->objects_get_attr( + candidate, + SS_ATTR, + &bl); SnapSet snapset(bl); if (snapid <= snapset.seq) continue; } else { bufferlist attr_bl; - osd->store->getattr(coll, candidate, OI_ATTR, attr_bl); + pgbackend->objects_get_attr( + candidate, OI_ATTR, &attr_bl); object_info_t oi(attr_bl); vector<snapid_t>::iterator i = find(oi.snaps.begin(), oi.snaps.end(), @@ -1536,8 +1543,9 @@ void ReplicatedPG::do_scan( BackfillInterval bi; osr->flush(); + bi.begin = m->begin; scan_range( - m->begin, cct->_conf->osd_backfill_scan_min, + cct->_conf->osd_backfill_scan_min, cct->_conf->osd_backfill_scan_max, &bi, handle); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, @@ -2659,7 +2667,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) string aname; bp.copy(op.xattr.name_len, aname); string name = "_" + aname; - int r = osd->store->getattr(coll, soid, name.c_str(), osd_op.outdata); + int r = pgbackend->objects_get_attr( + soid, + name, + &(osd_op.outdata)); if (r >= 0) { op.xattr.value_len = r; result = 0; @@ -2702,9 +2713,15 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) bufferlist xattr; if (op.op == CEPH_OSD_OP_CMPXATTR) - result = osd->store->getattr(coll, soid, name.c_str(), xattr); + result = pgbackend->objects_get_attr( + soid, + name, + &xattr); else - result = osd->store->getattr(coll, src_obc->obs.oi.soid, name.c_str(), xattr); + result = pgbackend->objects_get_attr( + src_obc->obs.oi.soid, + name, + &xattr); if (result < 0 && result != -EEXIST && result != -ENODATA) break; @@ -4550,10 +4567,19 @@ void ReplicatedPG::apply_repop(RepGather *repop) if (repop->ctx->clone_obc) repop->ctx->clone_obc->ondisk_write_lock(); + bool unlock_snapset_obc = false; + if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid != + repop->obc->obs.oi.soid) { + repop->ctx->snapset_obc->ondisk_write_lock(); + unlock_snapset_obc = true; + } + Context *oncommit = new C_OSD_OpCommit(this, repop); Context *onapplied = new C_OSD_OpApplied(this, repop); - Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc, - repop->ctx->clone_obc); + Context *onapplied_sync = new C_OSD_OndiskWriteUnlock( + repop->obc, + repop->ctx->clone_obc, + unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef()); int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op); if (r) { derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl; @@ -5143,7 +5169,7 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid, assert(attrs->count(OI_ATTR)); bv.push_back(attrs->find(OI_ATTR)->second); } else { - int r = osd->store->getattr(coll, soid, OI_ATTR, bv); + int r = pgbackend->objects_get_attr(soid, OI_ATTR, &bv); if (r < 0) { if (!can_create) return ObjectContextRef(); // -ENOENT! @@ -5407,12 +5433,12 @@ SnapSetContext *ReplicatedPG::get_snapset_context( if (!attrs) { hobject_t head(oid, key, CEPH_NOSNAP, seed, info.pgid.pool(), nspace); - int r = osd->store->getattr(coll, head, SS_ATTR, bv); + int r = pgbackend->objects_get_attr(head, SS_ATTR, &bv); if (r < 0) { // try _snapset hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed, info.pgid.pool(), nspace); - r = osd->store->getattr(coll, snapdir, SS_ATTR, bv); + r = pgbackend->objects_get_attr(snapdir, SS_ATTR, &bv); if (r < 0 && !can_create) return NULL; } @@ -7789,6 +7815,8 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) int peer = acting[i]; map<int, pg_missing_t>::const_iterator pm = peer_missing.find(peer); assert(pm != peer_missing.end()); + map<int, pg_info_t>::const_iterator pi = peer_info.find(peer); + assert(pi != peer_info.end()); size_t m_sz = pm->second.num_missing(); dout(10) << " peer osd." << peer << " missing " << m_sz << " objects." << dendl; @@ -7802,6 +7830,15 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) handle.reset_tp_timeout(); const hobject_t soid(p->second); + if (soid > pi->second.last_backfill) { + if (!recovering.count(soid)) { + derr << __func__ << ": object added to missing set for backfill, but " + << "is not in recovering, error!" << dendl; + assert(0); + } + continue; + } + if (recovering.count(soid)) { dout(10) << __func__ << ": already recovering" << soid << dendl; continue; @@ -7869,17 +7906,12 @@ int ReplicatedPG::recover_backfill( << " interval " << pbi.begin << "-" << pbi.end << " " << pbi.objects.size() << " objects" << dendl; - int local_min = osd->store->get_ideal_list_min(); - int local_max = osd->store->get_ideal_list_max(); + int local_min = cct->_conf->osd_backfill_scan_min; + int local_max = cct->_conf->osd_backfill_scan_max; - // re-scan our local interval to cope with recent changes - // FIXME: we could track the eversion_t when we last scanned, and invalidate - // that way. or explicitly modify/invalidate when we actually change specific - // objects. - dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl; - backfill_info.clear(); - osr->flush(); - scan_range(backfill_pos, local_min, local_max, &backfill_info, handle); + // update our local interval to cope with recent changes + backfill_info.begin = backfill_pos; + update_range(&backfill_info, handle); int ops = 0; map<hobject_t, pair<eversion_t, eversion_t> > to_push; @@ -7893,7 +7925,8 @@ int ReplicatedPG::recover_backfill( if (backfill_info.begin <= pbi.begin && !backfill_info.extends_to_end() && backfill_info.empty()) { osr->flush(); - scan_range(backfill_info.end, local_min, local_max, &backfill_info, + backfill_info.begin = backfill_info.end; + scan_range(local_min, local_max, &backfill_info, handle); backfill_info.trim(); } @@ -8054,26 +8087,81 @@ void ReplicatedPG::prep_backfill_object_push( start_recovery_op(oid); recovering.insert(oid); ObjectContextRef obc = get_object_context(oid, false); + + // We need to take the read_lock here in order to flush in-progress writes + obc->ondisk_read_lock(); pgbackend->recover_object( oid, ObjectContextRef(), obc, h); + obc->ondisk_read_unlock(); +} + +void ReplicatedPG::update_range( + BackfillInterval *bi, + ThreadPool::TPHandle &handle) +{ + int local_min = cct->_conf->osd_backfill_scan_min; + int local_max = cct->_conf->osd_backfill_scan_max; + if (bi->version >= info.last_update) { + dout(10) << __func__<< ": bi is current " << dendl; + assert(bi->version == info.last_update); + } else if (bi->version >= info.log_tail) { + assert(!pg_log.get_log().empty()); + dout(10) << __func__<< ": bi is old, (" << bi->version + << ") can be updated with log" << dendl; + list<pg_log_entry_t>::const_iterator i = + pg_log.get_log().log.end(); + --i; + while (i != pg_log.get_log().log.begin() && + i->version > bi->version) { + --i; + } + if (i->version == bi->version) + ++i; + + assert(i != pg_log.get_log().log.end()); + dout(10) << __func__ << ": updating from version " << i->version + << dendl; + for (; i != pg_log.get_log().log.end(); ++i) { + const hobject_t &soid = i->soid; + if (soid >= bi->begin && soid < bi->end) { + if (i->is_update()) { + dout(10) << __func__ << ": " << i->soid << " updated to version " + << i->version << dendl; + bi->objects.erase(i->soid); + bi->objects.insert( + make_pair( + i->soid, + i->version)); + } else if (i->is_delete()) { + dout(10) << __func__ << ": " << i->soid << " removed" << dendl; + bi->objects.erase(i->soid); + } + } + } + bi->version = info.last_update; + } else { + dout(10) << __func__<< ": bi is old, rescanning local backfill_info" + << dendl; + osr->flush(); + scan_range(local_min, local_max, &backfill_info, handle); + } } void ReplicatedPG::scan_range( - hobject_t begin, int min, int max, BackfillInterval *bi, + int min, int max, BackfillInterval *bi, ThreadPool::TPHandle &handle) { assert(is_locked()); - dout(10) << "scan_range from " << begin << dendl; - bi->begin = begin; + dout(10) << "scan_range from " << bi->begin << dendl; + bi->version = info.last_update; bi->objects.clear(); // for good measure vector<hobject_t> ls; ls.reserve(max); - int r = osd->store->collection_list_partial(coll, begin, min, max, - 0, &ls, &bi->end); + int r = pgbackend->objects_list_partial(bi->begin, min, max, 0, &ls, &bi->end); assert(r >= 0); dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl; dout(20) << ls << dendl; @@ -8088,7 +8176,7 @@ void ReplicatedPG::scan_range( dout(20) << " " << *p << " " << obc->obs.oi.version << dendl; } else { bufferlist bl; - int r = osd->store->getattr(coll, *p, OI_ATTR, bl); + int r = pgbackend->objects_get_attr(*p, OI_ATTR, &bl); assert(r >= 0); object_info_t oi(bl); bi->objects[*p] = oi.version; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 05edcef6adf..5abfc4cea56 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -124,6 +124,9 @@ public: typedef boost::shared_ptr<CopyOp> CopyOpRef; boost::scoped_ptr<PGBackend> pgbackend; + PGBackend *get_pgbackend() { + return pgbackend.get(); + } /// Listener methods void on_local_recover_start( @@ -619,10 +622,16 @@ protected: * @bi [out] resulting map of objects to eversion_t's */ void scan_range( - hobject_t begin, int min, int max, BackfillInterval *bi, + int min, int max, BackfillInterval *bi, ThreadPool::TPHandle &handle ); + /// Update a hash range to reflect changes since the last scan + void update_range( + BackfillInterval *bi, ///< [in,out] interval to update + ThreadPool::TPHandle &handle ///< [in] tp handle + ); + void prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, PGBackend::RecoveryHandle *h); @@ -662,12 +671,17 @@ protected: } }; struct C_OSD_OndiskWriteUnlock : public Context { - ObjectContextRef obc, obc2; - C_OSD_OndiskWriteUnlock(ObjectContextRef o, ObjectContextRef o2 = ObjectContextRef()) : obc(o), obc2(o2) {} + ObjectContextRef obc, obc2, obc3; + C_OSD_OndiskWriteUnlock( + ObjectContextRef o, + ObjectContextRef o2 = ObjectContextRef(), + ObjectContextRef o3 = ObjectContextRef()) : obc(o), obc2(o2), obc3(o3) {} void finish(int r) { obc->ondisk_write_unlock(); if (obc2) obc2->ondisk_write_unlock(); + if (obc3) + obc3->ondisk_write_unlock(); } }; struct C_OSD_OndiskWriteUnlockList : public Context { |