summary refs log tree commit diff
diff options
context:
space:
mode:
author Sage Weil <sage@inktank.com> 2013-07-22 18:28:54 -0700
committer Sage Weil <sage@inktank.com> 2013-07-22 18:28:54 -0700
commit 9626f77f095eb4c3aa38e56b5fc82a9aa931efa5 (patch)
tree f83b3bf92e340cc32c16dfac90b20e926b8233ca
parent 093182b79680994a0ccb7942aff6722e62905181 (diff)
parent cfe1395f479f152867e94371756a358a6fe4fe3d (diff)
download ceph-9626f77f095eb4c3aa38e56b5fc82a9aa931efa5.tar.gz
Merge pull request #457 from ceph/wip-paxos

paxos fixes

Reviewed-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Joao Eduardo Luis <joao.luis@inktank.com>
-rw-r--r-- src/common/config_opts.h | 1
-rw-r--r-- src/mon/Paxos.cc | 84
-rw-r--r-- src/mon/Paxos.h | 7
-rw-r--r-- src/tools/ceph-monstore-tool.cc | 14
4 files changed, 91 insertions, 15 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index d16dfe0061a..b43808e211c 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -209,6 +209,7 @@ OPTION(paxos_trim_min, OPT_INT, 250) // number of extra proposals tolerated bef
OPTION(paxos_trim_max, OPT_INT, 500) // max number of extra proposals to trim at a time
OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it)
OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it)
+OPTION(paxos_kill_at, OPT_INT, 0)
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons
OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients
diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc
index ee2ba3b6fdb..508669deef5 100644
--- a/src/mon/Paxos.cc
+++ b/src/mon/Paxos.cc
@@ -103,11 +103,21 @@ void Paxos::collect(version_t oldpn)
// look for uncommitted value
if (get_store()->exists(get_name(), last_committed+1)) {
+ version_t v = get_store()->get(get_name(), "pending_v");
+ version_t pn = get_store()->get(get_name(), "pending_pn");
+ if (v && pn && v == last_committed + 1) {
+ uncommitted_pn = pn;
+ } else {
+ dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
+ << " and crossing our fingers" << dendl;
+ uncommitted_pn = accepted_pn;
+ }
uncommitted_v = last_committed+1;
- uncommitted_pn = accepted_pn;
+
get_store()->get(get_name(), last_committed+1, uncommitted_value);
assert(uncommitted_value.length());
dout(10) << "learned uncommitted " << (last_committed+1)
+ << " pn " << uncommitted_pn
<< " (" << uncommitted_value.length() << " bytes) from myself"
<< dendl;
}
@@ -164,6 +174,8 @@ void Paxos::handle_collect(MMonPaxos *collect)
last->last_committed = last_committed;
last->first_committed = first_committed;
+ version_t previous_pn = accepted_pn;
+
// can we accept this pn?
if (collect->pn > accepted_pn) {
// ok, accept it
@@ -198,13 +210,25 @@ void Paxos::handle_collect(MMonPaxos *collect)
// do we have an accepted but uncommitted value?
// (it'll be at last_committed+1)
bufferlist bl;
- if (get_store()->exists(get_name(), last_committed+1)) {
+ if (collect->last_committed == last_committed &&
+ get_store()->exists(get_name(), last_committed+1)) {
get_store()->get(get_name(), last_committed+1, bl);
assert(bl.length() > 0);
dout(10) << " sharing our accepted but uncommitted value for "
<< last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
last->values[last_committed+1] = bl;
- last->uncommitted_pn = accepted_pn;
+
+ version_t v = get_store()->get(get_name(), "pending_v");
+ version_t pn = get_store()->get(get_name(), "pending_pn");
+ if (v && pn && v == last_committed + 1) {
+ last->uncommitted_pn = pn;
+ } else {
+ // previously we didn't record which pn a value was accepted
+ // under! use the pn value we just had... :(
+ dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
+ << " and crossing our fingers" << dendl;
+ last->uncommitted_pn = previous_pn;
+ }
}
// send reply
@@ -370,9 +394,13 @@ void Paxos::handle_last(MMonPaxos *last)
return;
}
+ assert(g_conf->paxos_kill_at != 1);
+
// store any committed values if any are specified in the message
store_state(last);
+ assert(g_conf->paxos_kill_at != 2);
+
// do they accept your pn?
if (last->pn > accepted_pn) {
// no, try again.
@@ -390,15 +418,23 @@ void Paxos::handle_last(MMonPaxos *last)
<< num_last << " peons" << dendl;
// did this person send back an accepted but uncommitted value?
- if (last->uncommitted_pn &&
- last->uncommitted_pn > uncommitted_pn) {
- uncommitted_v = last->last_committed+1;
- uncommitted_pn = last->uncommitted_pn;
- uncommitted_value = last->values[uncommitted_v];
- dout(10) << "we learned an uncommitted value for " << uncommitted_v
- << " pn " << uncommitted_pn
- << " " << uncommitted_value.length() << " bytes"
- << dendl;
+ if (last->uncommitted_pn) {
+ if (last->uncommitted_pn > uncommitted_pn &&
+ last->last_committed >= last_committed &&
+ last->last_committed + 1 >= uncommitted_v) {
+ uncommitted_v = last->last_committed+1;
+ uncommitted_pn = last->uncommitted_pn;
+ uncommitted_value = last->values[uncommitted_v];
+ dout(10) << "we learned an uncommitted value for " << uncommitted_v
+ << " pn " << uncommitted_pn
+ << " " << uncommitted_value.length() << " bytes"
+ << dendl;
+ } else {
+ dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
+ << " pn " << last->uncommitted_pn
+ << " " << last->values[last->last_committed+1].length() << " bytes"
+ << dendl;
+ }
}
// is that everyone?
@@ -502,6 +538,10 @@ void Paxos::begin(bufferlist& v)
MonitorDBStore::Transaction t;
t.put(get_name(), last_committed+1, new_value);
+ // note which pn this pending value is for.
+ t.put(get_name(), "pending_v", last_committed + 1);
+ t.put(get_name(), "pending_pn", accepted_pn);
+
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t.dump(&f);
@@ -516,6 +556,8 @@ void Paxos::begin(bufferlist& v)
get_store()->apply_transaction(t);
+ assert(g_conf->paxos_kill_at != 3);
+
if (mon->get_quorum().size() == 1) {
// we're alone, take it easy
commit();
@@ -566,6 +608,8 @@ void Paxos::handle_begin(MMonPaxos *begin)
assert(begin->pn == accepted_pn);
assert(begin->last_committed == last_committed);
+ assert(g_conf->paxos_kill_at != 4);
+
// set state.
state = STATE_UPDATING;
lease_expire = utime_t(); // cancel lease
@@ -578,6 +622,10 @@ void Paxos::handle_begin(MMonPaxos *begin)
MonitorDBStore::Transaction t;
t.put(get_name(), v, begin->values[v]);
+ // note which pn this pending value is for.
+ t.put(get_name(), "pending_v", v);
+ t.put(get_name(), "pending_pn", accepted_pn);
+
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t.dump(&f);
@@ -586,6 +634,8 @@ void Paxos::handle_begin(MMonPaxos *begin)
get_store()->apply_transaction(t);
+ assert(g_conf->paxos_kill_at != 5);
+
// reply
MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
ceph_clock_now(g_ceph_context));
@@ -620,6 +670,8 @@ void Paxos::handle_accept(MMonPaxos *accept)
accepted.insert(from);
dout(10) << " now " << accepted << " have accepted" << dendl;
+ assert(g_conf->paxos_kill_at != 6);
+
// new majority?
if (accepted.size() == (unsigned)mon->monmap->size()/2+1) {
// yay, commit!
@@ -643,6 +695,8 @@ void Paxos::handle_accept(MMonPaxos *accept)
// yay!
extend_lease();
+ assert(g_conf->paxos_kill_at != 10);
+
finish_round();
// wake people up
@@ -673,6 +727,8 @@ void Paxos::commit()
// leader still got a majority and committed with out us.)
lease_expire = utime_t(); // cancel lease
+ assert(g_conf->paxos_kill_at != 7);
+
MonitorDBStore::Transaction t;
// commit locally
@@ -692,6 +748,8 @@ void Paxos::commit()
get_store()->apply_transaction(t);
+ assert(g_conf->paxos_kill_at != 8);
+
// refresh first_committed; this txn may have trimmed.
first_committed = get_store()->get(get_name(), "first_committed");
@@ -713,6 +771,8 @@ void Paxos::commit()
mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
}
+ assert(g_conf->paxos_kill_at != 9);
+
// get ready for a new round.
new_value.clear();
diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h
index cab27f289a8..69419e64ab9 100644
--- a/src/mon/Paxos.h
+++ b/src/mon/Paxos.h
@@ -290,8 +290,9 @@ private:
*/
version_t accepted_pn;
/**
- * @todo This has something to do with the last_committed version. Not sure
- * about what it entails, tbh.
+ * The last_committed epoch of the leader at the time we accepted the last pn.
+ *
+ * This has NO SEMANTIC MEANING, and is there only for the debug output.
*/
version_t accepted_pn_from;
/**
@@ -1114,7 +1115,7 @@ public:
* @param t The transaction to which we will append the operations
* @param bl A bufferlist containing an encoded transaction
*/
- void decode_append_transaction(MonitorDBStore::Transaction& t,
+ static void decode_append_transaction(MonitorDBStore::Transaction& t,
bufferlist& bl) {
MonitorDBStore::Transaction vt;
bufferlist::iterator it = bl.begin();
diff --git a/src/tools/ceph-monstore-tool.cc b/src/tools/ceph-monstore-tool.cc
index ae608a302f2..f361266aff0 100644
--- a/src/tools/ceph-monstore-tool.cc
+++ b/src/tools/ceph-monstore-tool.cc
@@ -31,6 +31,7 @@
#include "global/global_init.h"
#include "os/LevelDBStore.h"
#include "mon/MonitorDBStore.h"
+#include "mon/Paxos.h"
#include "common/Formatter.h"
namespace po = boost::program_options;
@@ -246,6 +247,19 @@ int main(int argc, char **argv) {
goto done;
}
bl.write_fd(fd);
+ } else if (cmd == "dump-paxos") {
+ for (version_t v = dstart; v <= dstop; ++v) {
+ bufferlist bl;
+ st.get("paxos", v, bl);
+ if (bl.length() == 0)
+ break;
+ cout << "\n--- " << v << " ---" << std::endl;
+ MonitorDBStore::Transaction tx;
+ Paxos::decode_append_transaction(tx, bl);
+ JSONFormatter f(true);
+ tx.dump(&f);
+ f.flush(cout);
+ }
} else if (cmd == "dump-trace") {
if (tfile.empty()) {
std::cerr << "Need trace_file" << std::endl;