diff options
author | Samuel Just <sam.just@inktank.com> | 2012-11-20 16:47:49 -0800 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2012-12-06 22:51:52 -0800 |
commit | 9981bee565dae5921f45df80fe2744dc7fd50db2 (patch) | |
tree | 009511e945a56de1d7f7a58105b1f9df321ff732 | |
parent | 993ff14357d702c3404bde70bd417eb9a3f9ff3c (diff) | |
download | ceph-9981bee565dae5921f45df80fe2744dc7fd50db2.tar.gz |
OSD: add initial split support
A PG is split once it has been advanced to the map epoch in which the split occurs.
OSD::activate_map populates the set of PGs that are currently "splitting".
Messages for those PGs are delayed until the split
is complete. We add the newly split children to pg_map
once the transaction populating their on-disk state completes.
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/OSD.cc | 288 | ||||
-rw-r--r-- | src/osd/OSD.h | 37 | ||||
-rw-r--r-- | src/osd/PG.cc | 2 |
3 files changed, 303 insertions, 24 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f9ea8624be1..ec3035c0a66 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -175,9 +175,57 @@ OSDService::OSDService(OSD *osd) : map_cache_lock("OSDService::map_lock"), map_cache(g_conf->osd_map_cache_size), map_bl_cache(g_conf->osd_map_cache_size), - map_bl_inc_cache(g_conf->osd_map_cache_size) + map_bl_inc_cache(g_conf->osd_map_cache_size), + in_progress_split_lock("OSDService::in_progress_split_lock") {} +void OSDService::_start_split(const set<pg_t> &pgs) +{ + for (set<pg_t>::const_iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + assert(!in_progress_splits.count(*i)); + in_progress_splits.insert(*i); + } +} + +void OSDService::expand_pg_num(OSDMapRef old_map, + OSDMapRef new_map) +{ + Mutex::Locker l(in_progress_split_lock); + set<pg_t> children; + for (set<pg_t>::iterator i = in_progress_splits.begin(); + i != in_progress_splits.end(); + ) { + assert(old_map->have_pg_pool(i->pool())); + if (!new_map->have_pg_pool(i->pool())) { + in_progress_splits.erase(i++); + } else { + i->is_split(old_map->get_pg_num(i->pool()), + new_map->get_pg_num(i->pool()), &children); + ++i; + } + } + _start_split(children); +} + +bool OSDService::splitting(pg_t pgid) +{ + Mutex::Locker l(in_progress_split_lock); + return in_progress_splits.count(pgid); +} + +void OSDService::complete_split(const set<pg_t> &pgs) +{ + Mutex::Locker l(in_progress_split_lock); + for (set<pg_t>::const_iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + assert(in_progress_splits.count(*i)); + in_progress_splits.erase(*i); + } +} + void OSDService::need_heartbeat_peer_update() { osd->need_heartbeat_peer_update(); @@ -1271,6 +1319,22 @@ PG *OSD::_open_lock_pg( { assert(osd_lock.is_locked()); + PG* pg = _make_pg(createmap, pgid); + + pg_map[pgid] = pg; + + if (hold_map_lock) + pg->lock_with_map_lock_held(no_lockdep_check); + else + pg->lock(no_lockdep_check); + pg->get(); // because it's in pg_map + return pg; +} + +PG* OSD::_make_pg( + OSDMapRef 
createmap, + pg_t pgid) +{ dout(10) << "_open_lock_pg " << pgid << dendl; PGPool pool = _get_pool(pgid.pool(), createmap); @@ -1283,17 +1347,39 @@ PG *OSD::_open_lock_pg( else assert(0); - assert(pg_map.count(pgid) == 0); - pg_map[pgid] = pg; - - if (hold_map_lock) - pg->lock_with_map_lock_held(no_lockdep_check); - else - pg->lock(no_lockdep_check); - pg->get(); // because it's in pg_map return pg; } + +void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx) +{ + epoch_t e(service.get_osdmap()->get_epoch()); + pg->get(); // For pg_map + pg_map[pg->info.pgid] = pg; + dout(10) << "Adding newly split pg " << *pg << dendl; + vector<int> up, acting; + pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid, up, acting); + int role = pg->get_osdmap()->calc_pg_role(service.whoami, acting); + pg->set_role(role); + service.reg_last_pg_scrub(pg->info.pgid, + pg->info.history.last_scrub_stamp); + pg->handle_loaded(rctx); + pg->write_if_dirty(*(rctx->transaction)); + pg->queue_null(e, e); + map<pg_t, list<PG::CephPeeringEvtRef> >::iterator to_wake = + peering_wait_for_split.find(pg->info.pgid); + if (to_wake != peering_wait_for_split.end()) { + for (list<PG::CephPeeringEvtRef>::iterator i = + to_wake->second.begin(); + i != to_wake->second.end(); + ++i) { + pg->queue_peering_event(*i); + } + peering_wait_for_split.erase(to_wake); + } + wake_pg_waiters(pg->info.pgid); +} + PG *OSD::_create_lock_pg( OSDMapRef createmap, pg_t pgid, bool newly_created, bool hold_map_lock, @@ -1567,13 +1653,14 @@ void OSD::build_past_intervals_parallel() store->apply_transaction(t); } - /* * look up a pg. if we have it, great. if not, consider creating it IF the pg mapping * hasn't changed since the given epoch and we are the primary. 
*/ -PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, - epoch_t epoch, int from, int& created, bool primary) +PG *OSD::get_or_create_pg( + const pg_info_t& info, pg_interval_map_t& pi, + epoch_t epoch, int from, int& created, bool primary, + OpRequestRef op) { PG *pg; @@ -1594,6 +1681,10 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, return NULL; } + if (service.splitting(info.pgid)) { + assert(0); + } + bool create = false; if (primary) { assert(role == 0); // otherwise, probably bug in project_pg_history. @@ -3865,7 +3956,9 @@ void OSD::check_osdmap_features() } } -void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx) +void OSD::advance_pg( + epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx, + set<boost::intrusive_ptr<PG> > *new_pgs) { assert(pg->is_locked()); epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1; @@ -3879,9 +3972,22 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx) next_epoch <= osd_epoch; ++next_epoch) { OSDMapRef nextmap = get_map(next_epoch); + vector<int> newup, newacting; nextmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting); pg->handle_advance_map(nextmap, lastmap, newup, newacting, rctx); + + // Check for split! + set<pg_t> children; + if (pg->info.pgid.is_split( + lastmap->get_pg_num(pg->pool.id), + nextmap->get_pg_num(pg->pool.id), + &children)) { + split_pgs( + pg, children, new_pgs, lastmap, nextmap, + rctx); + } + lastmap = nextmap; } pg->handle_activate_map(rctx); @@ -3953,6 +4059,22 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin) waiting_for_pg.erase(p++); } } + map<pg_t, list<PG::CephPeeringEvtRef> >::iterator q = + peering_wait_for_split.begin(); + while (q != peering_wait_for_split.end()) { + pg_t pgid = q->first; + + // am i still primary? 
+ vector<int> acting; + int nrep = osdmap->pg_to_acting_osds(pgid, acting); + int role = osdmap->calc_pg_role(whoami, acting, nrep); + if (role >= 0) { + ++q; // still me + } else { + dout(10) << " discarding waiting ops for " << pgid << dendl; + peering_wait_for_split.erase(q++); + } + } } void OSD::activate_map() @@ -3971,6 +4093,9 @@ void OSD::activate_map() list<PG*> to_remove; + service.expand_pg_num(service.get_osdmap(), + osdmap); + // scan pg's for (hash_map<pg_t,PG*>::iterator it = pg_map.begin(); it != pg_map.end(); @@ -3987,11 +4112,18 @@ void OSD::activate_map() if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean) oldest_last_clean = pg->info.history.last_epoch_clean; + set<pg_t> split_pgs; if (!osdmap->have_pg_pool(pg->info.pgid.pool())) { //pool is deleted! pg->get(); to_remove.push_back(pg); + } else if (it->first.is_split( + service.get_osdmap()->get_pg_num(it->first.pool()), + osdmap->get_pg_num(it->first.pool()), + &split_pgs)) { + service.start_split(split_pgs); } + pg->unlock(); } @@ -4330,6 +4462,59 @@ bool OSD::can_create_pg(pg_t pgid) return true; } +void OSD::split_pgs( + PG *parent, + const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs, + OSDMapRef curmap, + OSDMapRef nextmap, + PG::RecoveryCtx *rctx) +{ + for (set<pg_t>::const_iterator i = childpgids.begin(); + i != childpgids.end(); + ++i) { + dout(10) << "Splitting " << *parent << " into " << *i << dendl; + assert(service.splitting(*i)); + PG* child = _make_pg(nextmap, *i); + child->lock(true); + out_pgs->insert(child); + + unsigned pg_num = nextmap->get_pg_num( + parent->pool.id); + + unsigned split_bits = i->get_split_bits(pg_num); + dout(10) << "pg_num is " << pg_num << dendl; + dout(10) << "m_seed " << i->ps() << dendl; + dout(10) << "split_bits is " << split_bits << dendl; + + rctx->transaction->split_collection( + coll_t(parent->info.pgid), + split_bits, + i->m_seed, + coll_t(*i)); + for (interval_set<snapid_t>::iterator k = 
parent->snap_collections.begin(); + k != parent->snap_collections.end(); + ++k) { + for (snapid_t j = k.get_start(); j < k.get_start() + k.get_len(); + ++j) { + rctx->transaction->split_collection( + coll_t(parent->info.pgid, j), + split_bits, + i->m_seed, + coll_t(*i, j)); + } + } + child->snap_collections = parent->snap_collections; + parent->split_into( + *i, + child, + split_bits); + + child->write_if_dirty(*(rctx->transaction)); + child->unlock(); + } +} + + void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& t, C_Contexts *tfin) { @@ -4820,8 +5005,17 @@ void OSD::handle_pg_notify(OpRequestRef op) } int created = 0; + if (service.splitting(it->first.info.pgid)) { + peering_wait_for_split[it->first.info.pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + it->first.epoch_sent, it->first.query_epoch, + PG::MNotifyRec(from, it->first)))); + continue; + } + pg = get_or_create_pg(it->first.info, it->second, - it->first.query_epoch, from, created, true); + it->first.query_epoch, from, created, true, op); if (!pg) continue; pg->queue_notify(it->first.epoch_sent, it->first.query_epoch, from, it->first); @@ -4846,9 +5040,18 @@ void OSD::handle_pg_log(OpRequestRef op) return; } + if (service.splitting(m->info.pgid)) { + peering_wait_for_split[m->info.pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + m->get_epoch(), m->get_query_epoch(), + PG::MLogRec(from, m)))); + return; + } + int created = 0; PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(), - from, created, false); + from, created, false, op); if (!pg) return; op->mark_started(); @@ -4880,8 +5083,16 @@ void OSD::handle_pg_info(OpRequestRef op) continue; } + if (service.splitting(p->first.info.pgid)) { + peering_wait_for_split[p->first.info.pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + p->first.epoch_sent, p->first.query_epoch, + PG::MInfoRec(from, p->first.info, p->first.epoch_sent)))); + continue; + } PG *pg 
= get_or_create_pg(p->first.info, p->second, p->first.epoch_sent, - from, created, false); + from, created, false, op); if (!pg) continue; pg->queue_info(p->first.epoch_sent, p->first.query_epoch, from, @@ -5121,6 +5332,15 @@ void OSD::handle_pg_query(OpRequestRef op) continue; } + if (service.splitting(pgid)) { + peering_wait_for_split[pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + it->second.epoch_sent, it->second.epoch_sent, + PG::MQuery(from, it->second, it->second.epoch_sent)))); + continue; + } + PG *pg = 0; if (pg_map.count(pgid)) { @@ -5653,6 +5873,11 @@ void OSD::handle_sub_op(OpRequestRef op) _share_map_incoming(m->get_source_inst(), m->map_epoch, (Session*)m->get_connection()->get_priv()); + if (service.splitting(pgid)) { + waiting_for_pg[pgid].push_back(op); + return; + } + PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL; if (!pg) { return; @@ -5804,6 +6029,30 @@ void OSDService::queue_for_peering(PG *pg) peering_wq.queue(pg); } +struct C_CompleteSplits : public Context { + OSD *osd; + set<boost::intrusive_ptr<PG> > pgs; + C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in) + : osd(osd), pgs(in) {} + void finish(int r) { + Mutex::Locker l(osd->osd_lock); + PG::RecoveryCtx rctx = osd->create_context(); + set<pg_t> to_complete; + for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + (*i)->lock(); + osd->add_newly_split_pg(&**i, &rctx); + osd->dispatch_context_transaction(rctx, &**i); + if (!((*i)->deleting)) + to_complete.insert((*i)->info.pgid); + (*i)->unlock(); + } + osd->service.complete_split(to_complete); + osd->dispatch_context(rctx, 0, osd->service.get_osdmap()); + } +}; + void OSD::process_peering_events(const list<PG*> &pgs) { bool need_up_thru = false; @@ -5813,6 +6062,7 @@ void OSD::process_peering_events(const list<PG*> &pgs) for (list<PG*>::const_iterator i = pgs.begin(); i != pgs.end(); ++i) { + set<boost::intrusive_ptr<PG> > split_pgs; PG *pg = *i; pg->lock(); 
curmap = service.get_osdmap(); @@ -5820,7 +6070,7 @@ void OSD::process_peering_events(const list<PG*> &pgs) pg->unlock(); continue; } - advance_pg(curmap->get_epoch(), pg, &rctx); + advance_pg(curmap->get_epoch(), pg, &rctx, &split_pgs); if (!pg->peering_queue.empty()) { PG::CephPeeringEvtRef evt = pg->peering_queue.front(); pg->peering_queue.pop_front(); @@ -5830,6 +6080,10 @@ void OSD::process_peering_events(const list<PG*> &pgs) same_interval_since = MAX(pg->info.history.same_interval_since, same_interval_since); pg->write_if_dirty(*rctx.transaction); + if (split_pgs.size()) { + rctx.on_applied->add(new C_CompleteSplits(this, split_pgs)); + split_pgs.clear(); + } dispatch_context_transaction(rctx, pg); pg->unlock(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ba0da042a31..a87431a69d1 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -133,6 +133,7 @@ class AuthAuthorizeHandlerRegistry; class OpsFlightSocketHook; class HistoricOpsSocketHook; +struct C_CompleteSplits; extern const coll_t meta_coll; @@ -365,6 +366,19 @@ public: void init(); void shutdown(); + // split + Mutex in_progress_split_lock; + set<pg_t> in_progress_splits; + void _start_split(const set<pg_t> &pgs); + void start_split(const set<pg_t> &pgs) { + Mutex::Locker l(in_progress_split_lock); + return _start_split(pgs); + } + void complete_split(const set<pg_t> &pgs); + bool splitting(pg_t pgid); + void expand_pg_num(OSDMapRef old_map, + OSDMapRef new_map); + OSDService(OSD *osd); }; class OSD : public Dispatcher { @@ -601,6 +615,7 @@ private: } friend class OpsFlightSocketHook; friend class HistoricOpsSocketHook; + friend class C_CompleteSplits; OpsFlightSocketHook *admin_ops_hook; HistoricOpsSocketHook *historic_ops_hook; @@ -742,7 +757,9 @@ private: void note_down_osd(int osd); void note_up_osd(int osd); - void advance_pg(epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx); + void advance_pg( + epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx, + set<boost::intrusive_ptr<PG> > *split_pgs); 
void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin); void activate_map(); @@ -782,6 +799,7 @@ protected: // -- placement groups -- hash_map<pg_t, PG*> pg_map; map<pg_t, list<OpRequestRef> > waiting_for_pg; + map<pg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split; PGRecoveryStats pg_recovery_stats; PGPool _get_pool(int id, OSDMapRef createmap); @@ -803,11 +821,15 @@ protected: PG *_lookup_qlock_pg(pg_t pgid); PG *lookup_lock_raw_pg(pg_t pgid); + PG* _make_pg(OSDMapRef createmap, pg_t pgid); + void add_newly_split_pg(PG *pg, + PG::RecoveryCtx *rctx); PG *get_or_create_pg(const pg_info_t& info, - pg_interval_map_t& pi, - epoch_t epoch, int from, int& pcreated, - bool primary); + pg_interval_map_t& pi, + epoch_t epoch, int from, int& pcreated, + bool primary, + OpRequestRef op); void load_pgs(); void build_past_intervals_parallel(); @@ -848,7 +870,12 @@ protected: void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin); void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t); - + void split_pgs( + PG *parent, + const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs, + OSDMapRef curmap, + OSDMapRef nextmap, + PG::RecoveryCtx *rctx); // == monitor interaction == utime_t last_mon_report; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index dfac75c5ca5..2323b988f70 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1978,8 +1978,6 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) child->info.last_backfill = info.last_backfill; child->info.stats = info.stats; - info.stats.stats_invalid = true; - child->info.stats.stats_invalid = true; child->snap_trimq = snap_trimq; |