author    athanatos <rexludorum@gmail.com>  2013-10-04 14:04:41 -0700
committer athanatos <rexludorum@gmail.com>  2013-10-04 14:04:41 -0700
commit    ce2d9ae6174b5f8fdb0daacf6524a31ea3f93684 (patch)
tree      7684618361f489e2fa7f482fd3835f7c932a0faf
parent    20043eba126559cc84dbc33d04e6ead0fe856274 (diff)
parent    b87bc2311aa4da065477f402a869e2edc1558e2f (diff)
download  ceph-ce2d9ae6174b5f8fdb0daacf6524a31ea3f93684.tar.gz
Merge pull request #692 from ceph/wip-5992-2
Wip 5992 2

Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--  src/common/hobject.h            4
-rw-r--r--  src/mon/PGMap.cc               53
-rw-r--r--  src/mon/PGMap.h                37
-rw-r--r--  src/mon/PGMonitor.cc           24
-rw-r--r--  src/os/FileStore.cc             2
-rw-r--r--  src/osd/PG.cc                  37
-rw-r--r--  src/osd/PG.h                    5
-rw-r--r--  src/osd/PGBackend.h            20
-rw-r--r--  src/osd/PGLog.cc                4
-rw-r--r--  src/osd/ReplicatedBackend.cc   72
-rw-r--r--  src/osd/ReplicatedBackend.h    20
-rw-r--r--  src/osd/ReplicatedPG.cc       160
-rw-r--r--  src/osd/ReplicatedPG.h         20
13 files changed, 375 insertions, 83 deletions
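The core of this merge is that PGMap (and PGMap::Incremental) now records, per OSD, the osdmap epoch most recently reported alongside its stats, and calc_min_last_epoch_clean() takes that into account so the monitor never trims osdmaps past the oldest epoch any OSD has reported; the remaining changes move object listing and getattr behind the new PGBackend interface and rework backfill scanning around BackfillInterval::version. The following is a minimal standalone sketch of that trimming invariant only; it uses plain std::map and hypothetical names (IncrementalSketch, min_reported_epoch) rather than the actual Ceph types, and is meant to illustrate the relationship the new update_stat()/osd_epochs code enforces, not to reproduce it.

// Standalone sketch (hypothetical names, plain std::map instead of the Ceph types):
// every stat update carries the epoch the OSD reported against, and the oldest
// such epoch bounds how far osdmap trimming may go.
#include <cassert>
#include <cstdint>
#include <iostream>
#include <map>

using epoch_t = uint32_t;
struct osd_stat_t {};  // stand-in for the real per-OSD stat structure

struct IncrementalSketch {
  std::map<int32_t, osd_stat_t> osd_stat_updates;
  std::map<int32_t, epoch_t> osd_epochs;  // osd -> most recently reported osdmap epoch

  void update_stat(int32_t osd, epoch_t epoch, const osd_stat_t &stat) {
    osd_stat_updates[osd] = stat;
    osd_epochs[osd] = epoch;
    // a stat update without its reporting epoch would break trimming
    assert(osd_epochs.size() == osd_stat_updates.size());
  }
  void rm_stat(int32_t osd) {
    osd_epochs.erase(osd);
    osd_stat_updates.erase(osd);
  }
};

// Analogue of the new loop in calc_min_last_epoch_clean(): don't trim past the
// oldest epoch any OSD has reported.
epoch_t min_reported_epoch(const std::map<int32_t, epoch_t> &osd_epochs, epoch_t min) {
  for (const auto &p : osd_epochs)
    if (p.second < min)
      min = p.second;
  return min;
}

int main() {
  IncrementalSketch inc;
  inc.update_stat(0, 40, osd_stat_t());
  inc.update_stat(1, 37, osd_stat_t());  // osd.1 lags behind
  std::cout << min_reported_epoch(inc.osd_epochs, 42) << "\n";  // prints 37
  return 0;
}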
diff --git a/src/common/hobject.h b/src/common/hobject.h
index 82eecf3bfc7..a769ad060d9 100644
--- a/src/common/hobject.h
+++ b/src/common/hobject.h
@@ -247,6 +247,10 @@ public:
return hobj.get_filestore_key();
}
+ bool is_degenerate() const {
+ return generation == NO_GEN && shard_id == NO_SHARD;
+ }
+
// maximum sorted value.
static ghobject_t get_max() {
ghobject_t h(hobject_t::get_max());
diff --git a/src/mon/PGMap.cc b/src/mon/PGMap.cc
index 4be39aba902..0b3a0a6506c 100644
--- a/src/mon/PGMap.cc
+++ b/src/mon/PGMap.cc
@@ -30,7 +30,7 @@ void PGMap::Incremental::encode(bufferlist &bl, uint64_t features) const
return;
}
- ENCODE_START(6, 5, bl);
+ ENCODE_START(7, 5, bl);
::encode(version, bl);
::encode(pg_stat_updates, bl);
::encode(osd_stat_updates, bl);
@@ -41,6 +41,7 @@ void PGMap::Incremental::encode(bufferlist &bl, uint64_t features) const
::encode(nearfull_ratio, bl);
::encode(pg_remove, bl);
::encode(stamp, bl);
+ ::encode(osd_epochs, bl);
ENCODE_FINISH(bl);
}
@@ -89,6 +90,17 @@ void PGMap::Incremental::decode(bufferlist::iterator &bl)
}
if (struct_v >= 6)
::decode(stamp, bl);
+ if (struct_v >= 7) {
+ ::decode(osd_epochs, bl);
+ } else {
+ for (map<int32_t, osd_stat_t>::iterator i = osd_stat_updates.begin();
+ i != osd_stat_updates.end();
+ ++i) {
+ // This isn't accurate, but will cause trimming to behave like
+ // previously.
+ osd_epochs.insert(make_pair(i->first, osdmap_epoch));
+ }
+ }
DECODE_FINISH(bl);
}
@@ -195,8 +207,10 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc)
}
stat_pg_add(update_pg, update_stat);
}
- for (map<int32_t,osd_stat_t>::const_iterator p = inc.osd_stat_updates.begin();
- p != inc.osd_stat_updates.end();
+ assert(osd_stat.size() == osd_epochs.size());
+ for (map<int32_t,osd_stat_t>::const_iterator p =
+ inc.get_osd_stat_updates().begin();
+ p != inc.get_osd_stat_updates().end();
++p) {
int osd = p->first;
const osd_stat_t &new_stats(p->second);
@@ -209,6 +223,8 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc)
stat_osd_sub(t->second);
t->second = new_stats;
}
+ assert(inc.get_osd_epochs().find(osd) != inc.get_osd_epochs().end());
+ osd_epochs.insert(*(inc.get_osd_epochs().find(osd)));
stat_osd_add(new_stats);
@@ -226,8 +242,8 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc)
}
}
- for (set<int>::iterator p = inc.osd_stat_rm.begin();
- p != inc.osd_stat_rm.end();
+ for (set<int>::iterator p = inc.get_osd_stat_rm().begin();
+ p != inc.get_osd_stat_rm().end();
++p) {
hash_map<int32_t,osd_stat_t>::iterator t = osd_stat.find(*p);
if (t != osd_stat.end()) {
@@ -416,6 +432,14 @@ epoch_t PGMap::calc_min_last_epoch_clean() const
if (lec < min)
min = lec;
}
+ // also scan osd epochs
+ // don't trim past the oldest reported osd epoch
+ for (hash_map<int32_t, epoch_t>::const_iterator i = osd_epochs.begin();
+ i != osd_epochs.end();
+ ++i) {
+ if (i->second < min)
+ min = i->second;
+ }
return min;
}
@@ -434,7 +458,7 @@ void PGMap::encode(bufferlist &bl, uint64_t features) const
return;
}
- ENCODE_START(5, 4, bl);
+ ENCODE_START(6, 4, bl);
::encode(version, bl);
::encode(pg_stat, bl);
::encode(osd_stat, bl);
@@ -443,6 +467,7 @@ void PGMap::encode(bufferlist &bl, uint64_t features) const
::encode(full_ratio, bl);
::encode(nearfull_ratio, bl);
::encode(stamp, bl);
+ ::encode(osd_epochs, bl);
ENCODE_FINISH(bl);
}
@@ -472,6 +497,17 @@ void PGMap::decode(bufferlist::iterator &bl)
}
if (struct_v >= 5)
::decode(stamp, bl);
+ if (struct_v >= 6) {
+ ::decode(osd_epochs, bl);
+ } else {
+ for (hash_map<int32_t, osd_stat_t>::iterator i = osd_stat.begin();
+ i != osd_stat.end();
+ ++i) {
+ // This isn't accurate, but will cause trimming to behave like
+ // previously.
+ osd_epochs.insert(make_pair(i->first, last_osdmap_epoch));
+ }
+ }
DECODE_FINISH(bl);
calc_stats();
@@ -488,7 +524,10 @@ void PGMap::dirty_all(Incremental& inc)
inc.pg_stat_updates[p->first] = p->second;
}
for (hash_map<int32_t, osd_stat_t>::const_iterator p = osd_stat.begin(); p != osd_stat.end(); ++p) {
- inc.osd_stat_updates[p->first] = p->second;
+ assert(inc.get_osd_epochs().count(p->first));
+ inc.update_stat(p->first,
+ inc.get_osd_epochs().find(p->first)->second,
+ p->second);
}
}
diff --git a/src/mon/PGMap.h b/src/mon/PGMap.h
index 84d89f87517..7a202fc0006 100644
--- a/src/mon/PGMap.h
+++ b/src/mon/PGMap.h
@@ -43,12 +43,13 @@ public:
float full_ratio;
float nearfull_ratio;
+ // mapping of osd to most recently reported osdmap epoch
+ hash_map<int32_t,epoch_t> osd_epochs;
+
class Incremental {
public:
version_t version;
map<pg_t,pg_stat_t> pg_stat_updates;
- map<int32_t,osd_stat_t> osd_stat_updates;
- set<int32_t> osd_stat_rm;
epoch_t osdmap_epoch;
epoch_t pg_scan; // osdmap epoch
set<pg_t> pg_remove;
@@ -56,6 +57,38 @@ public:
float nearfull_ratio;
utime_t stamp;
+ private:
+ map<int32_t,osd_stat_t> osd_stat_updates;
+ set<int32_t> osd_stat_rm;
+
+ // mapping of osd to most recently reported osdmap epoch
+ map<int32_t,epoch_t> osd_epochs;
+ public:
+
+ const map<int32_t, osd_stat_t> &get_osd_stat_updates() const {
+ return osd_stat_updates;
+ }
+ const set<int32_t> &get_osd_stat_rm() const {
+ return osd_stat_rm;
+ }
+ const map<int32_t, epoch_t> &get_osd_epochs() const {
+ return osd_epochs;
+ }
+
+ void update_stat(int32_t osd, epoch_t epoch, const osd_stat_t &stat) {
+ osd_stat_updates[osd] = stat;
+ osd_epochs[osd] = epoch;
+ assert(osd_epochs.size() == osd_stat_updates.size());
+ }
+ void stat_osd_out(int32_t osd) {
+ // 0 the stats for the osd
+ osd_stat_updates[osd] = osd_stat_t();
+ }
+ void rm_stat(int32_t osd) {
+ osd_stat_rm.insert(osd);
+ osd_epochs.erase(osd);
+ osd_stat_updates.erase(osd);
+ }
void encode(bufferlist &bl, uint64_t features=-1) const;
void decode(bufferlist::iterator &bl);
void dump(Formatter *f) const;
diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc
index 0f495052747..0644922ddb4 100644
--- a/src/mon/PGMonitor.cc
+++ b/src/mon/PGMonitor.cc
@@ -494,15 +494,19 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)
{
bufferlist dirty;
string prefix = pgmap_osd_prefix;
- for (map<int32_t,osd_stat_t>::const_iterator p = pending_inc.osd_stat_updates.begin();
- p != pending_inc.osd_stat_updates.end();
+ for (map<int32_t,osd_stat_t>::const_iterator p =
+ pending_inc.get_osd_stat_updates().begin();
+ p != pending_inc.get_osd_stat_updates().end();
++p) {
::encode(p->first, dirty);
bufferlist bl;
::encode(p->second, bl, features);
t->put(prefix, stringify(p->first), bl);
}
- for (set<int32_t>::const_iterator p = pending_inc.osd_stat_rm.begin(); p != pending_inc.osd_stat_rm.end(); ++p) {
+ for (set<int32_t>::const_iterator p =
+ pending_inc.get_osd_stat_rm().begin();
+ p != pending_inc.get_osd_stat_rm().end();
+ ++p) {
::encode(*p, dirty);
t->erase(prefix, stringify(*p));
}
@@ -725,7 +729,11 @@ bool PGMonitor::prepare_pg_stats(MPGStats *stats)
}
// osd stat
- pending_inc.osd_stat_updates[from] = stats->osd_stat;
+ if (mon->osdmon()->osdmap.is_in(from)) {
+ pending_inc.update_stat(from, stats->epoch, stats->osd_stat);
+ } else {
+ pending_inc.update_stat(from, stats->epoch, osd_stat_t());
+ }
if (pg_map.osd_stat.count(from))
dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl;
@@ -842,11 +850,7 @@ void PGMonitor::check_osd_map(epoch_t epoch)
++p)
if (p->second == CEPH_OSD_OUT) {
dout(10) << "check_osd_map osd." << p->first << " went OUT" << dendl;
- pending_inc.osd_stat_rm.insert(p->first);
- } else {
- dout(10) << "check_osd_map osd." << p->first << " is IN" << dendl;
- pending_inc.osd_stat_rm.erase(p->first);
- pending_inc.osd_stat_updates[p->first];
+ pending_inc.stat_osd_out(p->first);
}
// this is conservative: we want to know if any osds (maybe) got marked down.
@@ -867,7 +871,7 @@ void PGMonitor::check_osd_map(epoch_t epoch)
// whether it was created *or* destroyed, we can safely drop
// it's osd_stat_t record.
dout(10) << "check_osd_map osd." << p->first << " created or destroyed" << dendl;
- pending_inc.osd_stat_rm.insert(p->first);
+ pending_inc.rm_stat(p->first);
// and adjust full, nearfull set
pg_map.nearfull_osds.erase(p->first);
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index cd8a8e50658..43e4a288c4f 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -3555,6 +3555,8 @@ int FileStore::_setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr
dout(10) << __func__ << " could not remove_xattrs r = " << r << dendl;
assert(!m_filestore_fail_eio || r != -EIO);
goto out_close;
+ } else {
+ r = 0; // don't confuse the debug output
}
}
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index f1985bf961b..1d9ed5f6a31 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1,4 +1,3 @@
-
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
@@ -1997,8 +1996,7 @@ void PG::upgrade(ObjectStore *store, const interval_set<snapid_t> &snapcolls)
hobject_t cur;
vector<hobject_t> objects;
while (1) {
- int r = store->collection_list_partial(
- cid,
+ int r = get_pgbackend()->objects_list_partial(
cur,
store->get_ideal_list_min(),
store->get_ideal_list_max(),
@@ -2046,8 +2044,7 @@ void PG::upgrade(ObjectStore *store, const interval_set<snapid_t> &snapcolls)
while (1) {
dout(1) << "Updating snap_mapper from main collection, "
<< done << " objects done" << dendl;
- int r = store->collection_list_partial(
- cid,
+ int r = get_pgbackend()->objects_list_partial(
cur,
store->get_ideal_list_min(),
store->get_ideal_list_max(),
@@ -2070,19 +2067,16 @@ void PG::upgrade(ObjectStore *store, const interval_set<snapid_t> &snapcolls)
++j) {
if (j->snap < CEPH_MAXSNAP) {
OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
- bufferptr bp;
- r = store->getattr(
- cid,
+ bufferlist bl;
+ r = get_pgbackend()->objects_get_attr(
*j,
OI_ATTR,
- bp);
+ &bl);
if (r < 0) {
derr << __func__ << ": getattr returned "
<< cpp_strerror(r) << dendl;
assert(0);
}
- bufferlist bl;
- bl.push_back(bp);
object_info_t oi(bl);
set<snapid_t> oi_snaps(oi.snaps.begin(), oi.snaps.end());
set<snapid_t> cur_snaps;
@@ -2412,9 +2406,8 @@ void PG::log_weirdness()
<< " log bound mismatch, empty but (" << pg_log.get_tail() << ","
<< pg_log.get_head() << "]\n";
} else {
- if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()) || // sloppy check
- (pg_log.get_log().log.rbegin()->version != pg_log.get_head() &&
- !(pg_log.get_head() == pg_log.get_tail())))
+ // sloppy check
+ if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()))
osd->clog.error() << info.pgid
<< " log bound mismatch, info (" << pg_log.get_tail() << ","
<< pg_log.get_head() << "]"
@@ -3039,9 +3032,9 @@ int PG::build_scrub_map_chunk(
// objects
vector<hobject_t> ls;
- int ret = osd->store->collection_list_range(coll, start, end, 0, &ls);
+ int ret = get_pgbackend()->objects_list_range(start, end, 0, &ls);
if (ret < 0) {
- dout(5) << "collection_list_range error: " << ret << dendl;
+ dout(5) << "objects_list_range error: " << ret << dendl;
return ret;
}
@@ -3561,11 +3554,13 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
hobject_t start = scrubber.start;
while (!boundary_found) {
vector<hobject_t> objects;
- ret = osd->store->collection_list_partial(coll, start,
- cct->_conf->osd_scrub_chunk_min,
- cct->_conf->osd_scrub_chunk_max,
- 0,
- &objects, &scrubber.end);
+ ret = get_pgbackend()->objects_list_partial(
+ start,
+ cct->_conf->osd_scrub_chunk_min,
+ cct->_conf->osd_scrub_chunk_max,
+ 0,
+ &objects,
+ &scrubber.end);
assert(ret >= 0);
// in case we don't find a boundary: start again at the end
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 74809eea268..275d30c7658 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -48,6 +48,7 @@
#include "common/WorkQueue.h"
#include "common/ceph_context.h"
#include "include/str_list.h"
+#include "PGBackend.h"
#include <list>
#include <memory>
@@ -193,6 +194,8 @@ protected:
CephContext *cct;
OSDriver osdriver;
SnapMapper snap_mapper;
+
+ virtual PGBackend *get_pgbackend() = 0;
public:
void update_snap_mapper_bits(uint32_t bits) {
snap_mapper.update_bits(bits);
@@ -439,6 +442,7 @@ protected:
*/
struct BackfillInterval {
// info about a backfill interval on a peer
+ eversion_t version; /// version at which the scan occurred
map<hobject_t,eversion_t> objects;
hobject_t begin;
hobject_t end;
@@ -447,6 +451,7 @@ protected:
void clear() {
objects.clear();
begin = end = hobject_t();
+ version = eversion_t();
}
void reset(hobject_t start) {
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
index e3cc05bf345..408c589a08a 100644
--- a/src/osd/PGBackend.h
+++ b/src/osd/PGBackend.h
@@ -205,6 +205,26 @@
virtual void clear_temp_obj(const hobject_t &oid) = 0;
virtual ~PGBackend() {}
+
+ /// List objects in collection
+ virtual int objects_list_partial(
+ const hobject_t &begin,
+ int min,
+ int max,
+ snapid_t seq,
+ vector<hobject_t> *ls,
+ hobject_t *next) = 0;
+
+ virtual int objects_list_range(
+ const hobject_t &start,
+ const hobject_t &end,
+ snapid_t seq,
+ vector<hobject_t> *ls) = 0;
+
+ virtual int objects_get_attr(
+ const hobject_t &hoid,
+ const string &attr,
+ bufferlist *out) = 0;
};
#endif
diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc
index 6e025f289bc..1949c96fd57 100644
--- a/src/osd/PGLog.cc
+++ b/src/osd/PGLog.cc
@@ -52,13 +52,9 @@ void PGLog::IndexedLog::split_into(
if (log.empty())
tail = head;
- else
- head = log.rbegin()->version;
if (olog->empty())
olog->tail = olog->head;
- else
- olog->head = olog->log.rbegin()->version;
olog->index();
index();
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index 9868e7af2c8..ddc39d70372 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -194,3 +194,75 @@ void ReplicatedBackend::on_flushed()
assert(0 == "found garbage in the temp collection");
}
}
+
+
+int ReplicatedBackend::objects_list_partial(
+ const hobject_t &begin,
+ int min,
+ int max,
+ snapid_t seq,
+ vector<hobject_t> *ls,
+ hobject_t *next)
+{
+ vector<ghobject_t> objects;
+ ghobject_t _next;
+ int r = osd->store->collection_list_partial(
+ coll,
+ begin,
+ min,
+ max,
+ seq,
+ &objects,
+ &_next);
+ ls->reserve(objects.size());
+ for (vector<ghobject_t>::iterator i = objects.begin();
+ i != objects.end();
+ ++i) {
+ assert(i->is_degenerate());
+ ls->push_back(i->hobj);
+ }
+ assert(_next.is_degenerate());
+ *next = _next.hobj;
+ return r;
+}
+
+int ReplicatedBackend::objects_list_range(
+ const hobject_t &start,
+ const hobject_t &end,
+ snapid_t seq,
+ vector<hobject_t> *ls)
+{
+ vector<ghobject_t> objects;
+ int r = osd->store->collection_list_range(
+ coll,
+ start,
+ end,
+ seq,
+ &objects);
+ ls->reserve(objects.size());
+ for (vector<ghobject_t>::iterator i = objects.begin();
+ i != objects.end();
+ ++i) {
+ assert(i->is_degenerate());
+ ls->push_back(i->hobj);
+ }
+ return r;
+}
+
+int ReplicatedBackend::objects_get_attr(
+ const hobject_t &hoid,
+ const string &attr,
+ bufferlist *out)
+{
+ bufferptr bp;
+ int r = osd->store->getattr(
+ coll,
+ hoid,
+ attr.c_str(),
+ bp);
+ if (r >= 0 && out) {
+ out->clear();
+ out->push_back(bp);
+ }
+ return r;
+}
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
index e34e55a618e..cc5f060e136 100644
--- a/src/osd/ReplicatedBackend.h
+++ b/src/osd/ReplicatedBackend.h
@@ -148,6 +148,26 @@ public:
f->close_section();
}
}
+
+ /// List objects in collection
+ int objects_list_partial(
+ const hobject_t &begin,
+ int min,
+ int max,
+ snapid_t seq,
+ vector<hobject_t> *ls,
+ hobject_t *next);
+
+ int objects_list_range(
+ const hobject_t &start,
+ const hobject_t &end,
+ snapid_t seq,
+ vector<hobject_t> *ls);
+
+ int objects_get_attr(
+ const hobject_t &hoid,
+ const string &attr,
+ bufferlist *out);
private:
// push
struct PushInfo {
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index c4316196178..9df0495271b 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -398,8 +398,10 @@ bool PGLSPlainFilter::filter(bufferlist& xattr_data, bufferlist& outdata)
bool ReplicatedPG::pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata)
{
bufferlist bl;
-
- int ret = osd->store->getattr(coll_t(info.pgid), sobj, filter->get_xattr().c_str(), bl);
+ int ret = pgbackend->objects_get_attr(
+ sobj,
+ filter->get_xattr(),
+ &bl);
dout(0) << "getattr (sobj=" << sobj << ", attr=" << filter->get_xattr() << ") returned " << ret << dendl;
if (ret < 0)
return false;
@@ -639,12 +641,13 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
hobject_t next;
hobject_t current = response.handle;
osr->flush();
- int r = osd->store->collection_list_partial(coll, current,
- list_size,
- list_size,
- snapid,
- &sentries,
- &next);
+ int r = pgbackend->objects_list_partial(
+ current,
+ list_size,
+ list_size,
+ snapid,
+ &sentries,
+ &next);
if (r != 0) {
result = -EINVAL;
break;
@@ -682,13 +685,17 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
if (snapid != CEPH_NOSNAP) {
bufferlist bl;
if (candidate.snap == CEPH_NOSNAP) {
- osd->store->getattr(coll, candidate, SS_ATTR, bl);
+ pgbackend->objects_get_attr(
+ candidate,
+ SS_ATTR,
+ &bl);
SnapSet snapset(bl);
if (snapid <= snapset.seq)
continue;
} else {
bufferlist attr_bl;
- osd->store->getattr(coll, candidate, OI_ATTR, attr_bl);
+ pgbackend->objects_get_attr(
+ candidate, OI_ATTR, &attr_bl);
object_info_t oi(attr_bl);
vector<snapid_t>::iterator i = find(oi.snaps.begin(),
oi.snaps.end(),
@@ -1536,8 +1543,9 @@ void ReplicatedPG::do_scan(
BackfillInterval bi;
osr->flush();
+ bi.begin = m->begin;
scan_range(
- m->begin, cct->_conf->osd_backfill_scan_min,
+ cct->_conf->osd_backfill_scan_min,
cct->_conf->osd_backfill_scan_max, &bi, handle);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
@@ -2659,7 +2667,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
string aname;
bp.copy(op.xattr.name_len, aname);
string name = "_" + aname;
- int r = osd->store->getattr(coll, soid, name.c_str(), osd_op.outdata);
+ int r = pgbackend->objects_get_attr(
+ soid,
+ name,
+ &(osd_op.outdata));
if (r >= 0) {
op.xattr.value_len = r;
result = 0;
@@ -2702,9 +2713,15 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
bufferlist xattr;
if (op.op == CEPH_OSD_OP_CMPXATTR)
- result = osd->store->getattr(coll, soid, name.c_str(), xattr);
+ result = pgbackend->objects_get_attr(
+ soid,
+ name,
+ &xattr);
else
- result = osd->store->getattr(coll, src_obc->obs.oi.soid, name.c_str(), xattr);
+ result = pgbackend->objects_get_attr(
+ src_obc->obs.oi.soid,
+ name,
+ &xattr);
if (result < 0 && result != -EEXIST && result != -ENODATA)
break;
@@ -4550,10 +4567,19 @@ void ReplicatedPG::apply_repop(RepGather *repop)
if (repop->ctx->clone_obc)
repop->ctx->clone_obc->ondisk_write_lock();
+ bool unlock_snapset_obc = false;
+ if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
+ repop->obc->obs.oi.soid) {
+ repop->ctx->snapset_obc->ondisk_write_lock();
+ unlock_snapset_obc = true;
+ }
+
Context *oncommit = new C_OSD_OpCommit(this, repop);
Context *onapplied = new C_OSD_OpApplied(this, repop);
- Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc,
- repop->ctx->clone_obc);
+ Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
+ repop->obc,
+ repop->ctx->clone_obc,
+ unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
if (r) {
derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl;
@@ -5143,7 +5169,7 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
assert(attrs->count(OI_ATTR));
bv.push_back(attrs->find(OI_ATTR)->second);
} else {
- int r = osd->store->getattr(coll, soid, OI_ATTR, bv);
+ int r = pgbackend->objects_get_attr(soid, OI_ATTR, &bv);
if (r < 0) {
if (!can_create)
return ObjectContextRef(); // -ENOENT!
@@ -5407,12 +5433,12 @@ SnapSetContext *ReplicatedPG::get_snapset_context(
if (!attrs) {
hobject_t head(oid, key, CEPH_NOSNAP, seed,
info.pgid.pool(), nspace);
- int r = osd->store->getattr(coll, head, SS_ATTR, bv);
+ int r = pgbackend->objects_get_attr(head, SS_ATTR, &bv);
if (r < 0) {
// try _snapset
hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed,
info.pgid.pool(), nspace);
- r = osd->store->getattr(coll, snapdir, SS_ATTR, bv);
+ r = pgbackend->objects_get_attr(snapdir, SS_ATTR, &bv);
if (r < 0 && !can_create)
return NULL;
}
@@ -7789,6 +7815,8 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
int peer = acting[i];
map<int, pg_missing_t>::const_iterator pm = peer_missing.find(peer);
assert(pm != peer_missing.end());
+ map<int, pg_info_t>::const_iterator pi = peer_info.find(peer);
+ assert(pi != peer_info.end());
size_t m_sz = pm->second.num_missing();
dout(10) << " peer osd." << peer << " missing " << m_sz << " objects." << dendl;
@@ -7802,6 +7830,15 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
handle.reset_tp_timeout();
const hobject_t soid(p->second);
+ if (soid > pi->second.last_backfill) {
+ if (!recovering.count(soid)) {
+ derr << __func__ << ": object added to missing set for backfill, but "
+ << "is not in recovering, error!" << dendl;
+ assert(0);
+ }
+ continue;
+ }
+
if (recovering.count(soid)) {
dout(10) << __func__ << ": already recovering" << soid << dendl;
continue;
@@ -7869,17 +7906,12 @@ int ReplicatedPG::recover_backfill(
<< " interval " << pbi.begin << "-" << pbi.end
<< " " << pbi.objects.size() << " objects" << dendl;
- int local_min = osd->store->get_ideal_list_min();
- int local_max = osd->store->get_ideal_list_max();
+ int local_min = cct->_conf->osd_backfill_scan_min;
+ int local_max = cct->_conf->osd_backfill_scan_max;
- // re-scan our local interval to cope with recent changes
- // FIXME: we could track the eversion_t when we last scanned, and invalidate
- // that way. or explicitly modify/invalidate when we actually change specific
- // objects.
- dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
- backfill_info.clear();
- osr->flush();
- scan_range(backfill_pos, local_min, local_max, &backfill_info, handle);
+ // update our local interval to cope with recent changes
+ backfill_info.begin = backfill_pos;
+ update_range(&backfill_info, handle);
int ops = 0;
map<hobject_t, pair<eversion_t, eversion_t> > to_push;
@@ -7893,7 +7925,8 @@ int ReplicatedPG::recover_backfill(
if (backfill_info.begin <= pbi.begin &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
osr->flush();
- scan_range(backfill_info.end, local_min, local_max, &backfill_info,
+ backfill_info.begin = backfill_info.end;
+ scan_range(local_min, local_max, &backfill_info,
handle);
backfill_info.trim();
}
@@ -8054,26 +8087,81 @@ void ReplicatedPG::prep_backfill_object_push(
start_recovery_op(oid);
recovering.insert(oid);
ObjectContextRef obc = get_object_context(oid, false);
+
+ // We need to take the read_lock here in order to flush in-progress writes
+ obc->ondisk_read_lock();
pgbackend->recover_object(
oid,
ObjectContextRef(),
obc,
h);
+ obc->ondisk_read_unlock();
+}
+
+void ReplicatedPG::update_range(
+ BackfillInterval *bi,
+ ThreadPool::TPHandle &handle)
+{
+ int local_min = cct->_conf->osd_backfill_scan_min;
+ int local_max = cct->_conf->osd_backfill_scan_max;
+ if (bi->version >= info.last_update) {
+ dout(10) << __func__<< ": bi is current " << dendl;
+ assert(bi->version == info.last_update);
+ } else if (bi->version >= info.log_tail) {
+ assert(!pg_log.get_log().empty());
+ dout(10) << __func__<< ": bi is old, (" << bi->version
+ << ") can be updated with log" << dendl;
+ list<pg_log_entry_t>::const_iterator i =
+ pg_log.get_log().log.end();
+ --i;
+ while (i != pg_log.get_log().log.begin() &&
+ i->version > bi->version) {
+ --i;
+ }
+ if (i->version == bi->version)
+ ++i;
+
+ assert(i != pg_log.get_log().log.end());
+ dout(10) << __func__ << ": updating from version " << i->version
+ << dendl;
+ for (; i != pg_log.get_log().log.end(); ++i) {
+ const hobject_t &soid = i->soid;
+ if (soid >= bi->begin && soid < bi->end) {
+ if (i->is_update()) {
+ dout(10) << __func__ << ": " << i->soid << " updated to version "
+ << i->version << dendl;
+ bi->objects.erase(i->soid);
+ bi->objects.insert(
+ make_pair(
+ i->soid,
+ i->version));
+ } else if (i->is_delete()) {
+ dout(10) << __func__ << ": " << i->soid << " removed" << dendl;
+ bi->objects.erase(i->soid);
+ }
+ }
+ }
+ bi->version = info.last_update;
+ } else {
+ dout(10) << __func__<< ": bi is old, rescanning local backfill_info"
+ << dendl;
+ osr->flush();
+ scan_range(local_min, local_max, &backfill_info, handle);
+ }
}
void ReplicatedPG::scan_range(
- hobject_t begin, int min, int max, BackfillInterval *bi,
+ int min, int max, BackfillInterval *bi,
ThreadPool::TPHandle &handle)
{
assert(is_locked());
- dout(10) << "scan_range from " << begin << dendl;
- bi->begin = begin;
+ dout(10) << "scan_range from " << bi->begin << dendl;
+ bi->version = info.last_update;
bi->objects.clear(); // for good measure
vector<hobject_t> ls;
ls.reserve(max);
- int r = osd->store->collection_list_partial(coll, begin, min, max,
- 0, &ls, &bi->end);
+ int r = pgbackend->objects_list_partial(bi->begin, min, max, 0, &ls, &bi->end);
assert(r >= 0);
dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl;
dout(20) << ls << dendl;
@@ -8088,7 +8176,7 @@ void ReplicatedPG::scan_range(
dout(20) << " " << *p << " " << obc->obs.oi.version << dendl;
} else {
bufferlist bl;
- int r = osd->store->getattr(coll, *p, OI_ATTR, bl);
+ int r = pgbackend->objects_get_attr(*p, OI_ATTR, &bl);
assert(r >= 0);
object_info_t oi(bl);
bi->objects[*p] = oi.version;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 05edcef6adf..5abfc4cea56 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -124,6 +124,9 @@ public:
typedef boost::shared_ptr<CopyOp> CopyOpRef;
boost::scoped_ptr<PGBackend> pgbackend;
+ PGBackend *get_pgbackend() {
+ return pgbackend.get();
+ }
/// Listener methods
void on_local_recover_start(
@@ -619,10 +622,16 @@ protected:
* @bi [out] resulting map of objects to eversion_t's
*/
void scan_range(
- hobject_t begin, int min, int max, BackfillInterval *bi,
+ int min, int max, BackfillInterval *bi,
ThreadPool::TPHandle &handle
);
+ /// Update a hash range to reflect changes since the last scan
+ void update_range(
+ BackfillInterval *bi, ///< [in,out] interval to update
+ ThreadPool::TPHandle &handle ///< [in] tp handle
+ );
+
void prep_backfill_object_push(
hobject_t oid, eversion_t v, eversion_t have, int peer,
PGBackend::RecoveryHandle *h);
@@ -662,12 +671,17 @@ protected:
}
};
struct C_OSD_OndiskWriteUnlock : public Context {
- ObjectContextRef obc, obc2;
- C_OSD_OndiskWriteUnlock(ObjectContextRef o, ObjectContextRef o2 = ObjectContextRef()) : obc(o), obc2(o2) {}
+ ObjectContextRef obc, obc2, obc3;
+ C_OSD_OndiskWriteUnlock(
+ ObjectContextRef o,
+ ObjectContextRef o2 = ObjectContextRef(),
+ ObjectContextRef o3 = ObjectContextRef()) : obc(o), obc2(o2), obc3(o3) {}
void finish(int r) {
obc->ondisk_write_unlock();
if (obc2)
obc2->ondisk_write_unlock();
+ if (obc3)
+ obc3->ondisk_write_unlock();
}
};
struct C_OSD_OndiskWriteUnlockList : public Context {