diff options
author | Sage Weil <sage@inktank.com> | 2013-05-01 14:03:31 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-01 14:03:31 -0700 |
commit | fdbab85ff3cb942555814dc0f8100a34bd3137a0 (patch) | |
tree | 770cd945c7112ca0c05ed3b10c0cc352c71ef7b0 | |
parent | 7bb145b27e5983b3828fb0b3134c5b8e8f089f3e (diff) | |
parent | a21ea0186d9a7ef136ccadf96c02ba683bc5e533 (diff) | |
download | ceph-fdbab85ff3cb942555814dc0f8100a34bd3137a0.tar.gz |
Merge remote-tracking branch 'gh/next'
-rwxr-xr-x | src/ceph-disk | 6 | ||||
-rw-r--r-- | src/ceph_mon.cc | 9 | ||||
-rw-r--r-- | src/common/config_opts.h | 9 | ||||
-rw-r--r-- | src/messages/MMonElection.h | 23 | ||||
-rw-r--r-- | src/mon/Elector.cc | 38 | ||||
-rw-r--r-- | src/mon/Elector.h | 14 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 35 | ||||
-rw-r--r-- | src/mon/MonitorDBStore.h | 32 | ||||
-rw-r--r-- | src/mon/Paxos.cc | 10 | ||||
-rw-r--r-- | src/mon/Paxos.h | 7 | ||||
-rw-r--r-- | src/mon/PaxosService.cc | 14 | ||||
-rw-r--r-- | src/os/LevelDBStore.h | 16 | ||||
-rw-r--r-- | src/osd/OSD.cc | 126 | ||||
-rw-r--r-- | src/osd/OSD.h | 16 | ||||
-rw-r--r-- | src/osd/PG.cc | 74 | ||||
-rw-r--r-- | src/osd/PG.h | 15 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 41 | ||||
-rw-r--r-- | src/osdc/ObjectCacher.cc | 22 | ||||
-rw-r--r-- | src/osdc/ObjectCacher.h | 3 |
19 files changed, 357 insertions, 153 deletions
diff --git a/src/ceph-disk b/src/ceph-disk index 8694ee1eefa..f1380be8701 100755 --- a/src/ceph-disk +++ b/src/ceph-disk @@ -1311,9 +1311,13 @@ def start_daemon( ], ) elif os.path.exists(os.path.join(path, 'sysvinit')): + if os.path.exists('/usr/sbin/service'): + svc = '/usr/sbin/service' + else: + svc = '/sbin/service' subprocess.check_call( args=[ - '/usr/sbin/service', + svc, 'ceph', 'start', 'osd.{osd_id}'.format(osd_id=osd_id), diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 01072728db2..69bcf6d3282 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -114,6 +114,7 @@ int main(int argc, const char **argv) int err; bool mkfs = false; + bool compact = false; std::string osdmapfn, inject_monmap; vector<const char*> args; @@ -132,6 +133,8 @@ int main(int argc, const char **argv) exit(0); } else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) { mkfs = true; + } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) { + compact = true; } else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) { osdmapfn = val; } else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) { @@ -474,6 +477,12 @@ int main(int argc, const char **argv) if (err < 0) return 1; + if (compact || g_conf->mon_compact_on_start) { + derr << "compacting monitor store ..." << dendl; + mon->store->compact(); + derr << "done compacting" << dendl; + } + global_init_daemonize(g_ceph_context, 0); common_init_finish(g_ceph_context); global_init_chdir(g_ceph_context); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 22d3d334f36..9f7dafeb218 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -124,6 +124,9 @@ OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id") OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster OPTION(mon_sync_fs_threshold, OPT_INT, 5) // sync() when writing this many objects; 0 to disable. +OPTION(mon_compact_on_start, OPT_BOOL, false) // compact leveldb on ceph-mon start +OPTION(mon_compact_on_bootstrap, OPT_BOOL, false) // trigger leveldb compaction on bootstrap +OPTION(mon_compact_on_trim, OPT_BOOL, true) // compact (a prefix) when we trim old states OPTION(mon_tick_interval, OPT_INT, 5) OPTION(mon_subscribe_interval, OPT_DOUBLE, 300) OPTION(mon_osd_laggy_halflife, OPT_INT, 60*60) // (seconds) how quickly our laggy estimations decay @@ -182,7 +185,7 @@ OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a spe OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow OPTION(mon_leveldb_write_buffer_size, OPT_U64, 32*1024*1024) // monitor's leveldb write buffer size OPTION(mon_leveldb_cache_size, OPT_U64, 0) // monitor's leveldb cache size -OPTION(mon_leveldb_block_size, OPT_U64, 4*1024*1024) // monitor's leveldb block size +OPTION(mon_leveldb_block_size, OPT_U64, 64*1024) // monitor's leveldb block size OPTION(mon_leveldb_bloom_size, OPT_INT, 0) // monitor's leveldb bloom bits per entry OPTION(mon_leveldb_max_open_files, OPT_INT, 0) // monitor's leveldb max open files OPTION(mon_leveldb_compression, OPT_BOOL, false) // monitor's leveldb uses compression @@ -192,8 +195,8 @@ OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity OPTION(paxos_trim_tolerance, OPT_INT, 30) // number of extra proposals tolerated before trimming OPTION(paxos_trim_disabled_max_versions, OPT_INT, 100) // maximum amount of versions we shall allow passing by without trimming -OPTION(paxos_service_trim_max, OPT_INT, 50) // maximum amount of versions to trim during a single proposal (0 disables it) -OPTION(paxos_service_trim_min, OPT_INT, 30) // minimum amount of versions to trigger a trim (0 disables it) +OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it) +OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it) OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients diff --git a/src/messages/MMonElection.h b/src/messages/MMonElection.h index 9771f6123d6..3d7dd4ec90e 100644 --- a/src/messages/MMonElection.h +++ b/src/messages/MMonElection.h @@ -45,19 +45,20 @@ public: bufferlist monmap_bl; set<int> quorum; uint64_t quorum_features; - version_t paxos_first_version; - version_t paxos_last_version; + /* the following were both used in the next branch for a while + * on user cluster, so we've left them in for compatibility. */ + version_t defunct_one; + version_t defunct_two; MMonElection() : Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION), - op(0), epoch(0), quorum_features(0), paxos_first_version(0), - paxos_last_version(0) + op(0), epoch(0), quorum_features(0), defunct_one(0), + defunct_two(0) { } - MMonElection(int o, epoch_t e, MonMap *m, - version_t paxos_first, version_t paxos_last) + MMonElection(int o, epoch_t e, MonMap *m) : Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION), fsid(m->fsid), op(o), epoch(e), quorum_features(0), - paxos_first_version(paxos_first), paxos_last_version(paxos_last) + defunct_one(0), defunct_two(0) { // encode using full feature set; we will reencode for dest later, // if necessary @@ -87,8 +88,8 @@ public: ::encode(monmap_bl, payload); ::encode(quorum, payload); ::encode(quorum_features, payload); - ::encode(paxos_first_version, payload); - ::encode(paxos_last_version, payload); + ::encode(defunct_one, payload); + ::encode(defunct_two, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); @@ -105,8 +106,8 @@ public: else quorum_features = 0; if (header.version >= 4) { - ::decode(paxos_first_version, p); - ::decode(paxos_last_version, p); + ::decode(defunct_one, p); + ::decode(defunct_two, p); } } diff --git a/src/mon/Elector.cc b/src/mon/Elector.cc index eed2d40e901..b3db1afab3c 100644 --- a/src/mon/Elector.cc +++ b/src/mon/Elector.cc @@ -55,6 +55,7 @@ void Elector::bump_epoch(epoch_t e) MonitorDBStore::Transaction t; t.put(Monitor::MONITOR_NAME, "election_epoch", epoch); mon->store->apply_transaction(t); + mon->reset(); // clear up some state electing_me = false; @@ -80,21 +81,18 @@ void Elector::start() electing_me = true; acked_me[mon->rank] = CEPH_FEATURES_ALL; leader_acked = -1; - acked_first_paxos_version = mon->paxos->get_first_committed(); // bcast to everyone else for (unsigned i=0; i<mon->monmap->size(); ++i) { if ((int)i == mon->rank) continue; - Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap, - mon->paxos->get_first_committed(), - mon->paxos->get_version()); + Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap); mon->messenger->send_message(m, mon->monmap->get_inst(i)); } reset_timer(); } -void Elector::defer(int who, version_t paxos_first) +void Elector::defer(int who) { dout(5) << "defer to " << who << dendl; @@ -106,11 +104,8 @@ void Elector::defer(int who, version_t paxos_first) // ack them leader_acked = who; - acked_first_paxos_version = paxos_first; ack_stamp = ceph_clock_now(g_ceph_context); - mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap, - mon->paxos->get_first_committed(), - mon->paxos->get_version()), + mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap), mon->monmap->get_inst(who)); // set a timer @@ -174,11 +169,9 @@ void Elector::victory() p != quorum.end(); ++p) { if (*p == mon->rank) continue; - MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, - mon->monmap, - mon->paxos->get_first_committed(), - mon->paxos->get_version()); + MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap); m->quorum = quorum; + m->quorum_features = features; mon->messenger->send_message(m, mon->monmap->get_inst(*p)); } @@ -213,13 +206,10 @@ void Elector::handle_propose(MMonElection *m) } } - if ((mon->rank < from) && - // be careful that we have new enough data to be leader! - (m->paxos_first_version <= mon->paxos->get_version())) { + if (mon->rank < from) { // i would win over them. if (leader_acked >= 0) { // we already acked someone - assert((leader_acked < from) || // and they still win, of course - (acked_first_paxos_version > mon->paxos->get_version())); + assert(leader_acked < from); // and they still win, of course dout(5) << "no, we already acked " << leader_acked << dendl; } else { // wait, i should win! @@ -228,20 +218,16 @@ void Elector::handle_propose(MMonElection *m) mon->start_election(); } } - } else if (m->paxos_last_version >= mon->paxos->get_first_committed()) { + } else { // they would win over me if (leader_acked < 0 || // haven't acked anyone yet, or leader_acked > from || // they would win over who you did ack, or - leader_acked == from) { // this is the guy we're already deferring to - defer(from, m->paxos_first_version); + leader_acked == from) { // this is the guy we're already deferring to + defer(from); } else { // ignore them! dout(5) << "no, we already acked " << leader_acked << dendl; } - } else { // they are too out-of-date - dout(5) << "no, they are too far behind; paxos version: " - << m->paxos_last_version << " versus my first " - << mon->paxos->get_first_committed() << dendl; } m->put(); @@ -286,7 +272,7 @@ void Elector::handle_victory(MMonElection *m) dout(5) << "handle_victory from " << m->get_source() << " quorum_features " << m->quorum_features << dendl; int from = m->get_source().num(); - assert((from < mon->rank) || (acked_first_paxos_version > mon->paxos->get_version())); + assert(from < mon->rank); assert(m->epoch % 2 == 0); leader_acked = -1; diff --git a/src/mon/Elector.h b/src/mon/Elector.h index 9cce81e9f49..d81eb239763 100644 --- a/src/mon/Elector.h +++ b/src/mon/Elector.h @@ -126,10 +126,6 @@ class Elector { */ int leader_acked; /** - * Indicates the first_paxos_commit on who we've acked - */ - version_t acked_first_paxos_version; - /** * Indicates when we have acked him */ utime_t ack_stamp; @@ -201,17 +197,16 @@ class Elector { * to become the Leader. We will only defer an election if the monitor we * are deferring to outranks us. * - * @pre @p who outranks us (who < our rank, or we're behind their store) + * @pre @p who outranks us (i.e., who < our rank) * @pre @p who outranks any other monitor we have deferred to in the past * @post electing_me is false * @post leader_acked equals @p who * @post we sent an ack message to @p who * @post we reset the expire_event timer * - * @param who Some other monitor's numeric identifier. - * @param paxos_first The other monitor's first committed paxos version + * @param who Some other monitor's numeric identifier. */ - void defer(int who, version_t paxos_first); + void defer(int who); /** * The election has taken too long and has expired. * @@ -331,8 +326,7 @@ class Elector { epoch(0), participating(true), electing_me(false), - leader_acked(-1), - acked_first_paxos_version(0) { } + leader_acked(-1) { } /** * Initiate the Elector class. diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 162f9c6739b..a9b293571e8 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -635,6 +635,13 @@ void Monitor::bootstrap() reset(); + // sync store + if (g_conf->mon_compact_on_bootstrap) { + dout(10) << "bootstrap -- triggering compaction" << dendl; + store->compact(); + dout(10) << "bootstrap -- finished compaction" << dendl; + } + // singleton monitor? if (monmap->size() == 1 && rank == 0) { win_standalone_election(); @@ -773,13 +780,6 @@ void Monitor::handle_sync_start(MMonSync *m) { dout(10) << __func__ << " " << *m << dendl; - /** - * This looks a bit odd, but we've seen cases where sync start messages - * get bounced around and end up at the originator without anybody - * noticing! - */ - assert(m->reply_to != messenger->get_myinst()); - /* If we are not the leader, then some monitor picked us as the point of * entry to the quorum during its synchronization process. Therefore, we * have an obligation of forwarding this message to leader, so the sender @@ -822,6 +822,20 @@ void Monitor::handle_sync_start(MMonSync *m) assert(quorum.empty()); assert(sync_leader.get() != NULL); + /** + * This looks a bit odd, but we've seen cases where sync start messages + * get bounced around and end up at the originator without anybody + * noticing!* If it happens, just drop the message and the timeouts + * will clean everything up -- eventually. + * [*] If a leader gets elected who is too far behind, he'll drop into + * bootstrap and sync, but the person he sends his sync to thinks he's + * still the leader and forwards the reply back. + */ + if (m->reply_to == messenger->get_myinst()) { + m->put(); + return; + } + dout(10) << __func__ << " forward " << *m << " to our sync leader at " << sync_leader->entity << dendl; @@ -1927,10 +1941,9 @@ void Monitor::handle_probe_reply(MMonProbe *m) if (m->quorum.size()) { dout(10) << " existing quorum " << m->quorum << dendl; - if ((paxos->get_version() + g_conf->paxos_max_join_drift < - m->paxos_last_version) || - (paxos->get_version() < m->paxos_first_version)){ - dout(10) << " peer paxos version " << m->paxos_last_version + if (paxos->get_version() < m->paxos_first_version) { + dout(10) << " peer paxos versions [" << m->paxos_first_version + << "," << m->paxos_last_version << "]" << " vs my version " << paxos->get_version() << " (too far ahead)" << dendl; diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index ac2703ec5e6..c4c681043b1 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -70,6 +70,7 @@ class MonitorDBStore enum { OP_PUT = 1, OP_ERASE = 2, + OP_COMPACT_PREFIX = 3, }; void put(string prefix, string key, bufferlist& bl) { @@ -98,6 +99,10 @@ class MonitorDBStore erase(prefix, os.str()); } + void compact_prefix(string prefix) { + ops.push_back(Op(OP_COMPACT_PREFIX, prefix, string())); + } + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); ::encode(ops, bl); @@ -157,6 +162,12 @@ class MonitorDBStore f->dump_string("key", op.key); } break; + case OP_COMPACT_PREFIX: + { + f->dump_string("type", "COMPACT_PREFIX"); + f->dump_string("prefix", op.prefix); + } + break; default: { f->dump_string("type", "unknown"); @@ -174,6 +185,7 @@ class MonitorDBStore int apply_transaction(MonitorDBStore::Transaction& t) { KeyValueDB::Transaction dbt = db->get_transaction(); + list<string> compact_prefixes; for (list<Op>::iterator it = t.ops.begin(); it != t.ops.end(); ++it) { Op& op = *it; switch (op.type) { @@ -183,13 +195,23 @@ class MonitorDBStore case Transaction::OP_ERASE: dbt->rmkey(op.prefix, op.key); break; + case Transaction::OP_COMPACT_PREFIX: + compact_prefixes.push_back(op.prefix); + break; default: derr << __func__ << " unknown op type " << op.type << dendl; ceph_assert(0); break; } } - return db->submit_transaction_sync(dbt); + int r = db->submit_transaction_sync(dbt); + if (r >= 0) { + while (!compact_prefixes.empty()) { + db->compact_prefix(compact_prefixes.front()); + compact_prefixes.pop_front(); + } + } + return r; } class StoreIteratorImpl { @@ -456,6 +478,14 @@ class MonitorDBStore return db->create_and_open(out); } + void compact() { + db->compact(); + } + + void compact_prefix(const string& prefix) { + db->compact_prefix(prefix); + } + MonitorDBStore(const string& path) : db(0) { string::const_reverse_iterator rit; int pos = 0; diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 46eaf88273d..71ef2ec3de0 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -362,8 +362,9 @@ void Paxos::handle_last(MMonPaxos *last) return; } - // note peer's last_committed, in case we learn a new commit and need to - // push it to them. + // note peer's first_ and last_committed, in case we learn a new + // commit and need to push it to them. + peer_first_committed[last->get_source().num()] = last->first_committed; peer_last_committed[last->get_source().num()] = last->last_committed; if (last->first_committed > last_committed+1) { @@ -974,6 +975,10 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t, t->erase(get_name(), from); from++; } + if (g_conf->mon_compact_on_trim) { + dout(10) << " compacting prefix" << dendl; + t->compact_prefix(get_name()); + } } void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first) @@ -985,6 +990,7 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first) return; trim_to(t, first_committed, first); t->put(get_name(), "first_committed", first); + first_committed = first; } void Paxos::trim_to(version_t first) diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index ca467ce3db8..2e1bb62dda9 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -300,8 +300,11 @@ private: */ version_t accepted_pn_from; /** - * @todo Check out if this map has any purpose at all. So far, we have only - * seen it being read, although it is never affected. + * Map holding the first committed version by each quorum member. + * + * The versions kept in this map are updated during the collect phase. + * When the Leader starts the collect phase, each Peon will reply with its + * first committed version, which will then be kept in this map. */ map<int,version_t> peer_first_committed; /** diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index d02cb1d7ab5..8f421ab3d81 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -294,13 +294,19 @@ void PaxosService::put_version(MonitorDBStore::Transaction *t, const string& prefix, version_t ver, bufferlist& bl) { - t->put(get_service_name(), ver, bl); + ostringstream os; + os << ver; + string key = mon->store->combine_strings(prefix, os.str()); + t->put(get_service_name(), key, bl); } int PaxosService::get_version(const string& prefix, version_t ver, bufferlist& bl) { - return mon->store->get(get_service_name(), ver, bl); + ostringstream os; + os << ver; + string key = mon->store->combine_strings(prefix, os.str()); + return mon->store->get(get_service_name(), key, bl); } void PaxosService::trim(MonitorDBStore::Transaction *t, @@ -318,6 +324,10 @@ void PaxosService::trim(MonitorDBStore::Transaction *t, t->erase(get_service_name(), full_key); } } + if (g_conf->mon_compact_on_trim) { + dout(20) << " compacting prefix " << get_service_name() << dendl; + t->compact_prefix(get_service_name()); + } } void PaxosService::encode_trim(MonitorDBStore::Transaction *t) diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h index 6b1afceb753..83f2ed3b4c4 100644 --- a/src/os/LevelDBStore.h +++ b/src/os/LevelDBStore.h @@ -33,6 +33,22 @@ class LevelDBStore : public KeyValueDB { int init(ostream &out, bool create_if_missing); public: + /// compact the underlying leveldb store + void compact() { + db->CompactRange(NULL, NULL); + } + + /// compact leveldb for all keys with a given prefix + void compact_prefix(const string& prefix) { + // if we combine the prefix with key by adding a '\0' separator, + // a char(1) will capture all such keys. + string end = prefix; + end += (char)1; + leveldb::Slice cstart(prefix); + leveldb::Slice cend(end); + db->CompactRange(&cstart, &cend); + } + /** * options_t: Holds options which are minimally interpreted * on initialization and then passed through to LevelDB. diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 85edf66f3b4..e63361b8ddd 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -194,45 +194,107 @@ OSDService::OSDService(OSD *osd) : #endif {} -void OSDService::_start_split(const set<pg_t> &pgs) +void OSDService::_start_split(pg_t parent, const set<pg_t> &children) { - for (set<pg_t>::const_iterator i = pgs.begin(); - i != pgs.end(); + for (set<pg_t>::const_iterator i = children.begin(); + i != children.end(); + ++i) { + dout(10) << __func__ << ": Starting split on pg " << *i + << ", parent=" << parent << dendl; + assert(!pending_splits.count(*i)); + assert(!in_progress_splits.count(*i)); + pending_splits.insert(make_pair(*i, parent)); + + assert(!rev_pending_splits[parent].count(*i)); + rev_pending_splits[parent].insert(*i); + } +} + +void OSDService::mark_split_in_progress(pg_t parent, const set<pg_t> &children) +{ + Mutex::Locker l(in_progress_split_lock); + map<pg_t, set<pg_t> >::iterator piter = rev_pending_splits.find(parent); + assert(piter != rev_pending_splits.end()); + for (set<pg_t>::const_iterator i = children.begin(); + i != children.end(); ++i) { - dout(10) << __func__ << ": Starting split on pg " << *i << dendl; + assert(piter->second.count(*i)); + assert(pending_splits.count(*i)); assert(!in_progress_splits.count(*i)); + assert(pending_splits[*i] == parent); + + pending_splits.erase(*i); + piter->second.erase(*i); in_progress_splits.insert(*i); } + if (piter->second.empty()) + rev_pending_splits.erase(piter); +} + +void OSDService::cancel_pending_splits_for_parent(pg_t parent) +{ + Mutex::Locker l(in_progress_split_lock); + map<pg_t, set<pg_t> >::iterator piter = rev_pending_splits.find(parent); + if (piter == rev_pending_splits.end()) + return; + + for (set<pg_t>::iterator i = piter->second.begin(); + i != piter->second.end(); + ++i) { + assert(pending_splits.count(*i)); + assert(!in_progress_splits.count(*i)); + pending_splits.erase(*i); + } + rev_pending_splits.erase(piter); +} + +void OSDService::_maybe_split_pgid(OSDMapRef old_map, + OSDMapRef new_map, + pg_t pgid) +{ + assert(old_map->have_pg_pool(pgid.pool())); + if (pgid.ps() < static_cast<unsigned>(old_map->get_pg_num(pgid.pool()))) { + set<pg_t> children; + pgid.is_split(old_map->get_pg_num(pgid.pool()), + new_map->get_pg_num(pgid.pool()), &children); + _start_split(pgid, children); + } else { + assert(pgid.ps() < static_cast<unsigned>(new_map->get_pg_num(pgid.pool()))); + } } void OSDService::expand_pg_num(OSDMapRef old_map, OSDMapRef new_map) { Mutex::Locker l(in_progress_split_lock); - set<pg_t> children; for (set<pg_t>::iterator i = in_progress_splits.begin(); i != in_progress_splits.end(); - ) { - assert(old_map->have_pg_pool(i->pool())); + ) { if (!new_map->have_pg_pool(i->pool())) { in_progress_splits.erase(i++); } else { - if (i->ps() < static_cast<unsigned>(old_map->get_pg_num(i->pool()))) { - i->is_split(old_map->get_pg_num(i->pool()), - new_map->get_pg_num(i->pool()), &children); - } else { - assert(i->ps() < static_cast<unsigned>(new_map->get_pg_num(i->pool()))); - } + _maybe_split_pgid(old_map, new_map, *i); + ++i; + } + } + for (map<pg_t, pg_t>::iterator i = pending_splits.begin(); + i != pending_splits.end(); + ) { + if (!new_map->have_pg_pool(i->first.pool())) { + rev_pending_splits.erase(i->second); + pending_splits.erase(i++); + } else { + _maybe_split_pgid(old_map, new_map, i->first); ++i; } } - _start_split(children); } bool OSDService::splitting(pg_t pgid) { Mutex::Locker l(in_progress_split_lock); - return in_progress_splits.count(pgid); + return in_progress_splits.count(pgid) || + pending_splits.count(pgid); } void OSDService::complete_split(const set<pg_t> &pgs) @@ -242,6 +304,7 @@ void OSDService::complete_split(const set<pg_t> &pgs) i != pgs.end(); ++i) { dout(10) << __func__ << ": Completing split on pg " << *i << dendl; + assert(!pending_splits.count(*i)); assert(in_progress_splits.count(*i)); in_progress_splits.erase(*i); } @@ -1680,7 +1743,7 @@ void OSD::load_pgs() pg->info.pgid.is_split(pg->get_osdmap()->get_pg_num(pg->info.pgid.pool()), osdmap->get_pg_num(pg->info.pgid.pool()), &split_pgs)) { - service.start_split(split_pgs); + service.start_split(pg->info.pgid, split_pgs); } pg->reg_next_scrub(); @@ -3014,14 +3077,14 @@ void OSD::send_pg_stats(const utime_t &now) pg->put("pg_stat_queue"); continue; } - pg->pg_stats_lock.Lock(); - if (pg->pg_stats_valid) { - m->pg_stat[pg->info.pgid] = pg->pg_stats_stable; - dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_stable.reported << dendl; + pg->pg_stats_publish_lock.Lock(); + if (pg->pg_stats_publish_valid) { + m->pg_stat[pg->info.pgid] = pg->pg_stats_publish; + dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << dendl; } else { - dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_stable.reported << ", not valid" << dendl; + dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << ", not valid" << dendl; } - pg->pg_stats_lock.Unlock(); + pg->pg_stats_publish_lock.Unlock(); } if (!outstanding_pg_stats) { @@ -3060,18 +3123,18 @@ void OSD::handle_pg_stats_ack(MPGStatsAck *ack) if (ack->pg_stat.count(pg->info.pgid)) { eversion_t acked = ack->pg_stat[pg->info.pgid]; - pg->pg_stats_lock.Lock(); - if (acked == pg->pg_stats_stable.reported) { - dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_stable.reported << dendl; + pg->pg_stats_publish_lock.Lock(); + if (acked == pg->pg_stats_publish.reported) { + dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_publish.reported << dendl; pg->stat_queue_item.remove_myself(); pg->put("pg_stat_queue"); } else { - dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_stable.reported + dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << " > acked " << acked << dendl; } - pg->pg_stats_lock.Unlock(); + pg->pg_stats_publish_lock.Unlock(); } else { - dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_stable.reported << dendl; + dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << dendl; } } @@ -4385,6 +4448,7 @@ void OSD::advance_pg( lastmap->get_pg_num(pg->pool.id), nextmap->get_pg_num(pg->pool.id), &children)) { + service.mark_split_in_progress(pg->info.pgid, children); split_pgs( pg, children, new_pgs, lastmap, nextmap, rctx); @@ -4507,7 +4571,7 @@ void OSD::consume_map() service.get_osdmap()->get_pg_num(it->first.pool()), osdmap->get_pg_num(it->first.pool()), &split_pgs)) { - service.start_split(split_pgs); + service.start_split(it->first, split_pgs); } pg->unlock(); @@ -5153,7 +5217,7 @@ void OSD::handle_pg_create(OpRequestRef op) wake_pg_waiters(pg->info.pgid); pg->handle_create(&rctx); pg->write_if_dirty(*rctx.transaction); - pg->update_stats(); + pg->publish_stats_to_osd(); pg->unlock(); num_created++; } @@ -5841,6 +5905,8 @@ void OSD::_remove_pg(PG *pg) // and handle_notify_timeout pg->on_removal(rmt); + service.cancel_pending_splits_for_parent(pg->info.pgid); + coll_t to_remove = get_next_removal_coll(pg->info.pgid); removals.push_back(to_remove); rmt->collection_rename(coll_t(pg->info.pgid), to_remove); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 513bd43ec6c..f894768fbe5 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -386,16 +386,24 @@ public: // split Mutex in_progress_split_lock; - set<pg_t> in_progress_splits; - void _start_split(const set<pg_t> &pgs); - void start_split(const set<pg_t> &pgs) { + map<pg_t, pg_t> pending_splits; // child -> parent + map<pg_t, set<pg_t> > rev_pending_splits; // parent -> [children] + set<pg_t> in_progress_splits; // child + + void _start_split(pg_t parent, const set<pg_t> &children); + void start_split(pg_t parent, const set<pg_t> &children) { Mutex::Locker l(in_progress_split_lock); - return _start_split(pgs); + return _start_split(parent, children); } + void mark_split_in_progress(pg_t parent, const set<pg_t> &pgs); void complete_split(const set<pg_t> &pgs); + void cancel_pending_splits_for_parent(pg_t parent); bool splitting(pg_t pgid); void expand_pg_num(OSDMapRef old_map, OSDMapRef new_map); + void _maybe_split_pgid(OSDMapRef old_map, + OSDMapRef new_map, + pg_t pgid); // -- OSD Full Status -- Mutex full_status_lock; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 70584e44591..ae88be652da 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -163,8 +163,8 @@ PG::PG(OSDService *o, OSDMapRef curmap, backfill_reserved(0), backfill_reserving(0), flushed(false), - pg_stats_lock("PG::pg_stats_lock"), - pg_stats_valid(false), + pg_stats_publish_lock("PG::pg_stats_publish_lock"), + pg_stats_publish_valid(false), osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))), finish_sync_event(NULL), scrub_after_recovery(false), @@ -790,7 +790,7 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing found_missing = true; } if (stats_updated) { - update_stats(); + publish_stats_to_osd(); } dout(20) << "search_for_missing missing " << missing.missing << dendl; @@ -1705,7 +1705,7 @@ void PG::activate(ObjectStore::Transaction& t, AllReplicasRecovered()))); } - update_stats(); + publish_stats_to_osd(); } // we need to flush this all out before doing anything else.. @@ -1848,7 +1848,7 @@ void PG::replay_queued_ops() requeue_ops(replay); requeue_ops(waiting_for_active); - update_stats(); + publish_stats_to_osd(); } void PG::_activate_committed(epoch_t e) @@ -1899,7 +1899,7 @@ void PG::all_activated_and_committed() info.history.last_epoch_started = info.last_epoch_started; share_pg_info(); - update_stats(); + publish_stats_to_osd(); queue_peering_event( CephPeeringEvtRef( @@ -1991,7 +1991,7 @@ void PG::_finish_recovery(Context *c) finish_sync_event = 0; purge_strays(); - update_stats(); + publish_stats_to_osd(); if (scrub_after_recovery) { dout(10) << "_finish_recovery requeueing for scrub" << dendl; @@ -2299,9 +2299,9 @@ void PG::update_heartbeat_peers() osd->need_heartbeat_peer_update(); } -void PG::update_stats() +void PG::publish_stats_to_osd() { - pg_stats_lock.Lock(); + pg_stats_publish_lock.Lock(); if (is_primary()) { // update our stat summary info.stats.reported.inc(info.history.same_primary_since); @@ -2339,17 +2339,18 @@ void PG::update_stats() info.stats.log_start = log.tail; info.stats.ondisk_log_start = log.tail; - pg_stats_valid = true; - pg_stats_stable = info.stats; + pg_stats_publish_valid = true; + pg_stats_publish = info.stats; + pg_stats_publish.stats.add(unstable_stats); // calc copies, degraded unsigned target = MAX(get_osdmap()->get_pg_size(info.pgid), acting.size()); - pg_stats_stable.stats.calc_copies(target); - pg_stats_stable.stats.sum.num_objects_degraded = 0; + pg_stats_publish.stats.calc_copies(target); + pg_stats_publish.stats.sum.num_objects_degraded = 0; if ((is_degraded() || !is_clean()) && is_active()) { // NOTE: we only generate copies, degraded, unfound values for // the summation, not individual stat categories. - uint64_t num_objects = pg_stats_stable.stats.sum.num_objects; + uint64_t num_objects = pg_stats_publish.stats.sum.num_objects; uint64_t degraded = 0; @@ -2358,7 +2359,7 @@ void PG::update_stats() degraded += (target - acting.size()) * num_objects; // missing on primary - pg_stats_stable.stats.sum.num_objects_missing_on_primary = missing.num_missing(); + pg_stats_publish.stats.sum.num_objects_missing_on_primary = missing.num_missing(); degraded += missing.num_missing(); for (unsigned i=1; i<acting.size(); i++) { @@ -2370,27 +2371,27 @@ void PG::update_stats() // not yet backfilled degraded += num_objects - peer_info[acting[i]].stats.stats.sum.num_objects; } - pg_stats_stable.stats.sum.num_objects_degraded = degraded; - pg_stats_stable.stats.sum.num_objects_unfound = get_num_unfound(); + pg_stats_publish.stats.sum.num_objects_degraded = degraded; + pg_stats_publish.stats.sum.num_objects_unfound = get_num_unfound(); } - dout(15) << "update_stats " << pg_stats_stable.reported << dendl; + dout(15) << "publish_stats_to_osd " << pg_stats_publish.reported << dendl; } else { - pg_stats_valid = false; - dout(15) << "update_stats -- not primary" << dendl; + pg_stats_publish_valid = false; + dout(15) << "publish_stats_to_osd -- not primary" << dendl; } - pg_stats_lock.Unlock(); + pg_stats_publish_lock.Unlock(); if (is_primary()) osd->pg_stat_queue_enqueue(this); } -void PG::clear_stats() +void PG::clear_publish_stats() { dout(15) << "clear_stats" << dendl; - pg_stats_lock.Lock(); - pg_stats_valid = false; - pg_stats_lock.Unlock(); + pg_stats_publish_lock.Lock(); + pg_stats_publish_valid = false; + pg_stats_publish_lock.Unlock(); osd->pg_stat_queue_dequeue(this); } @@ -2620,6 +2621,9 @@ int PG::_write_info(ObjectStore::Transaction& t, epoch_t epoch, void PG::write_info(ObjectStore::Transaction& t) { + info.stats.stats.add(unstable_stats); + unstable_stats.clear(); + int ret = _write_info(t, get_osdmap()->get_epoch(), info, coll, past_intervals, snap_collections, osd->infos_oid, info_struct_v, dirty_big_info); @@ -3817,7 +3821,7 @@ void PG::scrub() state_clear(PG_STATE_SCRUBBING); state_clear(PG_STATE_REPAIR); state_clear(PG_STATE_DEEP_SCRUB); - update_stats(); + publish_stats_to_osd(); unlock(); return; } @@ -3903,7 +3907,7 @@ void PG::classic_scrub() scrubber.active = true; scrubber.classic = true; - update_stats(); + publish_stats_to_osd(); scrubber.received_maps.clear(); scrubber.epoch_start = info.history.same_interval_since; @@ -4078,7 +4082,7 @@ void PG::chunky_scrub() { case PG::Scrubber::INACTIVE: dout(10) << "scrub start" << dendl; - update_stats(); + publish_stats_to_osd(); scrubber.epoch_start = info.history.same_interval_since; scrubber.active = true; @@ -4256,7 +4260,7 @@ void PG::scrub_clear_state() state_clear(PG_STATE_SCRUBBING); state_clear(PG_STATE_REPAIR); state_clear(PG_STATE_DEEP_SCRUB); - update_stats(); + publish_stats_to_osd(); // active -> nothing. if (scrubber.active) @@ -5093,7 +5097,7 @@ void PG::start_peering_interval(const OSDMapRef lastmap, // old primary? if (oldrole == 0) { state_clear(PG_STATE_CLEAN); - clear_stats(); + clear_publish_stats(); // take replay queue waiters list<OpRequestRef> ls; @@ -6048,7 +6052,7 @@ boost::statechart::result PG::RecoveryState::Primary::react(const ActMap&) { dout(7) << "handle ActMap primary" << dendl; PG *pg = context< RecoveryMachine >().pg; - pg->update_stats(); + pg->publish_stats_to_osd(); pg->take_waiters(); return discard_event(); } @@ -6568,7 +6572,7 @@ PG::RecoveryState::Clean::Clean(my_context ctx) pg->mark_clean(); pg->share_pg_info(); - pg->update_stats(); + pg->publish_stats_to_osd(); } @@ -6636,7 +6640,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const AdvMap& advmap) pg->state_clear(PG_STATE_DEGRADED); else pg->state_set(PG_STATE_DEGRADED); - pg->update_stats(); // degraded may have changed + pg->publish_stats_to_osd(); // degraded may have changed } return forward_event(); } @@ -7015,7 +7019,7 @@ PG::RecoveryState::GetInfo::GetInfo(my_context ctx) if (!prior_set.get()) pg->build_prior(prior_set); - pg->update_stats(); + pg->publish_stats_to_osd(); get_infos(); if (peer_info_requested.empty() && !prior_set->pg_down) { @@ -7348,7 +7352,7 @@ PG::RecoveryState::Incomplete::Incomplete(my_context ctx) pg->state_clear(PG_STATE_PEERING); pg->state_set(PG_STATE_INCOMPLETE); - pg->update_stats(); + pg->publish_stats_to_osd(); } boost::statechart::result PG::RecoveryState::Incomplete::react(const AdvMap &advmap) { diff --git a/src/osd/PG.h b/src/osd/PG.h index 115ec1b5eaf..720fcb58772 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -714,16 +714,19 @@ protected: void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m); void requeue_ops(list<OpRequestRef> &l); - // stats - Mutex pg_stats_lock; - bool pg_stats_valid; - pg_stat_t pg_stats_stable; + // stats that persist lazily + object_stat_collection_t unstable_stats; + + // publish stats + Mutex pg_stats_publish_lock; + bool pg_stats_publish_valid; + pg_stat_t pg_stats_publish; // for ordering writes std::tr1::shared_ptr<ObjectStore::Sequencer> osr; - void update_stats(); - void clear_stats(); + void publish_stats_to_osd(); + void clear_publish_stats(); public: void clear_primary_state(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index fb24375dca6..708e4153ca8 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1039,7 +1039,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (ctx->op_t.empty() || result < 0) { if (result >= 0) { log_op_stats(ctx); - update_stats(); + publish_stats_to_osd(); } MOSDOpReply *reply = ctx->reply; @@ -2197,9 +2197,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) if (r >= 0) { op.xattr.value_len = r; result = 0; + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(r, 10); + ctx->delta_stats.num_rd++; } else result = r; - ctx->delta_stats.num_rd++; } break; @@ -2217,6 +2218,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) bufferlist bl; ::encode(newattrs, bl); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(bl.length(), 10); + ctx->delta_stats.num_rd++; osd_op.outdata.claim_append(bl); } break; @@ -2237,6 +2240,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) if (result < 0 && result != -EEXIST && result != -ENODATA) break; + ctx->delta_stats.num_rd++; + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(xattr.length(), 10); + switch (op.xattr.cmp_mode) { case CEPH_OSD_CMPXATTR_MODE_STRING: { @@ -2281,7 +2287,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } dout(10) << "comparison returned true" << dendl; - ctx->delta_stats.num_rd++; } break; @@ -2848,6 +2853,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) out_set.insert(iter->first); } ::encode(out_set, osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; break; } dout(10) << "failed, reading from omap" << dendl; @@ -2867,6 +2874,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } } ::encode(out_set, osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; } break; case CEPH_OSD_OP_OMAPGETVALS: @@ -2901,6 +2910,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) out_set.insert(*iter); } ::encode(out_set, osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; break; } // No valid tmap, use omap @@ -2923,6 +2934,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } } ::encode(out_set, osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; } break; case CEPH_OSD_OP_OMAPGETHEADER: @@ -2941,6 +2954,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) dout(10) << "failed, reading from omap" << dendl; } osd->store->omap_get_header(coll, soid, &osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; } break; case CEPH_OSD_OP_OMAPGETVALSBYKEYS: @@ -2969,6 +2984,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } } ::encode(out, osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; break; } // No valid tmap, use omap @@ -2976,6 +2993,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } osd->store->omap_get_values(coll, soid, keys_to_get, &out); ::encode(out, osd_op.outdata); + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10); + ctx->delta_stats.num_rd++; } break; case CEPH_OSD_OP_OMAP_CMP: @@ -3004,6 +3023,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) result = r; break; } + //Should set num_rd_kb based on encode length of map + ctx->delta_stats.num_rd++; r = 0; bufferlist empty; @@ -3066,6 +3087,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) dout(20) << "\t" << i->first << dendl; } t.omap_setkeys(coll, soid, to_set); + ctx->delta_stats.num_wr++; } break; case CEPH_OSD_OP_OMAPSETHEADER: @@ -3079,6 +3101,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } t.touch(coll, soid); t.omap_setheader(coll, soid, osd_op.indata); + ctx->delta_stats.num_wr++; } break; case CEPH_OSD_OP_OMAPCLEAR: @@ -3092,6 +3115,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } t.touch(coll, soid); t.omap_clear(coll, soid); + ctx->delta_stats.num_wr++; } break; case CEPH_OSD_OP_OMAPRMKEYS: @@ -3113,6 +3137,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) goto fail; } t.omap_rmkeys(coll, soid, to_rm); + ctx->delta_stats.num_wr++; } break; default: @@ -3575,6 +3600,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) // read-op? done? if (ctx->op_t.empty() && !ctx->modify) { ctx->reply_version = ctx->obs->oi.user_version; + unstable_stats.add(ctx->delta_stats, ctx->obc->obs.oi.category); return result; } @@ -3879,7 +3905,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) if (repop->waitfor_disk.empty()) { log_op_stats(repop->ctx); - update_stats(); + publish_stats_to_osd(); // send dup commits, in order if (waiting_for_ondisk.count(repop->v)) { @@ -5454,7 +5480,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) if (complete) { pulling.erase(hoid); pull_from_peer[m->get_source().num()].erase(hoid); - update_stats(); + publish_stats_to_osd(); if (waiting_for_missing_object.count(hoid)) { dout(20) << " kicking waiters on " << hoid << dendl; requeue_ops(waiting_for_missing_object[hoid]); @@ -5701,7 +5727,7 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op) pushing[soid].erase(peer); pi = NULL; - update_stats(); + publish_stats_to_osd(); if (pushing[soid].empty()) { pushing.erase(soid); @@ -6335,6 +6361,7 @@ void ReplicatedPG::on_change() snap_trimmer_machine.process_event(Reset()); debug_op_order.clear(); + unstable_stats.clear(); } void ReplicatedPG::on_role_change() @@ -7277,7 +7304,7 @@ void ReplicatedPG::_scrub_finish() if (repair) { ++scrubber.fixed; info.stats.stats = scrub_cstat; - update_stats(); + publish_stats_to_osd(); share_pg_info(); } } diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 21370f0f982..56c56d640f1 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -502,7 +502,7 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg), flusher_stop(false), flusher_thread(this), finisher(cct), stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), - stat_error(0), stat_dirty_waiting(0) + stat_error(0), stat_dirty_waiting(0), reads_outstanding(0) { this->max_dirty_age.set_from_double(max_dirty_age); perf_start(); @@ -592,7 +592,8 @@ void ObjectCacher::close_object(Object *ob) void ObjectCacher::bh_read(BufferHead *bh) { assert(lock.is_locked()); - ldout(cct, 7) << "bh_read on " << *bh << dendl; + ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads " + << reads_outstanding << dendl; mark_rx(bh); @@ -607,6 +608,7 @@ void ObjectCacher::bh_read(BufferHead *bh) bh->start(), bh->length(), bh->ob->get_snap(), &onfinish->bl, oset->truncate_size, oset->truncate_seq, onfinish); + ++reads_outstanding; } void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start, @@ -619,6 +621,7 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start, << " " << start << "~" << length << " (bl is " << bl.length() << ")" << " returned " << r + << " outstanding reads " << reads_outstanding << dendl; if (bl.length() < length) { @@ -768,6 +771,8 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start, ldout(cct, 20) << "finishing waiters " << ls << dendl; finish_contexts(cct, ls, err); + --reads_outstanding; + read_cond.Signal(); } @@ -1433,6 +1438,19 @@ void ObjectCacher::flusher_entry() break; flusher_cond.WaitInterval(cct, lock, utime_t(1,0)); } + + /* Wait for reads to finish. This is only possible if handling + * -ENOENT made some read completions finish before their rados read + * came back. If we don't wait for them, and destroy the cache, when + * the rados reads do come back their callback will try to access the + * no-longer-valid ObjectCacher. + */ + while (reads_outstanding > 0) { + ldout(cct, 10) << "Waiting for all reads to complete. Number left: " + << reads_outstanding << dendl; + read_cond.Wait(lock); + } + lock.Unlock(); ldout(cct, 10) << "flusher finish" << dendl; } diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index a17046f9126..80a90bd0457 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -444,6 +444,9 @@ class ObjectCacher { loff_t release(Object *o); void purge(Object *o); + int64_t reads_outstanding; + Cond read_cond; + int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, bool external_call); |