author     Samuel Just <sam.just@inktank.com>  2012-11-20 16:47:49 -0800
committer  Samuel Just <sam.just@inktank.com>  2012-12-06 22:51:52 -0800
commit     9981bee565dae5921f45df80fe2744dc7fd50db2 (patch)
tree       009511e945a56de1d7f7a58105b1f9df321ff732
parent     993ff14357d702c3404bde70bd417eb9a3f9ff3c (diff)
OSD: add initial split support
PGs are split after updating to the map on which they split.
OSD::activate_map populates the set of currently "splitting" pgs.
Messages for those pgs are delayed until the split is complete.  We add
the newly split children to pg_map once the transaction populating
their on-disk state completes.

Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--  src/osd/OSD.cc  288
-rw-r--r--  src/osd/OSD.h    37
-rw-r--r--  src/osd/PG.cc     2
3 files changed, 303 insertions(+), 24 deletions(-)
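Before the diff itself, here is a minimal standalone sketch of the pattern the commit message describes: mark child pgs as "splitting" when the map advances, park incoming messages for those pgs, and replay them once the on-disk split transaction has been applied. This is not Ceph code; the SplitTracker class, string pg ids, and callback events are illustrative stand-ins for OSDService::in_progress_splits, peering_wait_for_split, and the CephPeeringEvt queues in the patch below.

```cpp
// Illustrative sketch only -- simplified types, not the Ceph implementation.
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <vector>

class SplitTracker {
public:
  // Analogue of start_split(): record children that are about to be split.
  void start_split(const std::set<std::string>& children) {
    std::lock_guard<std::mutex> l(lock_);
    for (const auto& c : children)
      in_progress_.insert(c);
  }

  // Analogue of splitting(): message handlers check this before pg lookup.
  bool splitting(const std::string& pgid) {
    std::lock_guard<std::mutex> l(lock_);
    return in_progress_.count(pgid) > 0;
  }

  // Park an event for a child that does not exist in pg_map yet.
  void defer(const std::string& pgid, std::function<void()> event) {
    std::lock_guard<std::mutex> l(lock_);
    waiting_[pgid].push_back(std::move(event));
  }

  // Analogue of complete_split(): once the transaction creating the children
  // has been applied, drop them from the set and replay any parked events.
  void complete_split(const std::set<std::string>& children) {
    std::vector<std::function<void()>> to_run;
    {
      std::lock_guard<std::mutex> l(lock_);
      for (const auto& c : children) {
        in_progress_.erase(c);
        auto it = waiting_.find(c);
        if (it != waiting_.end()) {
          for (auto& ev : it->second)
            to_run.push_back(std::move(ev));
          waiting_.erase(it);
        }
      }
    }
    for (auto& ev : to_run)
      ev();  // deliver outside the lock
  }

private:
  std::mutex lock_;
  std::set<std::string> in_progress_;
  std::map<std::string, std::vector<std::function<void()>>> waiting_;
};

int main() {
  SplitTracker tracker;
  tracker.start_split({"1.c"});  // e.g. new child of pg 1.4 when pg_num doubles

  // A notify arrives for the child before it exists: park it.
  if (tracker.splitting("1.c"))
    tracker.defer("1.c", [] { std::cout << "replayed notify for 1.c\n"; });

  // The split transaction commits; parked events are delivered.
  tracker.complete_split({"1.c"});
  return 0;
}
```

In the actual patch the same three steps appear as OSDService::start_split() driven from activate_map(), the service.splitting() checks in the pg message handlers (which push events onto peering_wait_for_split or waiting_for_pg), and complete_split() invoked from the C_CompleteSplits on_applied callback.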
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index f9ea8624be1..ec3035c0a66 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -175,9 +175,57 @@ OSDService::OSDService(OSD *osd) :
map_cache_lock("OSDService::map_lock"),
map_cache(g_conf->osd_map_cache_size),
map_bl_cache(g_conf->osd_map_cache_size),
- map_bl_inc_cache(g_conf->osd_map_cache_size)
+ map_bl_inc_cache(g_conf->osd_map_cache_size),
+ in_progress_split_lock("OSDService::in_progress_split_lock")
{}
+void OSDService::_start_split(const set<pg_t> &pgs)
+{
+ for (set<pg_t>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ assert(!in_progress_splits.count(*i));
+ in_progress_splits.insert(*i);
+ }
+}
+
+void OSDService::expand_pg_num(OSDMapRef old_map,
+ OSDMapRef new_map)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ set<pg_t> children;
+ for (set<pg_t>::iterator i = in_progress_splits.begin();
+ i != in_progress_splits.end();
+ ) {
+ assert(old_map->have_pg_pool(i->pool()));
+ if (!new_map->have_pg_pool(i->pool())) {
+ in_progress_splits.erase(i++);
+ } else {
+ i->is_split(old_map->get_pg_num(i->pool()),
+ new_map->get_pg_num(i->pool()), &children);
+ ++i;
+ }
+ }
+ _start_split(children);
+}
+
+bool OSDService::splitting(pg_t pgid)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ return in_progress_splits.count(pgid);
+}
+
+void OSDService::complete_split(const set<pg_t> &pgs)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ for (set<pg_t>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ assert(in_progress_splits.count(*i));
+ in_progress_splits.erase(*i);
+ }
+}
+
void OSDService::need_heartbeat_peer_update()
{
osd->need_heartbeat_peer_update();
@@ -1271,6 +1319,22 @@ PG *OSD::_open_lock_pg(
{
assert(osd_lock.is_locked());
+ PG* pg = _make_pg(createmap, pgid);
+
+ pg_map[pgid] = pg;
+
+ if (hold_map_lock)
+ pg->lock_with_map_lock_held(no_lockdep_check);
+ else
+ pg->lock(no_lockdep_check);
+ pg->get(); // because it's in pg_map
+ return pg;
+}
+
+PG* OSD::_make_pg(
+ OSDMapRef createmap,
+ pg_t pgid)
+{
dout(10) << "_open_lock_pg " << pgid << dendl;
PGPool pool = _get_pool(pgid.pool(), createmap);
@@ -1283,17 +1347,39 @@ PG *OSD::_open_lock_pg(
else
assert(0);
- assert(pg_map.count(pgid) == 0);
- pg_map[pgid] = pg;
-
- if (hold_map_lock)
- pg->lock_with_map_lock_held(no_lockdep_check);
- else
- pg->lock(no_lockdep_check);
- pg->get(); // because it's in pg_map
return pg;
}
+
+void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
+{
+ epoch_t e(service.get_osdmap()->get_epoch());
+ pg->get(); // For pg_map
+ pg_map[pg->info.pgid] = pg;
+ dout(10) << "Adding newly split pg " << *pg << dendl;
+ vector<int> up, acting;
+ pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid, up, acting);
+ int role = pg->get_osdmap()->calc_pg_role(service.whoami, acting);
+ pg->set_role(role);
+ service.reg_last_pg_scrub(pg->info.pgid,
+ pg->info.history.last_scrub_stamp);
+ pg->handle_loaded(rctx);
+ pg->write_if_dirty(*(rctx->transaction));
+ pg->queue_null(e, e);
+ map<pg_t, list<PG::CephPeeringEvtRef> >::iterator to_wake =
+ peering_wait_for_split.find(pg->info.pgid);
+ if (to_wake != peering_wait_for_split.end()) {
+ for (list<PG::CephPeeringEvtRef>::iterator i =
+ to_wake->second.begin();
+ i != to_wake->second.end();
+ ++i) {
+ pg->queue_peering_event(*i);
+ }
+ peering_wait_for_split.erase(to_wake);
+ }
+ wake_pg_waiters(pg->info.pgid);
+}
+
PG *OSD::_create_lock_pg(
OSDMapRef createmap,
pg_t pgid, bool newly_created, bool hold_map_lock,
@@ -1567,13 +1653,14 @@ void OSD::build_past_intervals_parallel()
store->apply_transaction(t);
}
-
/*
* look up a pg. if we have it, great. if not, consider creating it IF the pg mapping
* hasn't changed since the given epoch and we are the primary.
*/
-PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
- epoch_t epoch, int from, int& created, bool primary)
+PG *OSD::get_or_create_pg(
+ const pg_info_t& info, pg_interval_map_t& pi,
+ epoch_t epoch, int from, int& created, bool primary,
+ OpRequestRef op)
{
PG *pg;
@@ -1594,6 +1681,10 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
return NULL;
}
+ if (service.splitting(info.pgid)) {
+ assert(0);
+ }
+
bool create = false;
if (primary) {
assert(role == 0); // otherwise, probably bug in project_pg_history.
@@ -3865,7 +3956,9 @@ void OSD::check_osdmap_features()
}
}
-void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
+void OSD::advance_pg(
+ epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx,
+ set<boost::intrusive_ptr<PG> > *new_pgs)
{
assert(pg->is_locked());
epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1;
@@ -3879,9 +3972,22 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
next_epoch <= osd_epoch;
++next_epoch) {
OSDMapRef nextmap = get_map(next_epoch);
+
vector<int> newup, newacting;
nextmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting);
pg->handle_advance_map(nextmap, lastmap, newup, newacting, rctx);
+
+ // Check for split!
+ set<pg_t> children;
+ if (pg->info.pgid.is_split(
+ lastmap->get_pg_num(pg->pool.id),
+ nextmap->get_pg_num(pg->pool.id),
+ &children)) {
+ split_pgs(
+ pg, children, new_pgs, lastmap, nextmap,
+ rctx);
+ }
+
lastmap = nextmap;
}
pg->handle_activate_map(rctx);
@@ -3953,6 +4059,22 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
waiting_for_pg.erase(p++);
}
}
+ map<pg_t, list<PG::CephPeeringEvtRef> >::iterator q =
+ peering_wait_for_split.begin();
+ while (q != peering_wait_for_split.end()) {
+ pg_t pgid = q->first;
+
+ // am i still primary?
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
+ if (role >= 0) {
+ ++q; // still me
+ } else {
+ dout(10) << " discarding waiting ops for " << pgid << dendl;
+ peering_wait_for_split.erase(q++);
+ }
+ }
}
void OSD::activate_map()
@@ -3971,6 +4093,9 @@ void OSD::activate_map()
list<PG*> to_remove;
+ service.expand_pg_num(service.get_osdmap(),
+ osdmap);
+
// scan pg's
for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
it != pg_map.end();
@@ -3987,11 +4112,18 @@ void OSD::activate_map()
if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean)
oldest_last_clean = pg->info.history.last_epoch_clean;
+ set<pg_t> split_pgs;
if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
//pool is deleted!
pg->get();
to_remove.push_back(pg);
+ } else if (it->first.is_split(
+ service.get_osdmap()->get_pg_num(it->first.pool()),
+ osdmap->get_pg_num(it->first.pool()),
+ &split_pgs)) {
+ service.start_split(split_pgs);
}
+
pg->unlock();
}
@@ -4330,6 +4462,59 @@ bool OSD::can_create_pg(pg_t pgid)
return true;
}
+void OSD::split_pgs(
+ PG *parent,
+ const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+ OSDMapRef curmap,
+ OSDMapRef nextmap,
+ PG::RecoveryCtx *rctx)
+{
+ for (set<pg_t>::const_iterator i = childpgids.begin();
+ i != childpgids.end();
+ ++i) {
+ dout(10) << "Splitting " << *parent << " into " << *i << dendl;
+ assert(service.splitting(*i));
+ PG* child = _make_pg(nextmap, *i);
+ child->lock(true);
+ out_pgs->insert(child);
+
+ unsigned pg_num = nextmap->get_pg_num(
+ parent->pool.id);
+
+ unsigned split_bits = i->get_split_bits(pg_num);
+ dout(10) << "pg_num is " << pg_num << dendl;
+ dout(10) << "m_seed " << i->ps() << dendl;
+ dout(10) << "split_bits is " << split_bits << dendl;
+
+ rctx->transaction->split_collection(
+ coll_t(parent->info.pgid),
+ split_bits,
+ i->m_seed,
+ coll_t(*i));
+ for (interval_set<snapid_t>::iterator k = parent->snap_collections.begin();
+ k != parent->snap_collections.end();
+ ++k) {
+ for (snapid_t j = k.get_start(); j < k.get_start() + k.get_len();
+ ++j) {
+ rctx->transaction->split_collection(
+ coll_t(parent->info.pgid, j),
+ split_bits,
+ i->m_seed,
+ coll_t(*i, j));
+ }
+ }
+ child->snap_collections = parent->snap_collections;
+ parent->split_into(
+ *i,
+ child,
+ split_bits);
+
+ child->write_if_dirty(*(rctx->transaction));
+ child->unlock();
+ }
+}
+
+
void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& t,
C_Contexts *tfin)
{
@@ -4820,8 +5005,17 @@ void OSD::handle_pg_notify(OpRequestRef op)
}
int created = 0;
+ if (service.splitting(it->first.info.pgid)) {
+ peering_wait_for_split[it->first.info.pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ it->first.epoch_sent, it->first.query_epoch,
+ PG::MNotifyRec(from, it->first))));
+ continue;
+ }
+
pg = get_or_create_pg(it->first.info, it->second,
- it->first.query_epoch, from, created, true);
+ it->first.query_epoch, from, created, true, op);
if (!pg)
continue;
pg->queue_notify(it->first.epoch_sent, it->first.query_epoch, from, it->first);
@@ -4846,9 +5040,18 @@ void OSD::handle_pg_log(OpRequestRef op)
return;
}
+ if (service.splitting(m->info.pgid)) {
+ peering_wait_for_split[m->info.pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ m->get_epoch(), m->get_query_epoch(),
+ PG::MLogRec(from, m))));
+ return;
+ }
+
int created = 0;
PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(),
- from, created, false);
+ from, created, false, op);
if (!pg)
return;
op->mark_started();
@@ -4880,8 +5083,16 @@ void OSD::handle_pg_info(OpRequestRef op)
continue;
}
+ if (service.splitting(p->first.info.pgid)) {
+ peering_wait_for_split[p->first.info.pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ p->first.epoch_sent, p->first.query_epoch,
+ PG::MInfoRec(from, p->first.info, p->first.epoch_sent))));
+ continue;
+ }
PG *pg = get_or_create_pg(p->first.info, p->second, p->first.epoch_sent,
- from, created, false);
+ from, created, false, op);
if (!pg)
continue;
pg->queue_info(p->first.epoch_sent, p->first.query_epoch, from,
@@ -5121,6 +5332,15 @@ void OSD::handle_pg_query(OpRequestRef op)
continue;
}
+ if (service.splitting(pgid)) {
+ peering_wait_for_split[pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ it->second.epoch_sent, it->second.epoch_sent,
+ PG::MQuery(from, it->second, it->second.epoch_sent))));
+ continue;
+ }
+
PG *pg = 0;
if (pg_map.count(pgid)) {
@@ -5653,6 +5873,11 @@ void OSD::handle_sub_op(OpRequestRef op)
_share_map_incoming(m->get_source_inst(), m->map_epoch,
(Session*)m->get_connection()->get_priv());
+ if (service.splitting(pgid)) {
+ waiting_for_pg[pgid].push_back(op);
+ return;
+ }
+
PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
if (!pg) {
return;
@@ -5804,6 +6029,30 @@ void OSDService::queue_for_peering(PG *pg)
peering_wq.queue(pg);
}
+struct C_CompleteSplits : public Context {
+ OSD *osd;
+ set<boost::intrusive_ptr<PG> > pgs;
+ C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in)
+ : osd(osd), pgs(in) {}
+ void finish(int r) {
+ Mutex::Locker l(osd->osd_lock);
+ PG::RecoveryCtx rctx = osd->create_context();
+ set<pg_t> to_complete;
+ for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ (*i)->lock();
+ osd->add_newly_split_pg(&**i, &rctx);
+ osd->dispatch_context_transaction(rctx, &**i);
+ if (!((*i)->deleting))
+ to_complete.insert((*i)->info.pgid);
+ (*i)->unlock();
+ }
+ osd->service.complete_split(to_complete);
+ osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
+ }
+};
+
void OSD::process_peering_events(const list<PG*> &pgs)
{
bool need_up_thru = false;
@@ -5813,6 +6062,7 @@ void OSD::process_peering_events(const list<PG*> &pgs)
for (list<PG*>::const_iterator i = pgs.begin();
i != pgs.end();
++i) {
+ set<boost::intrusive_ptr<PG> > split_pgs;
PG *pg = *i;
pg->lock();
curmap = service.get_osdmap();
@@ -5820,7 +6070,7 @@ void OSD::process_peering_events(const list<PG*> &pgs)
pg->unlock();
continue;
}
- advance_pg(curmap->get_epoch(), pg, &rctx);
+ advance_pg(curmap->get_epoch(), pg, &rctx, &split_pgs);
if (!pg->peering_queue.empty()) {
PG::CephPeeringEvtRef evt = pg->peering_queue.front();
pg->peering_queue.pop_front();
@@ -5830,6 +6080,10 @@ void OSD::process_peering_events(const list<PG*> &pgs)
same_interval_since = MAX(pg->info.history.same_interval_since,
same_interval_since);
pg->write_if_dirty(*rctx.transaction);
+ if (split_pgs.size()) {
+ rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
+ split_pgs.clear();
+ }
dispatch_context_transaction(rctx, pg);
pg->unlock();
}
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index ba0da042a31..a87431a69d1 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -133,6 +133,7 @@ class AuthAuthorizeHandlerRegistry;
class OpsFlightSocketHook;
class HistoricOpsSocketHook;
+struct C_CompleteSplits;
extern const coll_t meta_coll;
@@ -365,6 +366,19 @@ public:
void init();
void shutdown();
+ // split
+ Mutex in_progress_split_lock;
+ set<pg_t> in_progress_splits;
+ void _start_split(const set<pg_t> &pgs);
+ void start_split(const set<pg_t> &pgs) {
+ Mutex::Locker l(in_progress_split_lock);
+ return _start_split(pgs);
+ }
+ void complete_split(const set<pg_t> &pgs);
+ bool splitting(pg_t pgid);
+ void expand_pg_num(OSDMapRef old_map,
+ OSDMapRef new_map);
+
OSDService(OSD *osd);
};
class OSD : public Dispatcher {
@@ -601,6 +615,7 @@ private:
}
friend class OpsFlightSocketHook;
friend class HistoricOpsSocketHook;
+ friend class C_CompleteSplits;
OpsFlightSocketHook *admin_ops_hook;
HistoricOpsSocketHook *historic_ops_hook;
@@ -742,7 +757,9 @@ private:
void note_down_osd(int osd);
void note_up_osd(int osd);
- void advance_pg(epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx);
+ void advance_pg(
+ epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx,
+ set<boost::intrusive_ptr<PG> > *split_pgs);
void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin);
void activate_map();
@@ -782,6 +799,7 @@ protected:
// -- placement groups --
hash_map<pg_t, PG*> pg_map;
map<pg_t, list<OpRequestRef> > waiting_for_pg;
+ map<pg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
PGRecoveryStats pg_recovery_stats;
PGPool _get_pool(int id, OSDMapRef createmap);
@@ -803,11 +821,15 @@ protected:
PG *_lookup_qlock_pg(pg_t pgid);
PG *lookup_lock_raw_pg(pg_t pgid);
+ PG* _make_pg(OSDMapRef createmap, pg_t pgid);
+ void add_newly_split_pg(PG *pg,
+ PG::RecoveryCtx *rctx);
PG *get_or_create_pg(const pg_info_t& info,
- pg_interval_map_t& pi,
- epoch_t epoch, int from, int& pcreated,
- bool primary);
+ pg_interval_map_t& pi,
+ epoch_t epoch, int from, int& pcreated,
+ bool primary,
+ OpRequestRef op);
void load_pgs();
void build_past_intervals_parallel();
@@ -848,7 +870,12 @@ protected:
void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin);
void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
-
+ void split_pgs(
+ PG *parent,
+ const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+ OSDMapRef curmap,
+ OSDMapRef nextmap,
+ PG::RecoveryCtx *rctx);
// == monitor interaction ==
utime_t last_mon_report;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index dfac75c5ca5..2323b988f70 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1978,8 +1978,6 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
child->info.last_backfill = info.last_backfill;
child->info.stats = info.stats;
- info.stats.stats_invalid = true;
- child->info.stats.stats_invalid = true;
child->snap_trimq = snap_trimq;