author    Sage Weil <sage@inktank.com>  2013-05-01 14:03:31 -0700
committer Sage Weil <sage@inktank.com>  2013-05-01 14:03:31 -0700
commit    fdbab85ff3cb942555814dc0f8100a34bd3137a0 (patch)
tree      770cd945c7112ca0c05ed3b10c0cc352c71ef7b0
parent    7bb145b27e5983b3828fb0b3134c5b8e8f089f3e (diff)
parent    a21ea0186d9a7ef136ccadf96c02ba683bc5e533 (diff)
download  ceph-fdbab85ff3cb942555814dc0f8100a34bd3137a0.tar.gz
Merge remote-tracking branch 'gh/next'
-rwxr-xr-x  src/ceph-disk                   6
-rw-r--r--  src/ceph_mon.cc                 9
-rw-r--r--  src/common/config_opts.h        9
-rw-r--r--  src/messages/MMonElection.h    23
-rw-r--r--  src/mon/Elector.cc             38
-rw-r--r--  src/mon/Elector.h              14
-rw-r--r--  src/mon/Monitor.cc             35
-rw-r--r--  src/mon/MonitorDBStore.h       32
-rw-r--r--  src/mon/Paxos.cc               10
-rw-r--r--  src/mon/Paxos.h                 7
-rw-r--r--  src/mon/PaxosService.cc        14
-rw-r--r--  src/os/LevelDBStore.h          16
-rw-r--r--  src/osd/OSD.cc                126
-rw-r--r--  src/osd/OSD.h                  16
-rw-r--r--  src/osd/PG.cc                  74
-rw-r--r--  src/osd/PG.h                   15
-rw-r--r--  src/osd/ReplicatedPG.cc        41
-rw-r--r--  src/osdc/ObjectCacher.cc       22
-rw-r--r--  src/osdc/ObjectCacher.h         3
19 files changed, 357 insertions(+), 153 deletions(-)
diff --git a/src/ceph-disk b/src/ceph-disk
index 8694ee1eefa..f1380be8701 100755
--- a/src/ceph-disk
+++ b/src/ceph-disk
@@ -1311,9 +1311,13 @@ def start_daemon(
],
)
elif os.path.exists(os.path.join(path, 'sysvinit')):
+ if os.path.exists('/usr/sbin/service'):
+ svc = '/usr/sbin/service'
+ else:
+ svc = '/sbin/service'
subprocess.check_call(
args=[
- '/usr/sbin/service',
+ svc,
'ceph',
'start',
'osd.{osd_id}'.format(osd_id=osd_id),
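
The hunk above makes ceph-disk prefer /usr/sbin/service but fall back to /sbin/service when the former is absent. ceph-disk itself is Python (os.path.exists); the following is only a minimal standalone sketch of the same existence check, rendered in C++ with access(2), and the helper name is illustrative, not part of the commit.

#include <unistd.h>
#include <cstdio>

// Return the first service binary that exists, mirroring the fallback above.
static const char *find_service_binary()
{
    if (access("/usr/sbin/service", F_OK) == 0)
        return "/usr/sbin/service";
    return "/sbin/service";
}

int main()
{
    printf("%s\n", find_service_binary());
    return 0;
}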
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc
index 01072728db2..69bcf6d3282 100644
--- a/src/ceph_mon.cc
+++ b/src/ceph_mon.cc
@@ -114,6 +114,7 @@ int main(int argc, const char **argv)
int err;
bool mkfs = false;
+ bool compact = false;
std::string osdmapfn, inject_monmap;
vector<const char*> args;
@@ -132,6 +133,8 @@ int main(int argc, const char **argv)
exit(0);
} else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) {
mkfs = true;
+ } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {
+ compact = true;
} else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {
osdmapfn = val;
} else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {
@@ -474,6 +477,12 @@ int main(int argc, const char **argv)
if (err < 0)
return 1;
+ if (compact || g_conf->mon_compact_on_start) {
+ derr << "compacting monitor store ..." << dendl;
+ mon->store->compact();
+ derr << "done compacting" << dendl;
+ }
+
global_init_daemonize(g_ceph_context, 0);
common_init_finish(g_ceph_context);
global_init_chdir(g_ceph_context);
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 22d3d334f36..9f7dafeb218 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -124,6 +124,9 @@ OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id")
OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster
OPTION(mon_sync_fs_threshold, OPT_INT, 5) // sync() when writing this many objects; 0 to disable.
+OPTION(mon_compact_on_start, OPT_BOOL, false) // compact leveldb on ceph-mon start
+OPTION(mon_compact_on_bootstrap, OPT_BOOL, false) // trigger leveldb compaction on bootstrap
+OPTION(mon_compact_on_trim, OPT_BOOL, true) // compact (a prefix) when we trim old states
OPTION(mon_tick_interval, OPT_INT, 5)
OPTION(mon_subscribe_interval, OPT_DOUBLE, 300)
OPTION(mon_osd_laggy_halflife, OPT_INT, 60*60) // (seconds) how quickly our laggy estimations decay
@@ -182,7 +185,7 @@ OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a spe
OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow
OPTION(mon_leveldb_write_buffer_size, OPT_U64, 32*1024*1024) // monitor's leveldb write buffer size
OPTION(mon_leveldb_cache_size, OPT_U64, 0) // monitor's leveldb cache size
-OPTION(mon_leveldb_block_size, OPT_U64, 4*1024*1024) // monitor's leveldb block size
+OPTION(mon_leveldb_block_size, OPT_U64, 64*1024) // monitor's leveldb block size
OPTION(mon_leveldb_bloom_size, OPT_INT, 0) // monitor's leveldb bloom bits per entry
OPTION(mon_leveldb_max_open_files, OPT_INT, 0) // monitor's leveldb max open files
OPTION(mon_leveldb_compression, OPT_BOOL, false) // monitor's leveldb uses compression
@@ -192,8 +195,8 @@ OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
OPTION(paxos_trim_tolerance, OPT_INT, 30) // number of extra proposals tolerated before trimming
OPTION(paxos_trim_disabled_max_versions, OPT_INT, 100) // maximum amount of versions we shall allow passing by without trimming
-OPTION(paxos_service_trim_max, OPT_INT, 50) // maximum amount of versions to trim during a single proposal (0 disables it)
-OPTION(paxos_service_trim_min, OPT_INT, 30) // minimum amount of versions to trigger a trim (0 disables it)
+OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it)
+OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it)
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons
OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients
diff --git a/src/messages/MMonElection.h b/src/messages/MMonElection.h
index 9771f6123d6..3d7dd4ec90e 100644
--- a/src/messages/MMonElection.h
+++ b/src/messages/MMonElection.h
@@ -45,19 +45,20 @@ public:
bufferlist monmap_bl;
set<int> quorum;
uint64_t quorum_features;
- version_t paxos_first_version;
- version_t paxos_last_version;
+ /* the following were both used in the next branch for a while
+ * on user cluster, so we've left them in for compatibility. */
+ version_t defunct_one;
+ version_t defunct_two;
MMonElection() : Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION),
- op(0), epoch(0), quorum_features(0), paxos_first_version(0),
- paxos_last_version(0)
+ op(0), epoch(0), quorum_features(0), defunct_one(0),
+ defunct_two(0)
{ }
- MMonElection(int o, epoch_t e, MonMap *m,
- version_t paxos_first, version_t paxos_last)
+ MMonElection(int o, epoch_t e, MonMap *m)
: Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION),
fsid(m->fsid), op(o), epoch(e), quorum_features(0),
- paxos_first_version(paxos_first), paxos_last_version(paxos_last)
+ defunct_one(0), defunct_two(0)
{
// encode using full feature set; we will reencode for dest later,
// if necessary
@@ -87,8 +88,8 @@ public:
::encode(monmap_bl, payload);
::encode(quorum, payload);
::encode(quorum_features, payload);
- ::encode(paxos_first_version, payload);
- ::encode(paxos_last_version, payload);
+ ::encode(defunct_one, payload);
+ ::encode(defunct_two, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
@@ -105,8 +106,8 @@ public:
else
quorum_features = 0;
if (header.version >= 4) {
- ::decode(paxos_first_version, p);
- ::decode(paxos_last_version, p);
+ ::decode(defunct_one, p);
+ ::decode(defunct_two, p);
}
}
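
The two paxos version fields are dropped from the election logic but deliberately left in the wire format, since monitors that ran the interim 'next' code on user clusters already expect them. Below is a minimal standalone sketch of that compatibility pattern, using a toy little-endian encoder instead of Ceph's bufferlist; all names here are illustrative.

#include <cstdint>
#include <vector>

struct Encoder {
    std::vector<uint8_t> buf;
    void put_u64(uint64_t v) {
        for (int i = 0; i < 8; ++i)
            buf.push_back(uint8_t(v >> (8 * i)));  // little-endian
    }
};

struct ElectionMsg {
    uint64_t epoch = 0;
    // No longer consulted by the election code, but still encoded (as zeros)
    // so peers expecting the older message layout can decode it.
    uint64_t defunct_one = 0;
    uint64_t defunct_two = 0;

    void encode(Encoder &e) const {
        e.put_u64(epoch);
        e.put_u64(defunct_one);
        e.put_u64(defunct_two);
    }
};

int main()
{
    Encoder e;
    ElectionMsg m;
    m.encode(e);
    return e.buf.size() == 24 ? 0 : 1;
}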
diff --git a/src/mon/Elector.cc b/src/mon/Elector.cc
index eed2d40e901..b3db1afab3c 100644
--- a/src/mon/Elector.cc
+++ b/src/mon/Elector.cc
@@ -55,6 +55,7 @@ void Elector::bump_epoch(epoch_t e)
MonitorDBStore::Transaction t;
t.put(Monitor::MONITOR_NAME, "election_epoch", epoch);
mon->store->apply_transaction(t);
+ mon->reset();
// clear up some state
electing_me = false;
@@ -80,21 +81,18 @@ void Elector::start()
electing_me = true;
acked_me[mon->rank] = CEPH_FEATURES_ALL;
leader_acked = -1;
- acked_first_paxos_version = mon->paxos->get_first_committed();
// bcast to everyone else
for (unsigned i=0; i<mon->monmap->size(); ++i) {
if ((int)i == mon->rank) continue;
- Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap,
- mon->paxos->get_first_committed(),
- mon->paxos->get_version());
+ Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
mon->messenger->send_message(m, mon->monmap->get_inst(i));
}
reset_timer();
}
-void Elector::defer(int who, version_t paxos_first)
+void Elector::defer(int who)
{
dout(5) << "defer to " << who << dendl;
@@ -106,11 +104,8 @@ void Elector::defer(int who, version_t paxos_first)
// ack them
leader_acked = who;
- acked_first_paxos_version = paxos_first;
ack_stamp = ceph_clock_now(g_ceph_context);
- mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap,
- mon->paxos->get_first_committed(),
- mon->paxos->get_version()),
+ mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap),
mon->monmap->get_inst(who));
// set a timer
@@ -174,11 +169,9 @@ void Elector::victory()
p != quorum.end();
++p) {
if (*p == mon->rank) continue;
- MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch,
- mon->monmap,
- mon->paxos->get_first_committed(),
- mon->paxos->get_version());
+ MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap);
m->quorum = quorum;
+ m->quorum_features = features;
mon->messenger->send_message(m, mon->monmap->get_inst(*p));
}
@@ -213,13 +206,10 @@ void Elector::handle_propose(MMonElection *m)
}
}
- if ((mon->rank < from) &&
- // be careful that we have new enough data to be leader!
- (m->paxos_first_version <= mon->paxos->get_version())) {
+ if (mon->rank < from) {
// i would win over them.
if (leader_acked >= 0) { // we already acked someone
- assert((leader_acked < from) || // and they still win, of course
- (acked_first_paxos_version > mon->paxos->get_version()));
+ assert(leader_acked < from); // and they still win, of course
dout(5) << "no, we already acked " << leader_acked << dendl;
} else {
// wait, i should win!
@@ -228,20 +218,16 @@ void Elector::handle_propose(MMonElection *m)
mon->start_election();
}
}
- } else if (m->paxos_last_version >= mon->paxos->get_first_committed()) {
+ } else {
// they would win over me
if (leader_acked < 0 || // haven't acked anyone yet, or
leader_acked > from || // they would win over who you did ack, or
- leader_acked == from) { // this is the guy we're already deferring to
- defer(from, m->paxos_first_version);
+ leader_acked == from) { // this is the guy we're already deferring to
+ defer(from);
} else {
// ignore them!
dout(5) << "no, we already acked " << leader_acked << dendl;
}
- } else { // they are too out-of-date
- dout(5) << "no, they are too far behind; paxos version: "
- << m->paxos_last_version << " versus my first "
- << mon->paxos->get_first_committed() << dendl;
}
m->put();
@@ -286,7 +272,7 @@ void Elector::handle_victory(MMonElection *m)
dout(5) << "handle_victory from " << m->get_source() << " quorum_features " << m->quorum_features << dendl;
int from = m->get_source().num();
- assert((from < mon->rank) || (acked_first_paxos_version > mon->paxos->get_version()));
+ assert(from < mon->rank);
assert(m->epoch % 2 == 0);
leader_acked = -1;
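
With the store-freshness checks removed above, the propose/defer decision reduces to a pure rank comparison; a newly elected leader that is too far behind now falls back to bootstrap and sync rather than being filtered out during the election. A minimal standalone sketch of the simplified rule (names are illustrative):

#include <cassert>

struct Elector {
    int my_rank = 0;
    int leader_acked = -1;   // -1: we have not acked anyone this epoch

    bool should_defer(int from) const {
        if (my_rank < from)
            return false;                 // we outrank them: never defer
        return leader_acked < 0 ||        // have not acked anyone yet, or
               leader_acked >= from;      // they (out)rank whoever we acked
    }
};

int main()
{
    Elector e;
    e.my_rank = 1;
    assert(!e.should_defer(2));   // rank 1 beats rank 2
    assert(e.should_defer(0));    // rank 0 beats rank 1
    return 0;
}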
diff --git a/src/mon/Elector.h b/src/mon/Elector.h
index 9cce81e9f49..d81eb239763 100644
--- a/src/mon/Elector.h
+++ b/src/mon/Elector.h
@@ -126,10 +126,6 @@ class Elector {
*/
int leader_acked;
/**
- * Indicates the first_paxos_commit on who we've acked
- */
- version_t acked_first_paxos_version;
- /**
* Indicates when we have acked him
*/
utime_t ack_stamp;
@@ -201,17 +197,16 @@ class Elector {
* to become the Leader. We will only defer an election if the monitor we
* are deferring to outranks us.
*
- * @pre @p who outranks us (who < our rank, or we're behind their store)
+ * @pre @p who outranks us (i.e., who < our rank)
* @pre @p who outranks any other monitor we have deferred to in the past
* @post electing_me is false
* @post leader_acked equals @p who
* @post we sent an ack message to @p who
* @post we reset the expire_event timer
*
- * @param who Some other monitor's numeric identifier.
- * @param paxos_first The other monitor's first committed paxos version
+ * @param who Some other monitor's numeric identifier.
*/
- void defer(int who, version_t paxos_first);
+ void defer(int who);
/**
* The election has taken too long and has expired.
*
@@ -331,8 +326,7 @@ class Elector {
epoch(0),
participating(true),
electing_me(false),
- leader_acked(-1),
- acked_first_paxos_version(0) { }
+ leader_acked(-1) { }
/**
* Initiate the Elector class.
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 162f9c6739b..a9b293571e8 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -635,6 +635,13 @@ void Monitor::bootstrap()
reset();
+ // sync store
+ if (g_conf->mon_compact_on_bootstrap) {
+ dout(10) << "bootstrap -- triggering compaction" << dendl;
+ store->compact();
+ dout(10) << "bootstrap -- finished compaction" << dendl;
+ }
+
// singleton monitor?
if (monmap->size() == 1 && rank == 0) {
win_standalone_election();
@@ -773,13 +780,6 @@ void Monitor::handle_sync_start(MMonSync *m)
{
dout(10) << __func__ << " " << *m << dendl;
- /**
- * This looks a bit odd, but we've seen cases where sync start messages
- * get bounced around and end up at the originator without anybody
- * noticing!
- */
- assert(m->reply_to != messenger->get_myinst());
-
/* If we are not the leader, then some monitor picked us as the point of
* entry to the quorum during its synchronization process. Therefore, we
* have an obligation of forwarding this message to leader, so the sender
@@ -822,6 +822,20 @@ void Monitor::handle_sync_start(MMonSync *m)
assert(quorum.empty());
assert(sync_leader.get() != NULL);
+ /**
+ * This looks a bit odd, but we've seen cases where sync start messages
+ * get bounced around and end up at the originator without anybody
+ * noticing!* If it happens, just drop the message and the timeouts
+ * will clean everything up -- eventually.
+ * [*] If a leader gets elected who is too far behind, he'll drop into
+ * bootstrap and sync, but the person he sends his sync to thinks he's
+ * still the leader and forwards the reply back.
+ */
+ if (m->reply_to == messenger->get_myinst()) {
+ m->put();
+ return;
+ }
+
dout(10) << __func__ << " forward " << *m
<< " to our sync leader at "
<< sync_leader->entity << dendl;
@@ -1927,10 +1941,9 @@ void Monitor::handle_probe_reply(MMonProbe *m)
if (m->quorum.size()) {
dout(10) << " existing quorum " << m->quorum << dendl;
- if ((paxos->get_version() + g_conf->paxos_max_join_drift <
- m->paxos_last_version) ||
- (paxos->get_version() < m->paxos_first_version)){
- dout(10) << " peer paxos version " << m->paxos_last_version
+ if (paxos->get_version() < m->paxos_first_version) {
+ dout(10) << " peer paxos versions [" << m->paxos_first_version
+ << "," << m->paxos_last_version << "]"
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h
index ac2703ec5e6..c4c681043b1 100644
--- a/src/mon/MonitorDBStore.h
+++ b/src/mon/MonitorDBStore.h
@@ -70,6 +70,7 @@ class MonitorDBStore
enum {
OP_PUT = 1,
OP_ERASE = 2,
+ OP_COMPACT_PREFIX = 3,
};
void put(string prefix, string key, bufferlist& bl) {
@@ -98,6 +99,10 @@ class MonitorDBStore
erase(prefix, os.str());
}
+ void compact_prefix(string prefix) {
+ ops.push_back(Op(OP_COMPACT_PREFIX, prefix, string()));
+ }
+
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(ops, bl);
@@ -157,6 +162,12 @@ class MonitorDBStore
f->dump_string("key", op.key);
}
break;
+ case OP_COMPACT_PREFIX:
+ {
+ f->dump_string("type", "COMPACT_PREFIX");
+ f->dump_string("prefix", op.prefix);
+ }
+ break;
default:
{
f->dump_string("type", "unknown");
@@ -174,6 +185,7 @@ class MonitorDBStore
int apply_transaction(MonitorDBStore::Transaction& t) {
KeyValueDB::Transaction dbt = db->get_transaction();
+ list<string> compact_prefixes;
for (list<Op>::iterator it = t.ops.begin(); it != t.ops.end(); ++it) {
Op& op = *it;
switch (op.type) {
@@ -183,13 +195,23 @@ class MonitorDBStore
case Transaction::OP_ERASE:
dbt->rmkey(op.prefix, op.key);
break;
+ case Transaction::OP_COMPACT_PREFIX:
+ compact_prefixes.push_back(op.prefix);
+ break;
default:
derr << __func__ << " unknown op type " << op.type << dendl;
ceph_assert(0);
break;
}
}
- return db->submit_transaction_sync(dbt);
+ int r = db->submit_transaction_sync(dbt);
+ if (r >= 0) {
+ while (!compact_prefixes.empty()) {
+ db->compact_prefix(compact_prefixes.front());
+ compact_prefixes.pop_front();
+ }
+ }
+ return r;
}
class StoreIteratorImpl {
@@ -456,6 +478,14 @@ class MonitorDBStore
return db->create_and_open(out);
}
+ void compact() {
+ db->compact();
+ }
+
+ void compact_prefix(const string& prefix) {
+ db->compact_prefix(prefix);
+ }
+
MonitorDBStore(const string& path) : db(0) {
string::const_reverse_iterator rit;
int pos = 0;
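
In the MonitorDBStore change above, compact_prefix requests ride inside the transaction as a new op type, but the compaction itself only runs after the underlying write batch commits; compaction is a hint, so a failed commit simply skips it. A minimal standalone sketch of that ordering, with a toy store standing in for KeyValueDB (names are illustrative):

#include <iostream>
#include <list>
#include <string>

struct ToyStore {
    int submit_transaction_sync() { return 0; }   // pretend the batch commits
    void compact_prefix(const std::string &p) {
        std::cout << "compacting prefix " << p << std::endl;
    }
};

int apply_transaction(ToyStore &db, std::list<std::string> &compact_prefixes)
{
    int r = db.submit_transaction_sync();   // 1) commit the write batch first
    if (r >= 0) {
        // 2) only after a successful commit act on the collected hints
        while (!compact_prefixes.empty()) {
            db.compact_prefix(compact_prefixes.front());
            compact_prefixes.pop_front();
        }
    }
    return r;
}

int main()
{
    ToyStore db;
    std::list<std::string> prefixes;
    prefixes.push_back("paxos");
    prefixes.push_back("pgmap");
    return apply_transaction(db, prefixes);
}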
diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc
index 46eaf88273d..71ef2ec3de0 100644
--- a/src/mon/Paxos.cc
+++ b/src/mon/Paxos.cc
@@ -362,8 +362,9 @@ void Paxos::handle_last(MMonPaxos *last)
return;
}
- // note peer's last_committed, in case we learn a new commit and need to
- // push it to them.
+ // note peer's first_ and last_committed, in case we learn a new
+ // commit and need to push it to them.
+ peer_first_committed[last->get_source().num()] = last->first_committed;
peer_last_committed[last->get_source().num()] = last->last_committed;
if (last->first_committed > last_committed+1) {
@@ -974,6 +975,10 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t,
t->erase(get_name(), from);
from++;
}
+ if (g_conf->mon_compact_on_trim) {
+ dout(10) << " compacting prefix" << dendl;
+ t->compact_prefix(get_name());
+ }
}
void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first)
@@ -985,6 +990,7 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first)
return;
trim_to(t, first_committed, first);
t->put(get_name(), "first_committed", first);
+ first_committed = first;
}
void Paxos::trim_to(version_t first)
diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h
index ca467ce3db8..2e1bb62dda9 100644
--- a/src/mon/Paxos.h
+++ b/src/mon/Paxos.h
@@ -300,8 +300,11 @@ private:
*/
version_t accepted_pn_from;
/**
- * @todo Check out if this map has any purpose at all. So far, we have only
- * seen it being read, although it is never affected.
+ * Map holding the first committed version by each quorum member.
+ *
+ * The versions kept in this map are updated during the collect phase.
+ * When the Leader starts the collect phase, each Peon will reply with its
+ * first committed version, which will then be kept in this map.
*/
map<int,version_t> peer_first_committed;
/**
diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc
index d02cb1d7ab5..8f421ab3d81 100644
--- a/src/mon/PaxosService.cc
+++ b/src/mon/PaxosService.cc
@@ -294,13 +294,19 @@ void PaxosService::put_version(MonitorDBStore::Transaction *t,
const string& prefix, version_t ver,
bufferlist& bl)
{
- t->put(get_service_name(), ver, bl);
+ ostringstream os;
+ os << ver;
+ string key = mon->store->combine_strings(prefix, os.str());
+ t->put(get_service_name(), key, bl);
}
int PaxosService::get_version(const string& prefix, version_t ver,
bufferlist& bl)
{
- return mon->store->get(get_service_name(), ver, bl);
+ ostringstream os;
+ os << ver;
+ string key = mon->store->combine_strings(prefix, os.str());
+ return mon->store->get(get_service_name(), key, bl);
}
void PaxosService::trim(MonitorDBStore::Transaction *t,
@@ -318,6 +324,10 @@ void PaxosService::trim(MonitorDBStore::Transaction *t,
t->erase(get_service_name(), full_key);
}
}
+ if (g_conf->mon_compact_on_trim) {
+ dout(20) << " compacting prefix " << get_service_name() << dendl;
+ t->compact_prefix(get_service_name());
+ }
}
void PaxosService::encode_trim(MonitorDBStore::Transaction *t)
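
put_version/get_version above now build the key by combining the prefix with the stringified version (via combine_strings) instead of storing under the bare version number, so different kinds of per-service versions cannot collide. A rough standalone sketch of that key construction; the separator and helper name are assumptions for illustration, not the actual combine_strings implementation.

#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Combine a prefix and a version into one flat key, e.g. "full" + 42 -> "full_42".
static std::string make_versioned_key(const std::string &prefix, uint64_t ver)
{
    std::ostringstream os;
    os << prefix << '_' << ver;   // '_' is assumed here purely for illustration
    return os.str();
}

int main()
{
    assert(make_versioned_key("full", 42) == "full_42");
    assert(make_versioned_key("full", 42) != make_versioned_key("latest", 42));
    return 0;
}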
diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h
index 6b1afceb753..83f2ed3b4c4 100644
--- a/src/os/LevelDBStore.h
+++ b/src/os/LevelDBStore.h
@@ -33,6 +33,22 @@ class LevelDBStore : public KeyValueDB {
int init(ostream &out, bool create_if_missing);
public:
+ /// compact the underlying leveldb store
+ void compact() {
+ db->CompactRange(NULL, NULL);
+ }
+
+ /// compact leveldb for all keys with a given prefix
+ void compact_prefix(const string& prefix) {
+ // if we combine the prefix with key by adding a '\0' separator,
+ // a char(1) will capture all such keys.
+ string end = prefix;
+ end += (char)1;
+ leveldb::Slice cstart(prefix);
+ leveldb::Slice cend(end);
+ db->CompactRange(&cstart, &cend);
+ }
+
/**
* options_t: Holds options which are minimally interpreted
* on initialization and then passed through to LevelDB.
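
The compact_prefix comment above relies on how combined keys sort: keys are stored as prefix + '\0' + key, so every key for a given prefix falls lexicographically between the bare prefix and prefix followed by char(1). A small standalone check of that claim:

#include <cassert>
#include <string>

int main()
{
    std::string prefix = "paxos";
    std::string key = prefix + '\0' + std::string("1234");  // combined form
    std::string end = prefix;
    end += char(1);

    assert(key > prefix);   // a longer string sharing the prefix sorts after it
    assert(key < end);      // '\0' < '\x01' at the first differing position
    return 0;
}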
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 85edf66f3b4..e63361b8ddd 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -194,45 +194,107 @@ OSDService::OSDService(OSD *osd) :
#endif
{}
-void OSDService::_start_split(const set<pg_t> &pgs)
+void OSDService::_start_split(pg_t parent, const set<pg_t> &children)
{
- for (set<pg_t>::const_iterator i = pgs.begin();
- i != pgs.end();
+ for (set<pg_t>::const_iterator i = children.begin();
+ i != children.end();
+ ++i) {
+ dout(10) << __func__ << ": Starting split on pg " << *i
+ << ", parent=" << parent << dendl;
+ assert(!pending_splits.count(*i));
+ assert(!in_progress_splits.count(*i));
+ pending_splits.insert(make_pair(*i, parent));
+
+ assert(!rev_pending_splits[parent].count(*i));
+ rev_pending_splits[parent].insert(*i);
+ }
+}
+
+void OSDService::mark_split_in_progress(pg_t parent, const set<pg_t> &children)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ map<pg_t, set<pg_t> >::iterator piter = rev_pending_splits.find(parent);
+ assert(piter != rev_pending_splits.end());
+ for (set<pg_t>::const_iterator i = children.begin();
+ i != children.end();
++i) {
- dout(10) << __func__ << ": Starting split on pg " << *i << dendl;
+ assert(piter->second.count(*i));
+ assert(pending_splits.count(*i));
assert(!in_progress_splits.count(*i));
+ assert(pending_splits[*i] == parent);
+
+ pending_splits.erase(*i);
+ piter->second.erase(*i);
in_progress_splits.insert(*i);
}
+ if (piter->second.empty())
+ rev_pending_splits.erase(piter);
+}
+
+void OSDService::cancel_pending_splits_for_parent(pg_t parent)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ map<pg_t, set<pg_t> >::iterator piter = rev_pending_splits.find(parent);
+ if (piter == rev_pending_splits.end())
+ return;
+
+ for (set<pg_t>::iterator i = piter->second.begin();
+ i != piter->second.end();
+ ++i) {
+ assert(pending_splits.count(*i));
+ assert(!in_progress_splits.count(*i));
+ pending_splits.erase(*i);
+ }
+ rev_pending_splits.erase(piter);
+}
+
+void OSDService::_maybe_split_pgid(OSDMapRef old_map,
+ OSDMapRef new_map,
+ pg_t pgid)
+{
+ assert(old_map->have_pg_pool(pgid.pool()));
+ if (pgid.ps() < static_cast<unsigned>(old_map->get_pg_num(pgid.pool()))) {
+ set<pg_t> children;
+ pgid.is_split(old_map->get_pg_num(pgid.pool()),
+ new_map->get_pg_num(pgid.pool()), &children);
+ _start_split(pgid, children);
+ } else {
+ assert(pgid.ps() < static_cast<unsigned>(new_map->get_pg_num(pgid.pool())));
+ }
}
void OSDService::expand_pg_num(OSDMapRef old_map,
OSDMapRef new_map)
{
Mutex::Locker l(in_progress_split_lock);
- set<pg_t> children;
for (set<pg_t>::iterator i = in_progress_splits.begin();
i != in_progress_splits.end();
- ) {
- assert(old_map->have_pg_pool(i->pool()));
+ ) {
if (!new_map->have_pg_pool(i->pool())) {
in_progress_splits.erase(i++);
} else {
- if (i->ps() < static_cast<unsigned>(old_map->get_pg_num(i->pool()))) {
- i->is_split(old_map->get_pg_num(i->pool()),
- new_map->get_pg_num(i->pool()), &children);
- } else {
- assert(i->ps() < static_cast<unsigned>(new_map->get_pg_num(i->pool())));
- }
+ _maybe_split_pgid(old_map, new_map, *i);
+ ++i;
+ }
+ }
+ for (map<pg_t, pg_t>::iterator i = pending_splits.begin();
+ i != pending_splits.end();
+ ) {
+ if (!new_map->have_pg_pool(i->first.pool())) {
+ rev_pending_splits.erase(i->second);
+ pending_splits.erase(i++);
+ } else {
+ _maybe_split_pgid(old_map, new_map, i->first);
++i;
}
}
- _start_split(children);
}
bool OSDService::splitting(pg_t pgid)
{
Mutex::Locker l(in_progress_split_lock);
- return in_progress_splits.count(pgid);
+ return in_progress_splits.count(pgid) ||
+ pending_splits.count(pgid);
}
void OSDService::complete_split(const set<pg_t> &pgs)
@@ -242,6 +304,7 @@ void OSDService::complete_split(const set<pg_t> &pgs)
i != pgs.end();
++i) {
dout(10) << __func__ << ": Completing split on pg " << *i << dendl;
+ assert(!pending_splits.count(*i));
assert(in_progress_splits.count(*i));
in_progress_splits.erase(*i);
}
@@ -1680,7 +1743,7 @@ void OSD::load_pgs()
pg->info.pgid.is_split(pg->get_osdmap()->get_pg_num(pg->info.pgid.pool()),
osdmap->get_pg_num(pg->info.pgid.pool()),
&split_pgs)) {
- service.start_split(split_pgs);
+ service.start_split(pg->info.pgid, split_pgs);
}
pg->reg_next_scrub();
@@ -3014,14 +3077,14 @@ void OSD::send_pg_stats(const utime_t &now)
pg->put("pg_stat_queue");
continue;
}
- pg->pg_stats_lock.Lock();
- if (pg->pg_stats_valid) {
- m->pg_stat[pg->info.pgid] = pg->pg_stats_stable;
- dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_stable.reported << dendl;
+ pg->pg_stats_publish_lock.Lock();
+ if (pg->pg_stats_publish_valid) {
+ m->pg_stat[pg->info.pgid] = pg->pg_stats_publish;
+ dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << dendl;
} else {
- dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_stable.reported << ", not valid" << dendl;
+ dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << ", not valid" << dendl;
}
- pg->pg_stats_lock.Unlock();
+ pg->pg_stats_publish_lock.Unlock();
}
if (!outstanding_pg_stats) {
@@ -3060,18 +3123,18 @@ void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
if (ack->pg_stat.count(pg->info.pgid)) {
eversion_t acked = ack->pg_stat[pg->info.pgid];
- pg->pg_stats_lock.Lock();
- if (acked == pg->pg_stats_stable.reported) {
- dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_stable.reported << dendl;
+ pg->pg_stats_publish_lock.Lock();
+ if (acked == pg->pg_stats_publish.reported) {
+ dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_publish.reported << dendl;
pg->stat_queue_item.remove_myself();
pg->put("pg_stat_queue");
} else {
- dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_stable.reported
+ dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported
<< " > acked " << acked << dendl;
}
- pg->pg_stats_lock.Unlock();
+ pg->pg_stats_publish_lock.Unlock();
} else {
- dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_stable.reported << dendl;
+ dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported << dendl;
}
}
@@ -4385,6 +4448,7 @@ void OSD::advance_pg(
lastmap->get_pg_num(pg->pool.id),
nextmap->get_pg_num(pg->pool.id),
&children)) {
+ service.mark_split_in_progress(pg->info.pgid, children);
split_pgs(
pg, children, new_pgs, lastmap, nextmap,
rctx);
@@ -4507,7 +4571,7 @@ void OSD::consume_map()
service.get_osdmap()->get_pg_num(it->first.pool()),
osdmap->get_pg_num(it->first.pool()),
&split_pgs)) {
- service.start_split(split_pgs);
+ service.start_split(it->first, split_pgs);
}
pg->unlock();
@@ -5153,7 +5217,7 @@ void OSD::handle_pg_create(OpRequestRef op)
wake_pg_waiters(pg->info.pgid);
pg->handle_create(&rctx);
pg->write_if_dirty(*rctx.transaction);
- pg->update_stats();
+ pg->publish_stats_to_osd();
pg->unlock();
num_created++;
}
@@ -5841,6 +5905,8 @@ void OSD::_remove_pg(PG *pg)
// and handle_notify_timeout
pg->on_removal(rmt);
+ service.cancel_pending_splits_for_parent(pg->info.pgid);
+
coll_t to_remove = get_next_removal_coll(pg->info.pgid);
removals.push_back(to_remove);
rmt->collection_rename(coll_t(pg->info.pgid), to_remove);
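
The OSD change above grows the split bookkeeping from a single in_progress set to three structures: pending (child to parent), a reverse index (parent to children) used for cancellation, and the existing in-progress set; splitting() now checks both states. A minimal standalone sketch of those structures and their transitions, using strings in place of pg_t (names are illustrative):

#include <cassert>
#include <map>
#include <set>
#include <string>

struct SplitTracker {
    std::map<std::string, std::string> pending;          // child -> parent
    std::map<std::string, std::set<std::string> > rev;   // parent -> children
    std::set<std::string> in_progress;                   // children being split

    void start(const std::string &parent, const std::set<std::string> &children) {
        for (std::set<std::string>::const_iterator i = children.begin();
             i != children.end(); ++i) {
            pending[*i] = parent;
            rev[parent].insert(*i);
        }
    }
    void mark_in_progress(const std::string &parent,
                          const std::set<std::string> &children) {
        for (std::set<std::string>::const_iterator i = children.begin();
             i != children.end(); ++i) {
            assert(pending[*i] == parent);
            pending.erase(*i);
            rev[parent].erase(*i);
            in_progress.insert(*i);
        }
        if (rev[parent].empty())
            rev.erase(parent);
    }
    void cancel_pending_for_parent(const std::string &parent) {
        std::map<std::string, std::set<std::string> >::iterator p = rev.find(parent);
        if (p == rev.end())
            return;
        for (std::set<std::string>::iterator i = p->second.begin();
             i != p->second.end(); ++i)
            pending.erase(*i);
        rev.erase(p);
    }
    bool splitting(const std::string &pgid) const {
        return in_progress.count(pgid) || pending.count(pgid);
    }
};

int main()
{
    SplitTracker t;
    std::set<std::string> kids;
    kids.insert("1.10");
    kids.insert("1.18");
    t.start("1.0", kids);
    assert(t.splitting("1.10"));           // pending counts as splitting
    t.mark_in_progress("1.0", kids);
    assert(t.splitting("1.18"));           // so does in-progress
    t.cancel_pending_for_parent("1.0");    // no-op: nothing pending anymore
    return 0;
}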
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 513bd43ec6c..f894768fbe5 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -386,16 +386,24 @@ public:
// split
Mutex in_progress_split_lock;
- set<pg_t> in_progress_splits;
- void _start_split(const set<pg_t> &pgs);
- void start_split(const set<pg_t> &pgs) {
+ map<pg_t, pg_t> pending_splits; // child -> parent
+ map<pg_t, set<pg_t> > rev_pending_splits; // parent -> [children]
+ set<pg_t> in_progress_splits; // child
+
+ void _start_split(pg_t parent, const set<pg_t> &children);
+ void start_split(pg_t parent, const set<pg_t> &children) {
Mutex::Locker l(in_progress_split_lock);
- return _start_split(pgs);
+ return _start_split(parent, children);
}
+ void mark_split_in_progress(pg_t parent, const set<pg_t> &pgs);
void complete_split(const set<pg_t> &pgs);
+ void cancel_pending_splits_for_parent(pg_t parent);
bool splitting(pg_t pgid);
void expand_pg_num(OSDMapRef old_map,
OSDMapRef new_map);
+ void _maybe_split_pgid(OSDMapRef old_map,
+ OSDMapRef new_map,
+ pg_t pgid);
// -- OSD Full Status --
Mutex full_status_lock;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 70584e44591..ae88be652da 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -163,8 +163,8 @@ PG::PG(OSDService *o, OSDMapRef curmap,
backfill_reserved(0),
backfill_reserving(0),
flushed(false),
- pg_stats_lock("PG::pg_stats_lock"),
- pg_stats_valid(false),
+ pg_stats_publish_lock("PG::pg_stats_publish_lock"),
+ pg_stats_publish_valid(false),
osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
finish_sync_event(NULL),
scrub_after_recovery(false),
@@ -790,7 +790,7 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing
found_missing = true;
}
if (stats_updated) {
- update_stats();
+ publish_stats_to_osd();
}
dout(20) << "search_for_missing missing " << missing.missing << dendl;
@@ -1705,7 +1705,7 @@ void PG::activate(ObjectStore::Transaction& t,
AllReplicasRecovered())));
}
- update_stats();
+ publish_stats_to_osd();
}
// we need to flush this all out before doing anything else..
@@ -1848,7 +1848,7 @@ void PG::replay_queued_ops()
requeue_ops(replay);
requeue_ops(waiting_for_active);
- update_stats();
+ publish_stats_to_osd();
}
void PG::_activate_committed(epoch_t e)
@@ -1899,7 +1899,7 @@ void PG::all_activated_and_committed()
info.history.last_epoch_started = info.last_epoch_started;
share_pg_info();
- update_stats();
+ publish_stats_to_osd();
queue_peering_event(
CephPeeringEvtRef(
@@ -1991,7 +1991,7 @@ void PG::_finish_recovery(Context *c)
finish_sync_event = 0;
purge_strays();
- update_stats();
+ publish_stats_to_osd();
if (scrub_after_recovery) {
dout(10) << "_finish_recovery requeueing for scrub" << dendl;
@@ -2299,9 +2299,9 @@ void PG::update_heartbeat_peers()
osd->need_heartbeat_peer_update();
}
-void PG::update_stats()
+void PG::publish_stats_to_osd()
{
- pg_stats_lock.Lock();
+ pg_stats_publish_lock.Lock();
if (is_primary()) {
// update our stat summary
info.stats.reported.inc(info.history.same_primary_since);
@@ -2339,17 +2339,18 @@ void PG::update_stats()
info.stats.log_start = log.tail;
info.stats.ondisk_log_start = log.tail;
- pg_stats_valid = true;
- pg_stats_stable = info.stats;
+ pg_stats_publish_valid = true;
+ pg_stats_publish = info.stats;
+ pg_stats_publish.stats.add(unstable_stats);
// calc copies, degraded
unsigned target = MAX(get_osdmap()->get_pg_size(info.pgid), acting.size());
- pg_stats_stable.stats.calc_copies(target);
- pg_stats_stable.stats.sum.num_objects_degraded = 0;
+ pg_stats_publish.stats.calc_copies(target);
+ pg_stats_publish.stats.sum.num_objects_degraded = 0;
if ((is_degraded() || !is_clean()) && is_active()) {
// NOTE: we only generate copies, degraded, unfound values for
// the summation, not individual stat categories.
- uint64_t num_objects = pg_stats_stable.stats.sum.num_objects;
+ uint64_t num_objects = pg_stats_publish.stats.sum.num_objects;
uint64_t degraded = 0;
@@ -2358,7 +2359,7 @@ void PG::update_stats()
degraded += (target - acting.size()) * num_objects;
// missing on primary
- pg_stats_stable.stats.sum.num_objects_missing_on_primary = missing.num_missing();
+ pg_stats_publish.stats.sum.num_objects_missing_on_primary = missing.num_missing();
degraded += missing.num_missing();
for (unsigned i=1; i<acting.size(); i++) {
@@ -2370,27 +2371,27 @@ void PG::update_stats()
// not yet backfilled
degraded += num_objects - peer_info[acting[i]].stats.stats.sum.num_objects;
}
- pg_stats_stable.stats.sum.num_objects_degraded = degraded;
- pg_stats_stable.stats.sum.num_objects_unfound = get_num_unfound();
+ pg_stats_publish.stats.sum.num_objects_degraded = degraded;
+ pg_stats_publish.stats.sum.num_objects_unfound = get_num_unfound();
}
- dout(15) << "update_stats " << pg_stats_stable.reported << dendl;
+ dout(15) << "publish_stats_to_osd " << pg_stats_publish.reported << dendl;
} else {
- pg_stats_valid = false;
- dout(15) << "update_stats -- not primary" << dendl;
+ pg_stats_publish_valid = false;
+ dout(15) << "publish_stats_to_osd -- not primary" << dendl;
}
- pg_stats_lock.Unlock();
+ pg_stats_publish_lock.Unlock();
if (is_primary())
osd->pg_stat_queue_enqueue(this);
}
-void PG::clear_stats()
+void PG::clear_publish_stats()
{
dout(15) << "clear_stats" << dendl;
- pg_stats_lock.Lock();
- pg_stats_valid = false;
- pg_stats_lock.Unlock();
+ pg_stats_publish_lock.Lock();
+ pg_stats_publish_valid = false;
+ pg_stats_publish_lock.Unlock();
osd->pg_stat_queue_dequeue(this);
}
@@ -2620,6 +2621,9 @@ int PG::_write_info(ObjectStore::Transaction& t, epoch_t epoch,
void PG::write_info(ObjectStore::Transaction& t)
{
+ info.stats.stats.add(unstable_stats);
+ unstable_stats.clear();
+
int ret = _write_info(t, get_osdmap()->get_epoch(), info, coll,
past_intervals, snap_collections, osd->infos_oid,
info_struct_v, dirty_big_info);
@@ -3817,7 +3821,7 @@ void PG::scrub()
state_clear(PG_STATE_SCRUBBING);
state_clear(PG_STATE_REPAIR);
state_clear(PG_STATE_DEEP_SCRUB);
- update_stats();
+ publish_stats_to_osd();
unlock();
return;
}
@@ -3903,7 +3907,7 @@ void PG::classic_scrub()
scrubber.active = true;
scrubber.classic = true;
- update_stats();
+ publish_stats_to_osd();
scrubber.received_maps.clear();
scrubber.epoch_start = info.history.same_interval_since;
@@ -4078,7 +4082,7 @@ void PG::chunky_scrub() {
case PG::Scrubber::INACTIVE:
dout(10) << "scrub start" << dendl;
- update_stats();
+ publish_stats_to_osd();
scrubber.epoch_start = info.history.same_interval_since;
scrubber.active = true;
@@ -4256,7 +4260,7 @@ void PG::scrub_clear_state()
state_clear(PG_STATE_SCRUBBING);
state_clear(PG_STATE_REPAIR);
state_clear(PG_STATE_DEEP_SCRUB);
- update_stats();
+ publish_stats_to_osd();
// active -> nothing.
if (scrubber.active)
@@ -5093,7 +5097,7 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
// old primary?
if (oldrole == 0) {
state_clear(PG_STATE_CLEAN);
- clear_stats();
+ clear_publish_stats();
// take replay queue waiters
list<OpRequestRef> ls;
@@ -6048,7 +6052,7 @@ boost::statechart::result PG::RecoveryState::Primary::react(const ActMap&)
{
dout(7) << "handle ActMap primary" << dendl;
PG *pg = context< RecoveryMachine >().pg;
- pg->update_stats();
+ pg->publish_stats_to_osd();
pg->take_waiters();
return discard_event();
}
@@ -6568,7 +6572,7 @@ PG::RecoveryState::Clean::Clean(my_context ctx)
pg->mark_clean();
pg->share_pg_info();
- pg->update_stats();
+ pg->publish_stats_to_osd();
}
@@ -6636,7 +6640,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const AdvMap& advmap)
pg->state_clear(PG_STATE_DEGRADED);
else
pg->state_set(PG_STATE_DEGRADED);
- pg->update_stats(); // degraded may have changed
+ pg->publish_stats_to_osd(); // degraded may have changed
}
return forward_event();
}
@@ -7015,7 +7019,7 @@ PG::RecoveryState::GetInfo::GetInfo(my_context ctx)
if (!prior_set.get())
pg->build_prior(prior_set);
- pg->update_stats();
+ pg->publish_stats_to_osd();
get_infos();
if (peer_info_requested.empty() && !prior_set->pg_down) {
@@ -7348,7 +7352,7 @@ PG::RecoveryState::Incomplete::Incomplete(my_context ctx)
pg->state_clear(PG_STATE_PEERING);
pg->state_set(PG_STATE_INCOMPLETE);
- pg->update_stats();
+ pg->publish_stats_to_osd();
}
boost::statechart::result PG::RecoveryState::Incomplete::react(const AdvMap &advmap) {
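
Alongside the rename from update_stats to publish_stats_to_osd, the PG change above introduces an unstable_stats accumulator: read-op deltas are folded into the published copy immediately, but only merged into the persisted info.stats the next time write_info runs, and dropped on interval change. A minimal standalone sketch of that flow with a toy stat type (names are illustrative):

#include <cassert>
#include <cstdint>

struct ToyStats {
    uint64_t num_rd;
    ToyStats() : num_rd(0) {}
    void add(const ToyStats &o) { num_rd += o.num_rd; }
    void clear() { num_rd = 0; }
};

struct ToyPG {
    ToyStats info_stats;       // persisted alongside the PG info
    ToyStats unstable_stats;   // accumulated in memory, not yet persisted
    ToyStats published;        // the copy reported to the monitors

    void publish_stats() {
        published = info_stats;
        published.add(unstable_stats);   // reports include unpersisted deltas
    }
    void write_info() {
        info_stats.add(unstable_stats);  // fold in before persisting...
        unstable_stats.clear();          // ...so nothing is counted twice
    }
};

int main()
{
    ToyPG pg;
    pg.unstable_stats.num_rd = 3;   // e.g. accounting for read-only ops
    pg.publish_stats();
    assert(pg.published.num_rd == 3);
    pg.write_info();
    assert(pg.info_stats.num_rd == 3 && pg.unstable_stats.num_rd == 0);
    return 0;
}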
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 115ec1b5eaf..720fcb58772 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -714,16 +714,19 @@ protected:
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
void requeue_ops(list<OpRequestRef> &l);
- // stats
- Mutex pg_stats_lock;
- bool pg_stats_valid;
- pg_stat_t pg_stats_stable;
+ // stats that persist lazily
+ object_stat_collection_t unstable_stats;
+
+ // publish stats
+ Mutex pg_stats_publish_lock;
+ bool pg_stats_publish_valid;
+ pg_stat_t pg_stats_publish;
// for ordering writes
std::tr1::shared_ptr<ObjectStore::Sequencer> osr;
- void update_stats();
- void clear_stats();
+ void publish_stats_to_osd();
+ void clear_publish_stats();
public:
void clear_primary_state();
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index fb24375dca6..708e4153ca8 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1039,7 +1039,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (ctx->op_t.empty() || result < 0) {
if (result >= 0) {
log_op_stats(ctx);
- update_stats();
+ publish_stats_to_osd();
}
MOSDOpReply *reply = ctx->reply;
@@ -2197,9 +2197,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
if (r >= 0) {
op.xattr.value_len = r;
result = 0;
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(r, 10);
+ ctx->delta_stats.num_rd++;
} else
result = r;
- ctx->delta_stats.num_rd++;
}
break;
@@ -2217,6 +2218,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
bufferlist bl;
::encode(newattrs, bl);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(bl.length(), 10);
+ ctx->delta_stats.num_rd++;
osd_op.outdata.claim_append(bl);
}
break;
@@ -2237,6 +2240,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
if (result < 0 && result != -EEXIST && result != -ENODATA)
break;
+ ctx->delta_stats.num_rd++;
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(xattr.length(), 10);
+
switch (op.xattr.cmp_mode) {
case CEPH_OSD_CMPXATTR_MODE_STRING:
{
@@ -2281,7 +2287,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
dout(10) << "comparison returned true" << dendl;
- ctx->delta_stats.num_rd++;
}
break;
@@ -2848,6 +2853,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
out_set.insert(iter->first);
}
::encode(out_set, osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
break;
}
dout(10) << "failed, reading from omap" << dendl;
@@ -2867,6 +2874,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
}
::encode(out_set, osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
}
break;
case CEPH_OSD_OP_OMAPGETVALS:
@@ -2901,6 +2910,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
out_set.insert(*iter);
}
::encode(out_set, osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
break;
}
// No valid tmap, use omap
@@ -2923,6 +2934,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
}
::encode(out_set, osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
}
break;
case CEPH_OSD_OP_OMAPGETHEADER:
@@ -2941,6 +2954,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
dout(10) << "failed, reading from omap" << dendl;
}
osd->store->omap_get_header(coll, soid, &osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
}
break;
case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
@@ -2969,6 +2984,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
}
::encode(out, osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
break;
}
// No valid tmap, use omap
@@ -2976,6 +2993,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
osd->store->omap_get_values(coll, soid, keys_to_get, &out);
::encode(out, osd_op.outdata);
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
+ ctx->delta_stats.num_rd++;
}
break;
case CEPH_OSD_OP_OMAP_CMP:
@@ -3004,6 +3023,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
result = r;
break;
}
+ //Should set num_rd_kb based on encode length of map
+ ctx->delta_stats.num_rd++;
r = 0;
bufferlist empty;
@@ -3066,6 +3087,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
dout(20) << "\t" << i->first << dendl;
}
t.omap_setkeys(coll, soid, to_set);
+ ctx->delta_stats.num_wr++;
}
break;
case CEPH_OSD_OP_OMAPSETHEADER:
@@ -3079,6 +3101,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
t.touch(coll, soid);
t.omap_setheader(coll, soid, osd_op.indata);
+ ctx->delta_stats.num_wr++;
}
break;
case CEPH_OSD_OP_OMAPCLEAR:
@@ -3092,6 +3115,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
t.touch(coll, soid);
t.omap_clear(coll, soid);
+ ctx->delta_stats.num_wr++;
}
break;
case CEPH_OSD_OP_OMAPRMKEYS:
@@ -3113,6 +3137,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
goto fail;
}
t.omap_rmkeys(coll, soid, to_rm);
+ ctx->delta_stats.num_wr++;
}
break;
default:
@@ -3575,6 +3600,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
// read-op? done?
if (ctx->op_t.empty() && !ctx->modify) {
ctx->reply_version = ctx->obs->oi.user_version;
+ unstable_stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
return result;
}
@@ -3879,7 +3905,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
if (repop->waitfor_disk.empty()) {
log_op_stats(repop->ctx);
- update_stats();
+ publish_stats_to_osd();
// send dup commits, in order
if (waiting_for_ondisk.count(repop->v)) {
@@ -5454,7 +5480,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
if (complete) {
pulling.erase(hoid);
pull_from_peer[m->get_source().num()].erase(hoid);
- update_stats();
+ publish_stats_to_osd();
if (waiting_for_missing_object.count(hoid)) {
dout(20) << " kicking waiters on " << hoid << dendl;
requeue_ops(waiting_for_missing_object[hoid]);
@@ -5701,7 +5727,7 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
pushing[soid].erase(peer);
pi = NULL;
- update_stats();
+ publish_stats_to_osd();
if (pushing[soid].empty()) {
pushing.erase(soid);
@@ -6335,6 +6361,7 @@ void ReplicatedPG::on_change()
snap_trimmer_machine.process_event(Reset());
debug_op_order.clear();
+ unstable_stats.clear();
}
void ReplicatedPG::on_role_change()
@@ -7277,7 +7304,7 @@ void ReplicatedPG::_scrub_finish()
if (repair) {
++scrubber.fixed;
info.stats.stats = scrub_cstat;
- update_stats();
+ publish_stats_to_osd();
share_pg_info();
}
}
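
The new read accounting above charges num_rd_kb with SHIFT_ROUND_UP(len, 10), i.e. the payload size converted to whole KiB, rounded up. A standalone check of that arithmetic, assuming the usual (x + (1 << shift) - 1) >> shift form of the macro:

#include <cassert>
#include <cstdint>

static uint64_t shift_round_up(uint64_t x, unsigned shift)
{
    return (x + (1ull << shift) - 1) >> shift;
}

int main()
{
    assert(shift_round_up(0, 10) == 0);
    assert(shift_round_up(1, 10) == 1);      // 1 byte still counts as 1 KiB
    assert(shift_round_up(1024, 10) == 1);
    assert(shift_round_up(1025, 10) == 2);
    return 0;
}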
diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc
index 21370f0f982..56c56d640f1 100644
--- a/src/osdc/ObjectCacher.cc
+++ b/src/osdc/ObjectCacher.cc
@@ -502,7 +502,7 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb,
flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
flusher_stop(false), flusher_thread(this), finisher(cct),
stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0),
- stat_error(0), stat_dirty_waiting(0)
+ stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
{
this->max_dirty_age.set_from_double(max_dirty_age);
perf_start();
@@ -592,7 +592,8 @@ void ObjectCacher::close_object(Object *ob)
void ObjectCacher::bh_read(BufferHead *bh)
{
assert(lock.is_locked());
- ldout(cct, 7) << "bh_read on " << *bh << dendl;
+ ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
+ << reads_outstanding << dendl;
mark_rx(bh);
@@ -607,6 +608,7 @@ void ObjectCacher::bh_read(BufferHead *bh)
bh->start(), bh->length(), bh->ob->get_snap(),
&onfinish->bl, oset->truncate_size, oset->truncate_seq,
onfinish);
+ ++reads_outstanding;
}
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start,
@@ -619,6 +621,7 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start,
<< " " << start << "~" << length
<< " (bl is " << bl.length() << ")"
<< " returned " << r
+ << " outstanding reads " << reads_outstanding
<< dendl;
if (bl.length() < length) {
@@ -768,6 +771,8 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start,
ldout(cct, 20) << "finishing waiters " << ls << dendl;
finish_contexts(cct, ls, err);
+ --reads_outstanding;
+ read_cond.Signal();
}
@@ -1433,6 +1438,19 @@ void ObjectCacher::flusher_entry()
break;
flusher_cond.WaitInterval(cct, lock, utime_t(1,0));
}
+
+ /* Wait for reads to finish. This is only possible if handling
+ * -ENOENT made some read completions finish before their rados read
+ * came back. If we don't wait for them, and destroy the cache, when
+ * the rados reads do come back their callback will try to access the
+ * no-longer-valid ObjectCacher.
+ */
+ while (reads_outstanding > 0) {
+ ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
+ << reads_outstanding << dendl;
+ read_cond.Wait(lock);
+ }
+
lock.Unlock();
ldout(cct, 10) << "flusher finish" << dendl;
}
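
In the ObjectCacher change above, reads_outstanding counts rados reads issued by bh_read and is decremented in bh_read_finish; on shutdown the flusher waits on read_cond until the count drains, so a late completion cannot run against a destroyed cache. A minimal standalone sketch of the same barrier, using standard library primitives in place of Ceph's Mutex/Cond:

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct ReadTracker {
    std::mutex lock;
    std::condition_variable read_cond;
    int64_t reads_outstanding = 0;

    void read_started() {
        std::lock_guard<std::mutex> l(lock);
        ++reads_outstanding;
    }
    void read_finished() {
        std::lock_guard<std::mutex> l(lock);
        --reads_outstanding;
        read_cond.notify_all();
    }
    void wait_for_reads() {   // called on shutdown, before tearing anything down
        std::unique_lock<std::mutex> l(lock);
        read_cond.wait(l, [this] { return reads_outstanding == 0; });
    }
};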
diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h
index a17046f9126..80a90bd0457 100644
--- a/src/osdc/ObjectCacher.h
+++ b/src/osdc/ObjectCacher.h
@@ -444,6 +444,9 @@ class ObjectCacher {
loff_t release(Object *o);
void purge(Object *o);
+ int64_t reads_outstanding;
+ Cond read_cond;
+
int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call);