summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2013-08-29 18:46:21 -0700
committerSamuel Just <sam.just@inktank.com>2013-09-26 11:24:25 -0700
commit7da308bba0d60b1435cda9698a3320d97050699b (patch)
tree3eaab36c257742d46576063deee34e444f629d85
parent3148b12141c8d61d43295bfd7ad3aafc3b3cd484 (diff)
downloadceph-7da308bba0d60b1435cda9698a3320d97050699b.tar.gz
ReplicatedPG: extract PGBackend::Listener recovery callbacks
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/PGBackend.h6
-rw-r--r--src/osd/ReplicatedBackend.h1
-rw-r--r--src/osd/ReplicatedPG.cc247
-rw-r--r--src/osd/ReplicatedPG.h15
4 files changed, 160 insertions, 109 deletions
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
index b17f0542d55..27dbd91b80e 100644
--- a/src/osd/PGBackend.h
+++ b/src/osd/PGBackend.h
@@ -53,6 +53,7 @@
const hobject_t &oid,
const object_stat_sum_t &stat_diff,
const ObjectRecoveryInfo &recovery_info,
+ ObjectContextRef obc,
ObjectStore::Transaction *t
) = 0;
@@ -71,6 +72,10 @@
const ObjectRecoveryInfo &recovery_info
) = 0;
+ virtual void begin_peer_recover(
+ int peer,
+ const hobject_t oid) = 0;
+
virtual void failed_push(int from, const hobject_t &soid) = 0;
/**
@@ -91,6 +96,7 @@
virtual const map<hobject_t, set<int> > &get_missing_loc() = 0;
virtual const map<int, pg_missing_t> &get_peer_missing() = 0;
+ virtual const map<int, pg_info_t> &get_peer_info() = 0;
virtual const pg_missing_t &get_local_missing() = 0;
virtual const PGLog &get_log() = 0;
virtual bool pgb_is_primary() const = 0;
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
index bcd1239c626..4c20f248650 100644
--- a/src/osd/ReplicatedBackend.h
+++ b/src/osd/ReplicatedBackend.h
@@ -22,7 +22,6 @@
class ReplicatedBackend : public PGBackend {
struct RPGHandle : public PGBackend::RecoveryHandle {
map<int, vector<PushOp> > pushes;
- map<int, vector<PushReplyOp> > push_replies;
map<int, vector<PullOp> > pulls;
};
private:
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index f7bcdd2949b..d413847455a 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -79,6 +79,137 @@ PGLSFilter::~PGLSFilter()
{
}
+// ======================
+// PGBackend::Listener
+
+
+void ReplicatedPG::on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t)
+{
+ pg_log.revise_have(oid, eversion_t());
+ remove_snap_mapped_object(*t, oid);
+ t->remove(coll, oid);
+}
+
+void ReplicatedPG::on_local_recover(
+ const hobject_t &hoid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &_recovery_info,
+ ObjectContextRef obc,
+ ObjectStore::Transaction *t
+ )
+{
+ ObjectRecoveryInfo recovery_info(_recovery_info);
+ if (recovery_info.soid.snap < CEPH_NOSNAP) {
+ assert(recovery_info.oi.snaps.size());
+ OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ set<snapid_t> snaps(
+ recovery_info.oi.snaps.begin(),
+ recovery_info.oi.snaps.end());
+ snap_mapper.add_oid(
+ recovery_info.soid,
+ snaps,
+ &_t);
+ }
+
+ if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
+ assert(is_primary());
+ const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+ if (latest->op == pg_log_entry_t::LOST_REVERT &&
+ latest->reverting_to == recovery_info.version) {
+ dout(10) << " got old revert version " << recovery_info.version
+ << " for " << *latest << dendl;
+ recovery_info.version = latest->version;
+ // update the attr to the revert event version
+ recovery_info.oi.prior_version = recovery_info.oi.version;
+ recovery_info.oi.version = latest->version;
+ bufferlist bl;
+ ::encode(recovery_info.oi, bl);
+ t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+ }
+ }
+
+ // keep track of active pushes for scrub
+ ++active_pushes;
+
+ recover_got(recovery_info.soid, recovery_info.version);
+
+ if (is_primary()) {
+ info.stats.stats.sum.add(stat_diff);
+
+ assert(obc);
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+
+
+ t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+ t->register_on_complete(
+ new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
+
+ publish_stats_to_osd();
+ if (waiting_for_missing_object.count(hoid)) {
+ dout(20) << " kicking waiters on " << hoid << dendl;
+ requeue_ops(waiting_for_missing_object[hoid]);
+ waiting_for_missing_object.erase(hoid);
+ if (pg_log.get_missing().missing.size() == 0) {
+ requeue_ops(waiting_for_all_missing);
+ waiting_for_all_missing.clear();
+ }
+ }
+ } else {
+ t->register_on_applied(
+ new C_OSD_AppliedRecoveredObjectReplica(this));
+
+ }
+
+ t->register_on_commit(
+ new C_OSD_CommittedPushedObject(
+ this,
+ get_osdmap()->get_epoch(),
+ info.last_complete));
+
+ // update pg
+ dirty_info = true;
+ write_if_dirty(*t);
+
+}
+
+void ReplicatedPG::on_global_recover(
+ const hobject_t &soid)
+{
+ publish_stats_to_osd();
+ pushing.erase(soid);
+ dout(10) << "pushed " << soid << " to all replicas" << dendl;
+ finish_recovery_op(soid);
+ if (waiting_for_degraded_object.count(soid)) {
+ requeue_ops(waiting_for_degraded_object[soid]);
+ waiting_for_degraded_object.erase(soid);
+ }
+ finish_degraded_object(soid);
+}
+
+void ReplicatedPG::on_peer_recover(
+ int peer,
+ const hobject_t &soid,
+ const ObjectRecoveryInfo &recovery_info)
+{
+ // done!
+ if (peer == backfill_target && backfills_in_flight.count(soid))
+ backfills_in_flight.erase(soid);
+ else
+ peer_missing[peer].got(soid, recovery_info.version);
+}
+
+void ReplicatedPG::begin_peer_recover(
+ int peer,
+ const hobject_t soid)
+{
+}
+
// =======================
// pg changes
@@ -6020,9 +6151,7 @@ void ReplicatedPG::submit_push_data(
}
if (first) {
- pg_log.revise_have(recovery_info.soid, eversion_t());
- remove_snap_mapped_object(*t, recovery_info.soid);
- t->remove(coll, recovery_info.soid);
+ on_local_recover_start(recovery_info.soid, t);
t->remove(get_temp_coll(t), recovery_info.soid);
t->touch(target_coll, recovery_info.soid);
t->omap_setheader(target_coll, recovery_info.soid, omap_header);
@@ -6072,41 +6201,6 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
q.get_start(), q.get_len(), q.get_start());
}
}
-
- if (recovery_info.soid.snap < CEPH_NOSNAP) {
- assert(recovery_info.oi.snaps.size());
- OSDriver::OSTransaction _t(osdriver.get_transaction(t));
- set<snapid_t> snaps(
- recovery_info.oi.snaps.begin(),
- recovery_info.oi.snaps.end());
- snap_mapper.add_oid(
- recovery_info.soid,
- snaps,
- &_t);
- }
-
- if (pg_log.get_missing().is_missing(recovery_info.soid) &&
- pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
- assert(is_primary());
- const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
- if (latest->op == pg_log_entry_t::LOST_REVERT &&
- latest->reverting_to == recovery_info.version) {
- dout(10) << " got old revert version " << recovery_info.version
- << " for " << *latest << dendl;
- recovery_info.version = latest->version;
- // update the attr to the revert event version
- recovery_info.oi.prior_version = recovery_info.oi.version;
- recovery_info.oi.version = latest->version;
- bufferlist bl;
- ::encode(recovery_info.oi, bl);
- t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
- }
- }
- recover_got(recovery_info.soid, recovery_info.version);
-
- // update pg
- dirty_info = true;
- write_if_dirty(*t);
}
ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recovery_info)
@@ -6216,50 +6310,10 @@ bool ReplicatedPG::handle_pull_response(
info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
if (complete) {
- info.stats.stats.sum.num_objects_recovered++;
-
- SnapSetContext *ssc;
- if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) {
- ssc = create_snapset_context(hoid.oid);
- ssc->snapset = pi.recovery_info.ss;
- } else {
- ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false,
- hoid.get_namespace());
- assert(ssc);
- }
- ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc);
- obc->obs.exists = true;
-
- obc->ondisk_write_lock();
-
- // keep track of active pushes for scrub
- ++active_pushes;
-
- t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
- t->register_on_complete(
- new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
- }
-
- t->register_on_commit(
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete));
-
- if (complete) {
pulling.erase(hoid);
pull_from_peer[from].erase(hoid);
- publish_stats_to_osd();
- if (waiting_for_missing_object.count(hoid)) {
- dout(20) << " kicking waiters on " << hoid << dendl;
- requeue_ops(waiting_for_missing_object[hoid]);
- waiting_for_missing_object.erase(hoid);
- if (pg_log.get_missing().missing.size() == 0) {
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
- }
- }
+ info.stats.stats.sum.num_objects_recovered++;
+ on_local_recover(hoid, object_stat_sum_t(), pi.recovery_info, pi.obc, t);
return false;
} else {
response->soid = pop.soid;
@@ -6293,12 +6347,7 @@ void ReplicatedPG::handle_push(
bool complete = pop.after_progress.data_complete &&
pop.after_progress.omap_complete;
- // keep track of active pushes for scrub
- ++active_pushes;
-
response->soid = pop.recovery_info.soid;
- t->register_on_applied(
- new C_OSD_AppliedRecoveredObjectReplica(this));
submit_push_data(pop.recovery_info,
first,
complete,
@@ -6309,11 +6358,13 @@ void ReplicatedPG::handle_push(
pop.omap_entries,
t);
- t->register_on_commit(
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete));
+ if (complete)
+ on_local_recover(
+ pop.recovery_info.soid,
+ object_stat_sum_t(),
+ pop.recovery_info,
+ ObjectContextRef(), // ok, is replica
+ t);
}
void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
@@ -6582,25 +6633,14 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
return true;
} else {
// done!
- if (peer == backfill_target && backfills_in_flight.count(soid))
- backfills_in_flight.erase(soid);
- else
- peer_missing[peer].got(soid, pi->recovery_info.version);
+ on_peer_recover(peer, soid, pi->recovery_info);
pushing[soid].erase(peer);
pi = NULL;
- publish_stats_to_osd();
if (pushing[soid].empty()) {
- pushing.erase(soid);
- dout(10) << "pushed " << soid << " to all replicas" << dendl;
- finish_recovery_op(soid);
- if (waiting_for_degraded_object.count(soid)) {
- requeue_ops(waiting_for_degraded_object[soid]);
- waiting_for_degraded_object.erase(soid);
- }
- finish_degraded_object(soid);
+ on_global_recover(soid);
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
@@ -6867,7 +6907,6 @@ void ReplicatedPG::sub_op_push(OpRequestRef op)
t->register_on_complete(new C_OSD_SendMessageOnConn(
osd, reply, m->get_connection()));
}
- t->register_on_commit(new C_OnPushCommit(this, op));
osd->store->queue_transaction(osr.get(), t);
return;
}
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 24f001b0fba..22ae47287d2 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -130,19 +130,23 @@ public:
/// Listener methods
void on_local_recover_start(
const hobject_t &oid,
- ObjectStore::Transaction *t) {}
+ ObjectStore::Transaction *t);
void on_local_recover(
const hobject_t &oid,
const object_stat_sum_t &stat_diff,
const ObjectRecoveryInfo &recovery_info,
+ ObjectContextRef obc,
ObjectStore::Transaction *t
- ) {}
+ );
void on_peer_recover(
int peer,
const hobject_t &oid,
- const ObjectRecoveryInfo &recovery_info) {}
+ const ObjectRecoveryInfo &recovery_info);
+ void begin_peer_recover(
+ int peer,
+ const hobject_t oid);
void on_global_recover(
- const hobject_t &oid) {}
+ const hobject_t &oid);
void failed_push(int from, const hobject_t &soid);
template <typename T>
@@ -207,6 +211,9 @@ public:
const map<int, pg_missing_t> &get_peer_missing() {
return peer_missing;
}
+ const map<int, pg_info_t> &get_peer_info() {
+ return peer_info;
+ }
const pg_missing_t &get_local_missing() {
return pg_log.get_missing();
}