diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/mon/Paxos.cc | 84 | ||||
-rw-r--r-- | src/mon/Paxos.h | 7 | ||||
-rw-r--r-- | src/tools/ceph-monstore-tool.cc | 14 |
4 files changed, 91 insertions, 15 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index d16dfe0061a..b43808e211c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -209,6 +209,7 @@ OPTION(paxos_trim_min, OPT_INT, 250) // number of extra proposals tolerated bef OPTION(paxos_trim_max, OPT_INT, 500) // max number of extra proposals to trim at a time OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it) OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it) +OPTION(paxos_kill_at, OPT_INT, 0) OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index ee2ba3b6fdb..508669deef5 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -103,11 +103,21 @@ void Paxos::collect(version_t oldpn) // look for uncommitted value if (get_store()->exists(get_name(), last_committed+1)) { + version_t v = get_store()->get(get_name(), "pending_v"); + version_t pn = get_store()->get(get_name(), "pending_pn"); + if (v && pn && v == last_committed + 1) { + uncommitted_pn = pn; + } else { + dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn + << " and crossing our fingers" << dendl; + uncommitted_pn = accepted_pn; + } uncommitted_v = last_committed+1; - uncommitted_pn = accepted_pn; + get_store()->get(get_name(), last_committed+1, uncommitted_value); assert(uncommitted_value.length()); dout(10) << "learned uncommitted " << (last_committed+1) + << " pn " << uncommitted_pn << " (" << uncommitted_value.length() << " bytes) from myself" << dendl; } @@ -164,6 +174,8 @@ void Paxos::handle_collect(MMonPaxos *collect) last->last_committed = last_committed; last->first_committed = first_committed; + version_t previous_pn = accepted_pn; + // can we accept this pn? if (collect->pn > accepted_pn) { // ok, accept it @@ -198,13 +210,25 @@ void Paxos::handle_collect(MMonPaxos *collect) // do we have an accepted but uncommitted value? // (it'll be at last_committed+1) bufferlist bl; - if (get_store()->exists(get_name(), last_committed+1)) { + if (collect->last_committed == last_committed && + get_store()->exists(get_name(), last_committed+1)) { get_store()->get(get_name(), last_committed+1, bl); assert(bl.length() > 0); dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1 << " (" << bl.length() << " bytes)" << dendl; last->values[last_committed+1] = bl; - last->uncommitted_pn = accepted_pn; + + version_t v = get_store()->get(get_name(), "pending_v"); + version_t pn = get_store()->get(get_name(), "pending_pn"); + if (v && pn && v == last_committed + 1) { + last->uncommitted_pn = pn; + } else { + // previously we didn't record which pn a value was accepted + // under! use the pn value we just had... :( + dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn + << " and crossing our fingers" << dendl; + last->uncommitted_pn = previous_pn; + } } // send reply @@ -370,9 +394,13 @@ void Paxos::handle_last(MMonPaxos *last) return; } + assert(g_conf->paxos_kill_at != 1); + // store any committed values if any are specified in the message store_state(last); + assert(g_conf->paxos_kill_at != 2); + // do they accept your pn? if (last->pn > accepted_pn) { // no, try again. @@ -390,15 +418,23 @@ void Paxos::handle_last(MMonPaxos *last) << num_last << " peons" << dendl; // did this person send back an accepted but uncommitted value? - if (last->uncommitted_pn && - last->uncommitted_pn > uncommitted_pn) { - uncommitted_v = last->last_committed+1; - uncommitted_pn = last->uncommitted_pn; - uncommitted_value = last->values[uncommitted_v]; - dout(10) << "we learned an uncommitted value for " << uncommitted_v - << " pn " << uncommitted_pn - << " " << uncommitted_value.length() << " bytes" - << dendl; + if (last->uncommitted_pn) { + if (last->uncommitted_pn > uncommitted_pn && + last->last_committed >= last_committed && + last->last_committed + 1 >= uncommitted_v) { + uncommitted_v = last->last_committed+1; + uncommitted_pn = last->uncommitted_pn; + uncommitted_value = last->values[uncommitted_v]; + dout(10) << "we learned an uncommitted value for " << uncommitted_v + << " pn " << uncommitted_pn + << " " << uncommitted_value.length() << " bytes" + << dendl; + } else { + dout(10) << "ignoring uncommitted value for " << (last->last_committed+1) + << " pn " << last->uncommitted_pn + << " " << last->values[last->last_committed+1].length() << " bytes" + << dendl; + } } // is that everyone? @@ -502,6 +538,10 @@ void Paxos::begin(bufferlist& v) MonitorDBStore::Transaction t; t.put(get_name(), last_committed+1, new_value); + // note which pn this pending value is for. + t.put(get_name(), "pending_v", last_committed + 1); + t.put(get_name(), "pending_pn", accepted_pn); + dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); t.dump(&f); @@ -516,6 +556,8 @@ void Paxos::begin(bufferlist& v) get_store()->apply_transaction(t); + assert(g_conf->paxos_kill_at != 3); + if (mon->get_quorum().size() == 1) { // we're alone, take it easy commit(); @@ -566,6 +608,8 @@ void Paxos::handle_begin(MMonPaxos *begin) assert(begin->pn == accepted_pn); assert(begin->last_committed == last_committed); + assert(g_conf->paxos_kill_at != 4); + // set state. state = STATE_UPDATING; lease_expire = utime_t(); // cancel lease @@ -578,6 +622,10 @@ void Paxos::handle_begin(MMonPaxos *begin) MonitorDBStore::Transaction t; t.put(get_name(), v, begin->values[v]); + // note which pn this pending value is for. + t.put(get_name(), "pending_v", v); + t.put(get_name(), "pending_pn", accepted_pn); + dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); t.dump(&f); @@ -586,6 +634,8 @@ void Paxos::handle_begin(MMonPaxos *begin) get_store()->apply_transaction(t); + assert(g_conf->paxos_kill_at != 5); + // reply MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, ceph_clock_now(g_ceph_context)); @@ -620,6 +670,8 @@ void Paxos::handle_accept(MMonPaxos *accept) accepted.insert(from); dout(10) << " now " << accepted << " have accepted" << dendl; + assert(g_conf->paxos_kill_at != 6); + // new majority? if (accepted.size() == (unsigned)mon->monmap->size()/2+1) { // yay, commit! @@ -643,6 +695,8 @@ void Paxos::handle_accept(MMonPaxos *accept) // yay! extend_lease(); + assert(g_conf->paxos_kill_at != 10); + finish_round(); // wake people up @@ -673,6 +727,8 @@ void Paxos::commit() // leader still got a majority and committed with out us.) lease_expire = utime_t(); // cancel lease + assert(g_conf->paxos_kill_at != 7); + MonitorDBStore::Transaction t; // commit locally @@ -692,6 +748,8 @@ void Paxos::commit() get_store()->apply_transaction(t); + assert(g_conf->paxos_kill_at != 8); + // refresh first_committed; this txn may have trimmed. first_committed = get_store()->get(get_name(), "first_committed"); @@ -713,6 +771,8 @@ void Paxos::commit() mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); } + assert(g_conf->paxos_kill_at != 9); + // get ready for a new round. new_value.clear(); diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index cab27f289a8..69419e64ab9 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -290,8 +290,9 @@ private: */ version_t accepted_pn; /** - * @todo This has something to do with the last_committed version. Not sure - * about what it entails, tbh. + * The last_committed epoch of the leader at the time we accepted the last pn. + * + * This has NO SEMANTIC MEANING, and is there only for the debug output. */ version_t accepted_pn_from; /** @@ -1114,7 +1115,7 @@ public: * @param t The transaction to which we will append the operations * @param bl A bufferlist containing an encoded transaction */ - void decode_append_transaction(MonitorDBStore::Transaction& t, + static void decode_append_transaction(MonitorDBStore::Transaction& t, bufferlist& bl) { MonitorDBStore::Transaction vt; bufferlist::iterator it = bl.begin(); diff --git a/src/tools/ceph-monstore-tool.cc b/src/tools/ceph-monstore-tool.cc index ae608a302f2..f361266aff0 100644 --- a/src/tools/ceph-monstore-tool.cc +++ b/src/tools/ceph-monstore-tool.cc @@ -31,6 +31,7 @@ #include "global/global_init.h" #include "os/LevelDBStore.h" #include "mon/MonitorDBStore.h" +#include "mon/Paxos.h" #include "common/Formatter.h" namespace po = boost::program_options; @@ -246,6 +247,19 @@ int main(int argc, char **argv) { goto done; } bl.write_fd(fd); + } else if (cmd == "dump-paxos") { + for (version_t v = dstart; v <= dstop; ++v) { + bufferlist bl; + st.get("paxos", v, bl); + if (bl.length() == 0) + break; + cout << "\n--- " << v << " ---" << std::endl; + MonitorDBStore::Transaction tx; + Paxos::decode_append_transaction(tx, bl); + JSONFormatter f(true); + tx.dump(&f); + f.flush(cout); + } } else if (cmd == "dump-trace") { if (tfile.empty()) { std::cerr << "Need trace_file" << std::endl; |