summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ceph_mon.cc20
-rw-r--r--src/common/config_opts.h13
-rw-r--r--src/messages/MMonSync.h184
-rw-r--r--src/mon/Monitor.cc1461
-rw-r--r--src/mon/Monitor.h908
-rw-r--r--src/mon/MonitorDBStore.h72
-rw-r--r--src/mon/PGMonitor.cc16
-rw-r--r--src/mon/PGMonitor.h16
-rw-r--r--src/mon/Paxos.cc48
-rw-r--r--src/mon/Paxos.h42
-rw-r--r--src/mon/PaxosService.h7
11 files changed, 558 insertions, 2229 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc
index 7d8d5c4db29..b62443aa035 100644
--- a/src/ceph_mon.cc
+++ b/src/ceph_mon.cc
@@ -107,6 +107,8 @@ void usage()
cerr << " debug monitor level (e.g. 10)\n";
cerr << " --mkfs\n";
cerr << " build fresh monitor fs\n";
+ cerr << " --force-sync\n";
+ cerr << " force a sync from another mon by wiping local data (BE CAREFUL)\n";
generic_server_usage();
}
@@ -116,6 +118,8 @@ int main(int argc, const char **argv)
bool mkfs = false;
bool compact = false;
+ bool force_sync = false;
+ bool yes_really = false;
std::string osdmapfn, inject_monmap, extract_monmap;
vector<const char*> args;
@@ -136,6 +140,10 @@ int main(int argc, const char **argv)
mkfs = true;
} else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {
compact = true;
+ } else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) {
+ force_sync = true;
+ } else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) {
+ yes_really = true;
} else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {
osdmapfn = val;
} else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {
@@ -151,6 +159,12 @@ int main(int argc, const char **argv)
usage();
}
+ if (force_sync && !yes_really) {
+ cerr << "are you SURE you want to force a sync? this will erase local data and may\n"
+ << "break your mon cluster. pass --yes-i-really-mean-it if you do." << std::endl;
+ exit(1);
+ }
+
if (g_conf->mon_data.empty()) {
cerr << "must specify '--mon-data=foo' data path" << std::endl;
usage();
@@ -406,7 +420,6 @@ int main(int argc, const char **argv)
}
}
-
// this is what i will bind to
entity_addr_t ipaddr;
@@ -510,6 +523,11 @@ int main(int argc, const char **argv)
mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store,
messenger, &monmap);
+ if (force_sync) {
+ derr << "flagging a forced sync ..." << dendl;
+ mon->sync_force(cerr);
+ }
+
err = mon->preinit();
if (err < 0)
prefork.exit(1);
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 2e386f9d732..01ec8375cd2 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -171,17 +171,13 @@ OPTION(mon_health_data_update_interval, OPT_FLOAT, 60.0)
OPTION(mon_data_avail_crit, OPT_INT, 5)
OPTION(mon_data_avail_warn, OPT_INT, 30)
OPTION(mon_config_key_max_entry_size, OPT_INT, 4096) // max num bytes per config-key entry
-OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0)
-OPTION(mon_sync_backoff_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_max_retries, OPT_INT, 5)
+OPTION(mon_sync_timeout, OPT_DOUBLE, 60.0)
OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB)
OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug
OPTION(mon_sync_debug_leader, OPT_INT, -1) // monitor to be used as the sync leader
OPTION(mon_sync_debug_provider, OPT_INT, -1) // monitor to be used as the sync provider
OPTION(mon_sync_debug_provider_fallback, OPT_INT, -1) // monitor to be used as fallback if sync provider fails
+OPTION(mon_inject_sync_get_chunk_delay, OPT_DOUBLE, 0) // inject N second delay on each get_chunk request
OPTION(mon_osd_min_down_reporters, OPT_INT, 1) // number of OSDs who need to report a down OSD for it to count
OPTION(mon_osd_min_down_reports, OPT_INT, 3) // number of times a down OSD must be reported for it to count
OPTION(mon_osd_force_trim_to, OPT_INT, 0) // force mon to trim maps to this point, regardless of min_last_epoch_clean (dangerous, use with care)
@@ -190,8 +186,7 @@ OPTION(mon_osd_force_trim_to, OPT_INT, 0) // force mon to trim maps to this po
OPTION(mon_debug_dump_transactions, OPT_BOOL, false)
OPTION(mon_debug_dump_location, OPT_STR, "/var/log/ceph/$cluster-$name.tdump")
-OPTION(mon_sync_leader_kill_at, OPT_INT, 0) // kill the sync leader at a specifc point in the work flow
-OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a specific point in the work flow
+OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a specific point in the work flow
OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow
OPTION(mon_leveldb_write_buffer_size, OPT_U64, 32*1024*1024) // monitor's leveldb write buffer size
OPTION(mon_leveldb_cache_size, OPT_U64, 512*1024*1024) // monitor's leveldb cache size
@@ -205,9 +200,9 @@ OPTION(paxos_stash_full_interval, OPT_INT, 25) // how often (in commits) to st
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first sync the monitor stores
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
+OPTION(paxos_min, OPT_INT, 500) // minimum number of paxos states to keep around
OPTION(paxos_trim_min, OPT_INT, 250) // number of extra proposals tolerated before trimming
OPTION(paxos_trim_max, OPT_INT, 500) // max number of extra proposals to trim at a time
-OPTION(paxos_trim_disabled_max_versions, OPT_INT, 108000) // maximum amount of versions we shall allow passing by without trimming
OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it)
OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it)
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
diff --git a/src/messages/MMonSync.h b/src/messages/MMonSync.h
index 6183578e167..a5415a8f451 100644
--- a/src/messages/MMonSync.h
+++ b/src/messages/MMonSync.h
@@ -17,85 +17,24 @@
class MMonSync : public Message
{
- static const int HEAD_VERSION = 1;
- static const int COMPAT_VERSION = 1;
+ static const int HEAD_VERSION = 2;
+ static const int COMPAT_VERSION = 2;
public:
/**
* Operation types
*/
enum {
- /**
- * Start synchronization request
- * (mon.X -> Leader)
- */
- OP_START = 1,
- /**
- * Reply to an OP_START
- * (Leader -> mon.X)
- */
- OP_START_REPLY = 2,
- /**
- * Let the Leader know we are still synchronizing
- * (mon.X -> Leader)
- */
- OP_HEARTBEAT = 3,
- /**
- * Reply to a hearbeat
- * (Leader -> mon.X)
- */
- OP_HEARTBEAT_REPLY = 4,
- /**
- * Let the Leader know we finished synchronizing
- * (mon.X -> Leader)
- */
- OP_FINISH = 5,
- /**
- * Request a given monitor (mon.Y) to start synchronizing with us, hence
- * sending us chunks.
- * (mon.X -> mon.Y)
- */
- OP_START_CHUNKS = 6,
- /**
- * Send a chunk to a given monitor (mon.X)
- * (mon.Y -> mon.X)
- */
- OP_CHUNK = 7,
- /**
- * Acknowledge that we received the last chunk sent
- * (mon.X -> mon.Y)
- */
- OP_CHUNK_REPLY = 8,
- /**
- * Reply to an OP_FINISH
- * (Leader -> mon.X)
- */
- OP_FINISH_REPLY = 9,
- /**
- * Let the receiver know that he should abort whatever he is in the middle
- * of doing with the sender.
- */
- OP_ABORT = 10,
+ OP_GET_COOKIE_FULL = 1, // -> start a session (full scan)
+ OP_GET_COOKIE_RECENT = 2, // -> start a session (only recent paxos events)
+ OP_COOKIE = 3, // <- pass the iterator cookie, or
+ OP_GET_CHUNK = 4, // -> get some keys
+ OP_CHUNK = 5, // <- return some keys
+ OP_LAST_CHUNK = 6, // <- return the last set of keys
+ OP_NO_COOKIE = 8, // <- sorry, no cookie
};
/**
- * Chunk is the last available
- */
- const static uint8_t FLAG_LAST = 0x01;
- /**
- * Let the other monitor it should retry again its last operation.
- */
- const static uint8_t FLAG_RETRY = 0x02;
- /**
- * This message contains a crc
- */
- const static uint8_t FLAG_CRC = 0x04;
- /**
- * Do not reply to this message to the sender, but to @p reply_to.
- */
- const static uint8_t FLAG_REPLY_TO = 0x08;
-
- /**
* Obtain a string corresponding to the operation type @p op
*
* @param op Operation type
@@ -103,119 +42,68 @@ public:
*/
static const char *get_opname(int op) {
switch (op) {
- case OP_START: return "start";
- case OP_START_REPLY: return "start_reply";
- case OP_HEARTBEAT: return "heartbeat";
- case OP_HEARTBEAT_REPLY: return "heartbeat_reply";
- case OP_FINISH: return "finish";
- case OP_FINISH_REPLY: return "finish_reply";
- case OP_START_CHUNKS: return "start_chunks";
+ case OP_GET_COOKIE_FULL: return "get_cookie_full";
+ case OP_GET_COOKIE_RECENT: return "get_cookie_recent";
+ case OP_COOKIE: return "cookie";
+ case OP_GET_CHUNK: return "get_chunk";
case OP_CHUNK: return "chunk";
- case OP_CHUNK_REPLY: return "chunk_reply";
- case OP_ABORT: return "abort";
+ case OP_LAST_CHUNK: return "last_chunk";
+ case OP_NO_COOKIE: return "no_cookie";
default: assert("unknown op type"); return NULL;
}
}
uint32_t op;
- uint8_t flags;
- version_t version;
- bufferlist chunk_bl;
+ uint64_t cookie;
+ version_t last_committed;
pair<string,string> last_key;
- __u32 crc;
+ bufferlist chunk_bl;
entity_inst_t reply_to;
MMonSync()
: Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION)
{ }
- MMonSync(uint32_t op)
- : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION),
- op(op), flags(0), version(0), crc(0)
- { }
-
- MMonSync(uint32_t op, bufferlist bl, uint8_t flags = 0)
- : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION),
- op(op), flags(flags), version(0), chunk_bl(bl), crc(0)
- { }
-
- MMonSync(MMonSync *m)
+ MMonSync(uint32_t op, uint64_t c = 0)
: Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION),
- op(m->op), flags(m->flags), version(m->version),
- chunk_bl(m->chunk_bl), last_key(m->last_key),
- crc(m->crc), reply_to(m->reply_to)
+ op(op),
+ cookie(c),
+ last_committed(0)
{ }
- /**
- * Obtain this message type's name */
const char *get_type_name() const { return "mon_sync"; }
- void set_reply_to(entity_inst_t other) {
- reply_to = other;
- flags |= FLAG_REPLY_TO;
- }
-
- /**
- * Print this message in a pretty format to @p out
- *
- * @param out The output stream to output to
- */
void print(ostream& out) const {
- out << "mon_sync( " << get_opname(op);
-
- if (version > 0)
- out << " v " << version;
-
- if (flags) {
- out << " flags( ";
- if (flags & FLAG_LAST)
- out << "last ";
- if (flags & FLAG_RETRY)
- out << "retry ";
- if (flags & FLAG_CRC)
- out << "crc(" << crc << ") ";
- if (flags & FLAG_REPLY_TO)
- out << "reply-to(" << reply_to << ") ";
- out << ")";
- }
-
+ out << "mon_sync(" << get_opname(op);
+ if (cookie)
+ out << " cookie " << cookie;
+ if (last_committed > 0)
+ out << " lc " << last_committed;
if (chunk_bl.length())
out << " bl " << chunk_bl.length() << " bytes";
-
- if (!last_key.first.empty() || !last_key.second.empty()) {
- out << " last_key ( " << last_key.first << ","
- << last_key.second << " )";
- }
-
- out << " )";
+ if (!last_key.first.empty() || !last_key.second.empty())
+ out << " last_key " << last_key.first << "," << last_key.second;
+ out << ")";
}
- /**
- * Encode this message into the Message's payload
- */
void encode_payload(uint64_t features) {
::encode(op, payload);
- ::encode(flags, payload);
- ::encode(version, payload);
- ::encode(chunk_bl, payload);
+ ::encode(cookie, payload);
+ ::encode(last_committed, payload);
::encode(last_key.first, payload);
::encode(last_key.second, payload);
- ::encode(crc, payload);
+ ::encode(chunk_bl, payload);
::encode(reply_to, payload);
}
- /**
- * Decode the message's payload into this message
- */
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(op, p);
- ::decode(flags, p);
- ::decode(version, p);
- ::decode(chunk_bl, p);
+ ::decode(cookie, p);
+ ::decode(last_committed, p);
::decode(last_key.first, p);
::decode(last_key.second, p);
- ::decode(crc, p);
+ ::decode(chunk_bl, p);
::decode(reply_to, p);
}
};
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index b2676ec4357..a78ad741ede 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -144,14 +144,13 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
leader(0),
quorum_features(0),
- // trim & store sync
- sync_role(SYNC_ROLE_NONE),
- trim_lock("Monitor::trim_lock"),
- trim_enable_timer(NULL),
- sync_rng(getpid()),
- sync_state(SYNC_STATE_NONE),
- sync_leader(),
- sync_provider(),
+ // sync state
+ sync_provider_count(0),
+ sync_cookie(0),
+ sync_full(false),
+ sync_start_version(0),
+ sync_timeout_event(NULL),
+ sync_last_committed_floor(0),
timecheck_round(0),
timecheck_acks(0),
@@ -240,8 +239,6 @@ void Monitor::do_admin_command(string command, string args, ostream& ss)
_mon_status(ss);
else if (command == "quorum_status")
_quorum_status(ss);
- else if (command == "sync_status")
- _sync_status(ss);
else if (command == "sync_force") {
if (args != "--yes-i-really-mean-it") {
ss << "are you SURE? this will mean the monitor store will be erased "
@@ -249,7 +246,7 @@ void Monitor::do_admin_command(string command, string args, ostream& ss)
"'--yes-i-really-mean-it' if you really do.";
return;
}
- _sync_force(ss);
+ sync_force(ss);
} else if (command.find("add_bootstrap_peer_hint") == 0)
_add_bootstrap_peer_hint(command, args, ss);
else
@@ -416,7 +413,7 @@ int Monitor::preinit()
// We have a potentially inconsistent store state in hands. Get rid of it
// and start fresh.
bool clear_store = false;
- if (is_sync_on_going()) {
+ if (store->exists("mon_sync", "in_sync")) {
dout(1) << __func__ << " clean up potentially inconsistent store state"
<< dendl;
clear_store = true;
@@ -430,14 +427,12 @@ int Monitor::preinit()
if (clear_store) {
set<string> sync_prefixes = get_sync_targets_names();
store->clear(sync_prefixes);
-
- MonitorDBStore::Transaction t;
- t.erase("mon_sync", "in_sync");
- t.erase("mon_sync", "force_sync");
- store->apply_transaction(t);
}
}
+ sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
+ dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
+
init_paxos();
health_monitor->init();
@@ -484,9 +479,6 @@ int Monitor::preinit()
r = admin_socket->register_command("quorum_status", "quorum_status",
admin_hook, "show current quorum status");
assert(r == 0);
- r = admin_socket->register_command("sync_status", "sync_status", admin_hook,
- "show current synchronization status");
- assert(r == 0);
r = admin_socket->register_command("add_bootstrap_peer_hint",
"add_bootstrap_peer_hint name=addr,type=CephIPAddr",
admin_hook,
@@ -526,10 +518,7 @@ void Monitor::init_paxos()
paxos_service[i]->init();
}
- // update paxos
- if (paxos->is_consistent()) {
- refresh_from_paxos(NULL);
- }
+ refresh_from_paxos(NULL);
}
void Monitor::refresh_from_paxos(bool *need_bootstrap)
@@ -582,7 +571,6 @@ void Monitor::shutdown()
AdminSocket* admin_socket = cct->get_admin_socket();
admin_socket->unregister_command("mon_status");
admin_socket->unregister_command("quorum_status");
- admin_socket->unregister_command("sync_status");
admin_socket->unregister_command("add_bootstrap_peer_hint");
delete admin_hook;
admin_hook = NULL;
@@ -625,6 +613,7 @@ void Monitor::bootstrap()
{
dout(10) << "bootstrap" << dendl;
+ sync_reset();
unregister_cluster_logger();
cancel_probe_timeout();
@@ -646,8 +635,6 @@ void Monitor::bootstrap()
messenger->mark_down_all();
}
- reset_sync();
-
// reset
state = STATE_PROBING;
@@ -738,615 +725,25 @@ void Monitor::reset()
health_monitor->finish();
}
-set<string> Monitor::get_sync_targets_names() {
+
+// -----------------------------------------------------------
+// sync
+
+set<string> Monitor::get_sync_targets_names()
+{
set<string> targets;
targets.insert(paxos->get_name());
for (int i = 0; i < PAXOS_NUM; ++i)
- targets.insert(paxos_service[i]->get_service_name());
+ paxos_service[i]->get_store_prefixes(targets);
return targets;
}
-/**
- * Reset any lingering sync/trim informations we might have.
- */
-void Monitor::reset_sync(bool abort)
-{
- dout(10) << __func__ << dendl;
- // clear everything trim/sync related
- {
- map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
- for (; iter != trim_timeouts.end(); ++iter) {
- if (!iter->second)
- continue;
-
- timer.cancel_event(iter->second);
- if (abort) {
- MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
- entity_inst_t other = iter->first;
- messenger->send_message(msg, other);
- }
- }
- trim_timeouts.clear();
- }
- {
- map<entity_inst_t,SyncEntity>::iterator iter = sync_entities.begin();
- for (; iter != sync_entities.end(); ++iter) {
- (*iter).second->cancel_timeout();
- }
- sync_entities.clear();
- }
-
- sync_entities_states.clear();
- trim_entities_states.clear();
-
- sync_leader.reset();
- sync_provider.reset();
-
- sync_state = SYNC_STATE_NONE;
- sync_role = SYNC_ROLE_NONE;
-}
-
-// leader
-
-void Monitor::sync_send_heartbeat(entity_inst_t &other, bool reply)
-{
- dout(10) << __func__ << " " << other << " reply(" << reply << ")" << dendl;
- uint32_t op = (reply ? MMonSync::OP_HEARTBEAT_REPLY : MMonSync::OP_HEARTBEAT);
- MMonSync *msg = new MMonSync(op);
- messenger->send_message(msg, other);
-}
-
-void Monitor::handle_sync_start(MMonSync *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
-
- /* If we are not the leader, then some monitor picked us as the point of
- * entry to the quorum during its synchronization process. Therefore, we
- * have an obligation of forwarding this message to leader, so the sender
- * can start synchronizing.
- */
- if (!is_leader() && !quorum.empty()) {
- assert(!(sync_role & SYNC_ROLE_REQUESTER));
- assert(!(sync_role & SYNC_ROLE_LEADER));
- assert(!is_synchronizing());
-
- entity_inst_t leader = monmap->get_inst(get_leader());
- MMonSync *msg = new MMonSync(m);
- // keep forwarding the message up the chain if it has already been
- // forwarded.
- if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
- msg->set_reply_to(m->get_source_inst());
- }
- dout(10) << __func__ << " forward " << *m
- << " to leader at " << leader << dendl;
- assert(g_conf->mon_sync_provider_kill_at != 1);
- messenger->send_message(msg, leader);
- assert(g_conf->mon_sync_provider_kill_at != 2);
- m->put();
- return;
- }
-
- // If we are synchronizing, then it means that we know someone who has a
- // higher version than the one we have; and if someone attempted to sync
- // from us, then that must mean they have a lower version than us.
- // Therefore, they must be much more interested in synchronizing from the
- // one we are trying to synchronize from than they are from us.
- // Moreover, if we are already synchronizing under the REQUESTER role, then
- // we must know someone who is in the quorum, either because we were lucky
- // enough to contact them in the first place or we managed to contact
- // someone who knew who they were. Therefore, just forward this request to
- // our sync leader.
- if (is_synchronizing()) {
- assert(!(sync_role & SYNC_ROLE_LEADER));
- assert(!(sync_role & SYNC_ROLE_PROVIDER));
- assert(quorum.empty());
- assert(sync_leader.get() != NULL);
-
- /**
- * This looks a bit odd, but we've seen cases where sync start messages
- * get bounced around and end up at the originator without anybody
- * noticing!* If it happens, just drop the message and the timeouts
- * will clean everything up -- eventually.
- * [*] If a leader gets elected who is too far behind, he'll drop into
- * bootstrap and sync, but the person he sends his sync to thinks he's
- * still the leader and forwards the reply back.
- */
- if (m->reply_to == messenger->get_myinst()) {
- m->put();
- return;
- }
-
- dout(10) << __func__ << " forward " << *m
- << " to our sync leader at "
- << sync_leader->entity << dendl;
-
- MMonSync *msg = new MMonSync(m);
- // keep forwarding the message up the chain if it has already been
- // forwarded.
- if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
- msg->set_reply_to(m->get_source_inst());
- }
- messenger->send_message(msg, sync_leader->entity);
- m->put();
- return;
- }
-
- // At this point we may or may not be the leader. If we are not the leader,
- // it means that we are not in the quorum, but someone would still very much
- // like to synchronize with us. In certain circumstances, letting them
- // synchronize with us is by far our only option -- for instance, when we
- // need them to form a quorum and they have started fresh or are severely
- // out of date --, but it won't hurt to let them sync from us anyway: if
- // they chose us, then they must have noticed that we had a higher version
- // than they do, so it makes sense to let them try their luck and join the
- // party.
-
- Mutex::Locker l(trim_lock);
- entity_inst_t other =
- (m->flags & MMonSync::FLAG_REPLY_TO ? m->reply_to : m->get_source_inst());
-
- assert(g_conf->mon_sync_leader_kill_at != 1);
-
- if (trim_timeouts.count(other) > 0) {
- dout(1) << __func__ << " sync session already in progress for " << other
- << dendl;
-
- if (trim_entities_states[other] != SYNC_STATE_NONE) {
- dout(1) << __func__ << " ignore stray message" << dendl;
- m->put();
- return;
- }
-
- dout(1) << __func__<< " destroying current state and creating new"
- << dendl;
-
- if (trim_timeouts[other])
- timer.cancel_event(trim_timeouts[other]);
- trim_timeouts.erase(other);
- trim_entities_states.erase(other);
- }
-
- MMonSync *msg = new MMonSync(MMonSync::OP_START_REPLY);
-
- if ((!quorum.empty() && paxos->should_trim())
- || (trim_enable_timer != NULL)) {
- msg->flags |= MMonSync::FLAG_RETRY;
- } else {
- trim_timeouts.insert(make_pair(other, new C_TrimTimeout(this, other)));
- timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
-
- trim_entities_states[other] = SYNC_STATE_START;
- sync_role |= SYNC_ROLE_LEADER;
-
- paxos->trim_disable();
-
- // Is the one that contacted us in the quorum?
- if (quorum.count(m->get_source().num())) {
- // Was it forwarded by someone?
- if (m->flags & MMonSync::FLAG_REPLY_TO) {
- // Then they must not be synchronizing. What sense would that make, eh?
- assert(trim_timeouts.count(m->get_source_inst()) == 0);
- // Set the provider as the one that contacted us. He's in the
- // quorum, so he's up to the job (i.e., he's not synchronizing)
- msg->set_reply_to(m->get_source_inst());
- dout(10) << __func__ << " set provider to " << msg->reply_to << dendl;
- } else {
- // Then they must have gotten into the quorum, and boostrapped.
- // We should tell them to abort and bootstrap ourselves.
- msg->put();
- msg = new MMonSync(MMonSync::OP_ABORT);
- messenger->send_message(msg, other);
- m->put();
- bootstrap();
- return;
- }
- } else if (!quorum.empty()) {
- // grab someone from the quorum and assign them as the sync provider
- int n = _pick_random_quorum_mon(rank);
- if (n >= 0) {
- msg->set_reply_to(monmap->get_inst(n));
- dout(10) << __func__ << " set quorum-based provider to "
- << msg->reply_to << dendl;
- } else {
- assert(0 == "We shouldn't get here!");
- }
- } else {
- // There is no quorum, so we must either be in a quorum-less cluster,
- // or we must be mid-election. Either way, tell them it is okay to
- // sync from us by not setting the reply-to field.
- assert(!(msg->flags & MMonSync::FLAG_REPLY_TO));
- }
- }
- messenger->send_message(msg, other);
- m->put();
-
- assert(g_conf->mon_sync_leader_kill_at != 2);
-}
-
-void Monitor::handle_sync_heartbeat(MMonSync *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
-
- entity_inst_t other = m->get_source_inst();
- if (!(sync_role & SYNC_ROLE_LEADER)
- || !trim_entities_states.count(other)
- || (trim_entities_states[other] != SYNC_STATE_START)) {
- // stray message; ignore.
- dout(1) << __func__ << " ignored stray message " << *m << dendl;
- m->put();
- return;
- }
-
- if (!is_leader() && !quorum.empty()
- && (trim_timeouts.count(other) > 0)) {
- // we must have been the leader before, but we lost leadership to
- // someone else.
- sync_finish_abort(other);
- m->put();
- return;
- }
-
- assert(trim_timeouts.count(other) > 0);
-
- if (trim_timeouts[other])
- timer.cancel_event(trim_timeouts[other]);
- trim_timeouts[other] = new C_TrimTimeout(this, other);
- timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
-
- assert(g_conf->mon_sync_leader_kill_at != 3);
- sync_send_heartbeat(other, true);
- assert(g_conf->mon_sync_leader_kill_at != 4);
-
- m->put();
-}
-
-void Monitor::sync_finish(entity_inst_t &entity, bool abort)
-{
- dout(10) << __func__ << " entity(" << entity << ")" << dendl;
-
- Mutex::Locker l(trim_lock);
-
- if (!trim_timeouts.count(entity)) {
- dout(1) << __func__ << " we know of no sync effort from "
- << entity << " -- ignore it." << dendl;
- return;
- }
-
- if (trim_timeouts[entity] != NULL)
- timer.cancel_event(trim_timeouts[entity]);
-
- trim_timeouts.erase(entity);
- trim_entities_states.erase(entity);
-
- if (abort) {
- MMonSync *m = new MMonSync(MMonSync::OP_ABORT);
- assert(g_conf->mon_sync_leader_kill_at != 5);
- messenger->send_message(m, entity);
- assert(g_conf->mon_sync_leader_kill_at != 6);
- }
-
- if (!trim_timeouts.empty())
- return;
-
- dout(10) << __func__ << " no longer a sync leader" << dendl;
- sync_role &= ~SYNC_ROLE_LEADER;
-
- // we may have been the leader, but by now we may no longer be.
- // this can happen when the we sync'ed a monitor that became the
- // leader, or that same monitor simply came back to life and got
- // elected as the new leader.
- if (is_leader() && paxos->is_trim_disabled()) {
- trim_enable_timer = new C_TrimEnable(this);
- timer.add_event_after(30.0, trim_enable_timer);
- }
-
- finish_contexts(g_ceph_context, maybe_wait_for_quorum);
-}
-
-void Monitor::handle_sync_finish(MMonSync *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
-
- entity_inst_t other = m->get_source_inst();
-
- if (!trim_timeouts.count(other) || !trim_entities_states.count(other)
- || (trim_entities_states[other] != SYNC_STATE_START)) {
- dout(1) << __func__ << " ignored stray message from " << other << dendl;
- if (!trim_timeouts.count(other))
- dout(1) << __func__ << " not on trim_timeouts" << dendl;
- if (!trim_entities_states.count(other))
- dout(1) << __func__ << " not on trim_entities_states" << dendl;
- else if (trim_entities_states[other] != SYNC_STATE_START)
- dout(1) << __func__ << " state " << trim_entities_states[other] << dendl;
- m->put();
- return;
- }
-
- // We may no longer the leader. In such case, we should just inform the
- // other monitor that he should abort his sync. However, it appears that
- // his sync has finished, so there is no use in scraping the whole thing
- // now. Therefore, just go along and acknowledge.
- if (!is_leader()) {
- dout(10) << __func__ << " We are no longer the leader; reply nonetheless"
- << dendl;
- }
-
- MMonSync *msg = new MMonSync(MMonSync::OP_FINISH_REPLY);
- assert(g_conf->mon_sync_leader_kill_at != 7);
- messenger->send_message(msg, other);
- assert(g_conf->mon_sync_leader_kill_at != 8);
-
- sync_finish(other);
- m->put();
-}
-
-// end of leader
-
-// synchronization provider
-int Monitor::_pick_random_mon(int other)
-{
- assert(monmap->size() > 0);
- if (monmap->size() == 1)
- return 0;
-
- int max = monmap->size();
- if (other >= 0)
- max--;
- int n = sync_rng() % max;
- if (other >= 0 && n >= other)
- n++;
- return n;
-}
-
-int Monitor::_pick_random_quorum_mon(int other)
-{
- assert(monmap->size() > 0);
- if (quorum.empty())
- return -1;
- set<int>::iterator p = quorum.begin();
- for (int n = sync_rng() % quorum.size(); p != quorum.end() && n; ++p, --n);
- if (other >= 0 && p != quorum.end() && *p == other)
- ++p;
-
- return (p == quorum.end() ? *(quorum.rbegin()) : *p);
-}
-
-void Monitor::sync_timeout(entity_inst_t &entity)
-{
- if (state == STATE_SYNCHRONIZING) {
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_CHUNKS);
-
- // we are a sync requester; our provider just timed out, so find another
- // monitor to synchronize with.
- dout(1) << __func__ << " " << sync_provider->entity << dendl;
-
- sync_provider->attempts++;
- if ((sync_provider->attempts > g_conf->mon_sync_max_retries)
- || (monmap->size() == 2)) {
- // We either tried too many times to sync, or there's just us and the
- // monitor we were attempting to sync with.
- // Therefore, just abort the whole sync and start off fresh whenever he
- // (or somebody else) comes back.
- sync_requester_abort();
- return;
- }
-
- unsigned int i = 0;
- int entity_rank = monmap->get_rank(entity.addr);
- int debug_mon = g_conf->mon_sync_debug_provider;
- int debug_fallback = g_conf->mon_sync_debug_provider_fallback;
-
- dout(10) << __func__ << " entity " << entity << " rank " << entity_rank << dendl;
- dout(10) << __func__ << " our-rank " << rank << dendl;
-
- while ((i++) < 2*monmap->size()) {
- // we are trying to pick a random monitor, but we cannot do this forever.
- // in case something goes awfully wrong, just stop doing it after a
- // couple of attempts and try again later.
- int new_mon = _pick_random_mon(rank);
-
- if (debug_fallback >= 0) {
- if (entity_rank != debug_fallback)
- new_mon = debug_fallback;
- else if (debug_mon >= 0 && (entity_rank != debug_mon))
- new_mon = debug_mon;
- }
-
- dout(10) << __func__ << " randomly picking mon rank " << new_mon << dendl;
-
- if ((new_mon != rank) && (new_mon != entity_rank)) {
- sync_provider->entity = monmap->get_inst(new_mon);
- dout(10) << __func__ << " randomly choosing " << sync_provider->entity << " rank " << new_mon << dendl;
- sync_state = SYNC_STATE_START;
- sync_start_chunks(sync_provider);
- return;
- }
- }
-
- // well that sucks. Let's see if we can find a monitor to connect to
- for (int i = 0; i < (int)monmap->size(); ++i) {
- entity_inst_t i_inst = monmap->get_inst(i);
- if (i != rank && i_inst != entity) {
- sync_provider->entity = i_inst;
- sync_state = SYNC_STATE_START;
- sync_start_chunks(sync_provider);
- return;
- }
- }
-
- assert(0 == "Unable to find a new monitor to connect to. Not cool.");
- } else if (sync_role & SYNC_ROLE_PROVIDER) {
- dout(10) << __func__ << " cleanup " << entity << dendl;
- sync_provider_cleanup(entity);
- return;
- } else
- assert(0 == "We should never reach this");
-}
-
-void Monitor::sync_provider_cleanup(entity_inst_t &entity)
-{
- dout(10) << __func__ << " " << entity << dendl;
- if (sync_entities.count(entity) > 0) {
- sync_entities[entity]->cancel_timeout();
- sync_entities.erase(entity);
- sync_entities_states.erase(entity);
- }
-
- if (sync_entities.empty()) {
- dout(1) << __func__ << " no longer a sync provider" << dendl;
- sync_role &= ~SYNC_ROLE_PROVIDER;
- }
-}
-
-void Monitor::handle_sync_start_chunks(MMonSync *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
- assert(!(sync_role & SYNC_ROLE_REQUESTER));
-
- entity_inst_t other = m->get_source_inst();
-
- // if we have a sync going on for this entity, just drop the message. If it
- // was a stray message, we did the right thing. If it wasn't, then it means
- // that we still have an old state of this entity, and that the said entity
- // failed in the meantime and is now up again; therefore, just let the
- // timeout timers fulfill their purpose and deal with state cleanup when
- // they are triggered. Until then, no Sir, we won't accept your messages.
- if (sync_entities.count(other) > 0) {
- dout(1) << __func__ << " sync session already in progress for " << other
- << " -- assumed as stray message." << dendl;
- m->put();
- return;
- }
-
- SyncEntity sync = get_sync_entity(other, this);
- sync->version = paxos->get_version();
-
- if (!m->last_key.first.empty() && !m->last_key.second.empty()) {
- sync->last_received_key = m->last_key;
- dout(10) << __func__ << " set last received key to ("
- << sync->last_received_key.first << ","
- << sync->last_received_key.second << ")" << dendl;
- }
-
- sync->sync_init();
-
- sync_entities.insert(make_pair(other, sync));
- sync_entities_states[other] = SYNC_STATE_START;
- sync_role |= SYNC_ROLE_PROVIDER;
-
- sync_send_chunks(sync);
- m->put();
-}
-
-void Monitor::handle_sync_chunk_reply(MMonSync *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
-
- entity_inst_t other = m->get_source_inst();
-
- if (!(sync_role & SYNC_ROLE_PROVIDER)
- || !sync_entities.count(other)
- || (sync_entities_states[other] != SYNC_STATE_START)) {
- dout(1) << __func__ << " ignored stray message from " << other << dendl;
- m->put();
- return;
- }
-
- if (m->flags & MMonSync::FLAG_LAST) {
- // they acked the last chunk. Clean up.
- sync_provider_cleanup(other);
- m->put();
- return;
- }
-
- sync_send_chunks(sync_entities[other]);
- m->put();
-}
-
-void Monitor::sync_send_chunks(SyncEntity sync)
-{
- dout(10) << __func__ << " entity(" << sync->entity << ")" << dendl;
-
- sync->cancel_timeout();
-
- assert(sync->synchronizer.use_count() > 0);
- assert(sync->synchronizer->has_next_chunk());
-
- MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK);
-
- sync->synchronizer->get_chunk(msg->chunk_bl);
- msg->last_key = sync->synchronizer->get_last_key();
- dout(10) << __func__ << " last key ("
- << msg->last_key.first << ","
- << msg->last_key.second << ")" << dendl;
-
- sync->sync_update();
-
- if (sync->has_crc()) {
- msg->flags |= MMonSync::FLAG_CRC;
- msg->crc = sync->crc_get();
- sync->crc_clear();
- }
-
- if (!sync->synchronizer->has_next_chunk()) {
- msg->flags |= MMonSync::FLAG_LAST;
- msg->version = sync->get_version();
- sync->synchronizer.reset();
- }
-
- sync->set_timeout(new C_SyncTimeout(this, sync->entity),
- g_conf->mon_sync_timeout);
- assert(g_conf->mon_sync_provider_kill_at != 3);
- messenger->send_message(msg, sync->entity);
- assert(g_conf->mon_sync_provider_kill_at != 4);
-
- // kill the monitor as soon as we move into synchronizing the paxos versions.
- // This is intended as debug.
- if (sync->sync_state == SyncEntityImpl::STATE_PAXOS)
- assert(g_conf->mon_sync_provider_kill_at != 5);
-
-
-}
-// end of synchronization provider
-
-// start of synchronization requester
-
-void Monitor::sync_requester_abort()
+void Monitor::sync_timeout()
{
dout(10) << __func__ << dendl;
assert(state == STATE_SYNCHRONIZING);
- assert(sync_role == SYNC_ROLE_REQUESTER);
-
- if (sync_leader.get() != NULL) {
- dout(10) << __func__ << " leader " << sync_leader->entity << dendl;
- sync_leader->cancel_timeout();
- sync_leader.reset();
- }
-
- if (sync_provider.get() != NULL) {
- dout(10) << __func__ << " provider " << sync_provider->entity << dendl;
- sync_provider->cancel_timeout();
-
- MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
- messenger->send_message(msg, sync_provider->entity);
-
- sync_provider.reset();
- }
-
- // Given that we are explicitely aborting the whole sync process, we should
- // play it safe and clear the store.
- set<string> targets = get_sync_targets_names();
- store->clear(targets);
-
- dout(1) << __func__ << " no longer a sync requester" << dendl;
- sync_role = SYNC_ROLE_NONE;
- sync_state = SYNC_STATE_NONE;
-
- state = 0;
-
bootstrap();
}
@@ -1400,473 +797,392 @@ void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
latest_monmap.encode(bl, CEPH_FEATURES_ALL);
}
-/**
- *
- */
-void Monitor::sync_store_init()
+void Monitor::sync_reset()
{
- MonitorDBStore::Transaction t;
- t.put("mon_sync", "in_sync", 1);
-
- bufferlist backup_monmap;
- sync_obtain_latest_monmap(backup_monmap);
- assert(backup_monmap.length() > 0);
-
- t.put("mon_sync", "latest_monmap", backup_monmap);
-
- store->apply_transaction(t);
-}
+ if (sync_timeout_event) {
+ timer.cancel_event(sync_timeout_event);
+ sync_timeout_event = NULL;
+ }
-void Monitor::sync_store_cleanup()
-{
- MonitorDBStore::Transaction t;
- t.erase("mon_sync", "in_sync");
- t.erase("mon_sync", "latest_monmap");
- store->apply_transaction(t);
-}
+ // leader state
+ sync_providers.clear();
-bool Monitor::is_sync_on_going()
-{
- return store->exists("mon_sync", "in_sync");
+ // requester state
+ sync_provider = entity_inst_t();
+ sync_cookie = 0;
+ sync_full = false;
+ sync_start_version = 0;
}
/**
- * Start Sync process
+ * Start sync process
*
- * Create SyncEntity instances for the leader and the provider;
- * Send OP_START message to the leader;
- * Set trim timeout on the leader
+ * Start pulling committed state from another monitor.
*
* @param other Synchronization provider to-be.
+ * @param whether to do a full sync or just catch up on recent paxos
*/
-void Monitor::sync_start(entity_inst_t &other)
+void Monitor::sync_start(entity_inst_t &other, bool full)
{
- cancel_probe_timeout();
+ dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
- dout(10) << __func__ << " entity( " << other << " )" << dendl;
- if ((state == STATE_SYNCHRONIZING) && (sync_role == SYNC_ROLE_REQUESTER)) {
- dout(1) << __func__ << " already synchronizing; drop it" << dendl;
- return;
- }
+ assert(state == STATE_PROBING ||
+ state == STATE_SYNCHRONIZING);
+ state = STATE_SYNCHRONIZING;
- // Looks like we are the acting leader for someone. Better force them to
- // abort their endeavours. After all, if they are trying to sync from us,
- // it means that we must have a higher paxos version than the one they
- // have; however, if we are trying to sync as well, it must mean that
- // someone has a higher version than the one we have. Everybody wins if
- // we force them to cancel their sync and try again.
- if (sync_role & SYNC_ROLE_LEADER) {
- dout(10) << __func__ << " we are acting as a leader to someone; "
- << "destroy their dreams" << dendl;
+ // make sure are not a provider for anyone!
+ sync_reset();
- assert(!trim_timeouts.empty());
- reset_sync();
- }
+ sync_full = full;
- assert(sync_role == SYNC_ROLE_NONE);
- assert(sync_state == SYNC_STATE_NONE);
+ if (sync_full) {
+ // mark that we are syncing
+ MonitorDBStore::Transaction t;
- state = STATE_SYNCHRONIZING;
- sync_role = SYNC_ROLE_REQUESTER;
- sync_state = SYNC_STATE_START;
+ bufferlist backup_monmap;
+ sync_obtain_latest_monmap(backup_monmap);
+ assert(backup_monmap.length() > 0);
- // First init the store (grab the monmap and all that) and only then
- // clear the store (except for the mon_sync prefix). This avoids that
- // we end up losing the monmaps from the store.
- sync_store_init();
+ sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
+ dout(10) << __func__ << " marking sync in progress, storing sync_last_commited_floor "
+ << sync_last_committed_floor << dendl;
- // clear the underlying store, since we are starting a whole
- // sync process from the bare beginning.
- set<string> targets = get_sync_targets_names();
- store->clear(targets);
+ t.put("mon_sync", "latest_monmap", backup_monmap);
+ t.put("mon_sync", "in_sync", 1);
+ t.put("mon_sync", "last_committed_floor", sync_last_committed_floor);
+ store->apply_transaction(t);
+ assert(g_conf->mon_sync_requester_kill_at != 1);
- // assume 'other' as the leader. We will update the leader once we receive
- // a reply to the sync start.
- entity_inst_t leader = other;
- entity_inst_t provider = other;
+ // clear the underlying store
+ set<string> targets = get_sync_targets_names();
+ dout(10) << __func__ << " clearing prefixes " << targets << dendl;
+ store->clear(targets);
- if (g_conf->mon_sync_debug_leader >= 0) {
- leader = monmap->get_inst(g_conf->mon_sync_debug_leader);
- dout(10) << __func__ << " assuming " << leader
- << " as the leader for debug" << dendl;
+ assert(g_conf->mon_sync_requester_kill_at != 2);
}
- if (g_conf->mon_sync_debug_provider >= 0) {
- provider = monmap->get_inst(g_conf->mon_sync_debug_provider);
- dout(10) << __func__ << " assuming " << provider
- << " as the provider for debug" << dendl;
- }
+ // assume 'other' as the leader. We will update the leader once we receive
+ // a reply to the sync start.
+ sync_provider = other;
- sync_leader = get_sync_entity(leader, this);
- sync_provider = get_sync_entity(provider, this);
+ sync_reset_timeout();
- // this message may bounce through 'other' (if 'other' is not the leader)
- // in order to reach the leader. Therefore, set a higher timeout to allow
- // breathing room for the reply message to reach us.
- sync_leader->set_timeout(new C_SyncStartTimeout(this),
- g_conf->mon_sync_trim_timeout*2);
+ MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
+ if (!sync_full)
+ m->last_committed = paxos->get_version();
+ messenger->send_message(m, sync_provider);
+}
- MMonSync *m = new MMonSync(MMonSync::OP_START);
- messenger->send_message(m, other);
- assert(g_conf->mon_sync_requester_kill_at != 1);
+void Monitor::sync_reset_timeout()
+{
+ dout(10) << __func__ << dendl;
+ if (sync_timeout_event)
+ timer.cancel_event(sync_timeout_event);
+ sync_timeout_event = new C_SyncTimeout(this);
+ timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
}
-void Monitor::sync_start_chunks(SyncEntity provider)
+void Monitor::sync_finish(version_t last_committed)
{
- dout(10) << __func__ << " provider(" << provider->entity << ")" << dendl;
+ dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_START);
+ assert(g_conf->mon_sync_requester_kill_at != 7);
- sync_state = SYNC_STATE_CHUNKS;
+ if (sync_full) {
+ // finalize the paxos commits
+ MonitorDBStore::Transaction tx;
+ paxos->read_and_prepare_transactions(&tx, sync_start_version, last_committed);
+ tx.put(paxos->get_name(), "last_committed", last_committed);
- provider->set_timeout(new C_SyncTimeout(this, provider->entity),
- g_conf->mon_sync_timeout);
- MMonSync *msg = new MMonSync(MMonSync::OP_START_CHUNKS);
- pair<string,string> last_key = provider->last_received_key;
- if (!last_key.first.empty() && !last_key.second.empty())
- msg->last_key = last_key;
+ dout(30) << __func__ << " final tx dump:\n";
+ JSONFormatter f(true);
+ tx.dump(&f);
+ f.flush(*_dout);
+ *_dout << dendl;
- assert(g_conf->mon_sync_requester_kill_at != 4);
- messenger->send_message(msg, provider->entity);
- assert(g_conf->mon_sync_requester_kill_at != 5);
-}
+ store->apply_transaction(tx);
+ }
-void Monitor::sync_start_reply_timeout()
-{
- dout(10) << __func__ << dendl;
+ assert(g_conf->mon_sync_requester_kill_at != 8);
- assert(state == STATE_SYNCHRONIZING);
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_START);
-
- // Restart the sync attempt. It's not as if we were going to lose a vast
- // amount of work, and if we take into account that we are timing out while
- // waiting for a reply from the Leader, it sure seems like the right path
- // to take.
- sync_requester_abort();
+ MonitorDBStore::Transaction t;
+ t.erase("mon_sync", "in_sync");
+ t.erase("mon_sync", "force_sync");
+ t.erase("mon_sync", "last_committed_floor");
+ store->apply_transaction(t);
+
+ sync_reset();
+
+ assert(g_conf->mon_sync_requester_kill_at != 9);
+
+ init_paxos();
+
+ assert(g_conf->mon_sync_requester_kill_at != 10);
+
+ bootstrap();
}
-void Monitor::handle_sync_start_reply(MMonSync *m)
+void Monitor::handle_sync(MMonSync *m)
{
dout(10) << __func__ << " " << *m << dendl;
+ switch (m->op) {
- entity_inst_t other = m->get_source_inst();
-
- if ((sync_role != SYNC_ROLE_REQUESTER)
- || (sync_state != SYNC_STATE_START)) {
- // If the leader has sent this message before we failed, there is no point
- // in replying to it, as he has no idea that we actually received it. On
- // the other hand, if he received one of our stray messages (because it was
- // delivered once he got back up after failing) and replied accordingly,
- // there is a chance that he did stopped trimming on our behalf. However,
- // we have no way to know it, and we really don't want to mess with his
- // state if that is not the case. Therefore, just drop it and let the
- // timeouts figure it out. Eventually.
- dout(1) << __func__ << " stray message -- drop it." << dendl;
- goto out;
- }
+ // provider ---------
- assert(state == STATE_SYNCHRONIZING);
- assert(sync_leader.get() != NULL);
- assert(sync_provider.get() != NULL);
-
- // We now know for sure who the leader is.
- sync_leader->entity = other;
- sync_leader->cancel_timeout();
-
- if (m->flags & MMonSync::FLAG_RETRY) {
- dout(10) << __func__ << " retrying sync at a later time" << dendl;
- sync_role = SYNC_ROLE_NONE;
- sync_state = SYNC_STATE_NONE;
- sync_leader->set_timeout(new C_SyncStartRetry(this, sync_leader->entity),
- g_conf->mon_sync_backoff_timeout);
- goto out;
- }
+ case MMonSync::OP_GET_COOKIE_FULL:
+ case MMonSync::OP_GET_COOKIE_RECENT:
+ handle_sync_get_cookie(m);
+ break;
+ case MMonSync::OP_GET_CHUNK:
+ handle_sync_get_chunk(m);
+ break;
- if (m->flags & MMonSync::FLAG_REPLY_TO) {
- dout(10) << __func__ << " leader told us to use " << m->reply_to
- << " as sync provider" << dendl;
- sync_provider->entity = m->reply_to;
- } else {
- dout(10) << __func__ << " synchronizing from leader at " << other << dendl;
- sync_provider->entity = other;
- }
+ // client -----------
- sync_leader->set_timeout(new C_HeartbeatTimeout(this),
- g_conf->mon_sync_heartbeat_timeout);
+ case MMonSync::OP_COOKIE:
+ handle_sync_cookie(m);
+ break;
- assert(g_conf->mon_sync_requester_kill_at != 2);
- sync_send_heartbeat(sync_leader->entity);
- assert(g_conf->mon_sync_requester_kill_at != 3);
+ case MMonSync::OP_CHUNK:
+ case MMonSync::OP_LAST_CHUNK:
+ handle_sync_chunk(m);
+ break;
+ case MMonSync::OP_NO_COOKIE:
+ handle_sync_no_cookie(m);
+ break;
- sync_start_chunks(sync_provider);
-out:
+ default:
+ dout(0) << __func__ << " unknown op " << m->op << dendl;
+ assert(0 == "unknown op");
+ }
m->put();
}
-void Monitor::handle_sync_heartbeat_reply(MMonSync *m)
+// leader
+
+void Monitor::_sync_reply_no_cookie(MMonSync *m)
{
- dout(10) << __func__ << " " << *m << dendl;
+ MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
+ messenger->send_message(reply, m->get_connection());
+}
- entity_inst_t other = m->get_source_inst();
- if ((sync_role != SYNC_ROLE_REQUESTER)
- || (sync_state == SYNC_STATE_NONE)
- || (sync_leader.get() == NULL)
- || (other != sync_leader->entity)) {
- dout(1) << __func__ << " stray message -- drop it." << dendl;
- m->put();
+void Monitor::handle_sync_get_cookie(MMonSync *m)
+{
+ if (is_synchronizing()) {
+ _sync_reply_no_cookie(m);
return;
}
- assert(state == STATE_SYNCHRONIZING);
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state != SYNC_STATE_NONE);
+ assert(g_conf->mon_sync_provider_kill_at != 1);
- assert(sync_leader.get() != NULL);
- assert(sync_leader->entity == other);
+ // make up a unique cookie. include election epoch (which persists
+ // across restarts for the whole cluster) and a counter for this
+ // process instance. there is no need to be unique *across*
+ // monitors, though.
+ uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
+ assert(sync_providers.count(cookie) == 0);
- sync_leader->cancel_timeout();
- sync_leader->set_timeout(new C_HeartbeatInterval(this, sync_leader->entity),
- g_conf->mon_sync_heartbeat_interval);
- m->put();
+ dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
+
+ SyncProvider& sp = sync_providers[cookie];
+ sp.cookie = cookie;
+ sp.entity = m->get_source_inst();
+ sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
+
+ set<string> sync_targets;
+ if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
+ // full scan
+ sync_targets = get_sync_targets_names();
+ sp.last_committed = paxos->get_version();
+ sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
+ sp.full = true;
+ dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
+ } else {
+ // just catch up paxos
+ sp.last_committed = m->last_committed;
+ }
+ dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
+
+ MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
+ reply->last_committed = sp.last_committed;
+ messenger->send_message(reply, m->get_connection());
}
-void Monitor::handle_sync_chunk(MMonSync *m)
+void Monitor::handle_sync_get_chunk(MMonSync *m)
{
dout(10) << __func__ << " " << *m << dendl;
- entity_inst_t other = m->get_source_inst();
-
- if ((sync_role != SYNC_ROLE_REQUESTER)
- || (sync_state != SYNC_STATE_CHUNKS)
- || (sync_provider.get() == NULL)
- || (other != sync_provider->entity)) {
- dout(1) << __func__ << " stray message -- drop it." << dendl;
- m->put();
+ if (sync_providers.count(m->cookie) == 0) {
+ dout(10) << __func__ << " no cookie " << m->cookie << dendl;
+ _sync_reply_no_cookie(m);
return;
}
- assert(state == STATE_SYNCHRONIZING);
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_CHUNKS);
-
- assert(sync_leader.get() != NULL);
+ assert(g_conf->mon_sync_provider_kill_at != 2);
- assert(sync_provider.get() != NULL);
- assert(other == sync_provider->entity);
+ SyncProvider& sp = sync_providers[m->cookie];
+ sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
- sync_provider->cancel_timeout();
+ if (sp.last_committed < paxos->get_first_committed()) {
+ dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
+ << " < our fc " << paxos->get_first_committed() << dendl;
+ sync_providers.erase(m->cookie);
+ _sync_reply_no_cookie(m);
+ return;
+ }
+ MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
MonitorDBStore::Transaction tx;
- tx.append_from_encoded(m->chunk_bl);
-
- sync_provider->set_timeout(new C_SyncTimeout(this, sync_provider->entity),
- g_conf->mon_sync_timeout);
- sync_provider->last_received_key = m->last_key;
- MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK_REPLY);
-
- bool stop = false;
- if (m->flags & MMonSync::FLAG_LAST) {
- msg->flags |= MMonSync::FLAG_LAST;
- assert(m->version > 0);
- tx.put(paxos->get_name(), "last_committed", m->version);
- stop = true;
+ int left = g_conf->mon_sync_max_payload_size;
+ while (sp.last_committed < paxos->get_version() && left > 0) {
+ bufferlist bl;
+ sp.last_committed++;
+ store->get(paxos->get_name(), sp.last_committed, bl);
+ tx.put(paxos->get_name(), sp.last_committed, bl);
+ left -= bl.length();
+ dout(20) << __func__ << " including paxos state " << sp.last_committed << dendl;
}
- assert(g_conf->mon_sync_requester_kill_at != 8);
- messenger->send_message(msg, sync_provider->entity);
+ reply->last_committed = sp.last_committed;
- store->apply_transaction(tx);
+ if (sp.full && left > 0) {
+ sp.synchronizer->get_chunk_tx(tx, left);
+ sp.last_key = sp.synchronizer->get_last_key();
+ reply->last_key = sp.last_key;
+ }
- if (g_conf->mon_sync_debug && (m->flags & MMonSync::FLAG_CRC)) {
- dout(10) << __func__ << " checking CRC" << dendl;
- MonitorDBStore::Synchronizer sync;
- if (m->flags & MMonSync::FLAG_LAST) {
- dout(10) << __func__ << " checking CRC only for Paxos" << dendl;
- string paxos_name("paxos");
- sync = store->get_synchronizer(paxos_name);
- } else {
- dout(10) << __func__ << " checking CRC for all prefixes" << dendl;
- set<string> prefixes = get_sync_targets_names();
- pair<string,string> empty_key;
- sync = store->get_synchronizer(empty_key, prefixes);
- }
+ if ((sp.full && sp.synchronizer->has_next_chunk()) ||
+ sp.last_committed < paxos->get_version()) {
+ dout(10) << __func__ << " chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl;
+ } else {
+ dout(10) << __func__ << " last chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl;
+ reply->op = MMonSync::OP_LAST_CHUNK;
- while (sync->has_next_chunk()) {
- bufferlist bl;
- sync->get_chunk(bl);
- }
- __u32 got_crc = sync->crc();
- dout(10) << __func__ << " expected crc " << m->crc
- << " got " << got_crc << dendl;
+ assert(g_conf->mon_sync_provider_kill_at != 3);
- assert(m->crc == got_crc);
- dout(10) << __func__ << " CRC matches" << dendl;
+ // clean up our local state
+ sync_providers.erase(sp.cookie);
}
- m->put();
- if (stop)
- sync_stop();
-}
-
-void Monitor::sync_stop()
-{
- dout(10) << __func__ << dendl;
+ ::encode(tx, reply->chunk_bl);
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_CHUNKS);
-
- sync_state = SYNC_STATE_STOP;
+ messenger->send_message(reply, m->get_connection());
+}
- sync_leader->cancel_timeout();
- sync_provider->cancel_timeout();
- sync_provider.reset();
+// requester
- entity_inst_t leader = sync_leader->entity;
+void Monitor::handle_sync_cookie(MMonSync *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ if (sync_cookie) {
+ dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
+ return;
+ }
+ if (m->get_source_inst() != sync_provider) {
+ dout(10) << __func__ << " source does not match, discarding" << dendl;
+ return;
+ }
+ sync_cookie = m->cookie;
+ sync_start_version = m->last_committed;
- sync_leader->set_timeout(new C_SyncFinishReplyTimeout(this),
- g_conf->mon_sync_timeout);
+ sync_reset_timeout();
+ sync_get_next_chunk();
- MMonSync *msg = new MMonSync(MMonSync::OP_FINISH);
- assert(g_conf->mon_sync_requester_kill_at != 9);
- messenger->send_message(msg, leader);
- assert(g_conf->mon_sync_requester_kill_at != 10);
+ assert(g_conf->mon_sync_requester_kill_at != 3);
}
-void Monitor::sync_finish_reply_timeout()
+void Monitor::sync_get_next_chunk()
{
- dout(10) << __func__ << dendl;
- assert(state == STATE_SYNCHRONIZING);
- assert(sync_leader.get() != NULL);
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_STOP);
+ dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
+ if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
+ dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
+ usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
+ }
+ MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
+ messenger->send_message(r, sync_provider);
- sync_requester_abort();
+ assert(g_conf->mon_sync_requester_kill_at != 4);
}
-void Monitor::handle_sync_finish_reply(MMonSync *m)
+void Monitor::handle_sync_chunk(MMonSync *m)
{
dout(10) << __func__ << " " << *m << dendl;
- entity_inst_t other = m->get_source_inst();
- if ((sync_role != SYNC_ROLE_REQUESTER)
- || (sync_state != SYNC_STATE_STOP)
- || (sync_leader.get() == NULL)
- || (sync_leader->entity != other)) {
- dout(1) << __func__ << " stray message -- drop it." << dendl;
- m->put();
+ if (m->cookie != sync_cookie) {
+ dout(10) << __func__ << " cookie does not match, discarding" << dendl;
+ return;
+ }
+ if (m->get_source_inst() != sync_provider) {
+ dout(10) << __func__ << " source does not match, discarding" << dendl;
return;
}
- assert(sync_role == SYNC_ROLE_REQUESTER);
- assert(sync_state == SYNC_STATE_STOP);
+ assert(state == STATE_SYNCHRONIZING);
+ assert(g_conf->mon_sync_requester_kill_at != 5);
- assert(sync_leader.get() != NULL);
- assert(sync_leader->entity == other);
+ MonitorDBStore::Transaction tx;
+ tx.append_from_encoded(m->chunk_bl);
- sync_role = SYNC_ROLE_NONE;
- sync_state = SYNC_STATE_NONE;
+ dout(30) << __func__ << " tx dump:\n";
+ JSONFormatter f(true);
+ tx.dump(&f);
+ f.flush(*_dout);
+ *_dout << dendl;
- sync_leader->cancel_timeout();
- sync_leader.reset();
+ store->apply_transaction(tx);
- paxos->reapply_all_versions();
+ assert(g_conf->mon_sync_requester_kill_at != 6);
- sync_store_cleanup();
+ if (!sync_full) {
+ dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
+ MonitorDBStore::Transaction tx;
+ paxos->read_and_prepare_transactions(&tx, paxos->get_version(), m->last_committed);
+ tx.put(paxos->get_name(), "last_committed", m->last_committed);
- init_paxos();
+ dout(30) << __func__ << " tx dump:\n";
+ JSONFormatter f(true);
+ tx.dump(&f);
+ f.flush(*_dout);
+ *_dout << dendl;
- assert(g_conf->mon_sync_requester_kill_at != 11);
+ store->apply_transaction(tx);
+ paxos->init(); // to refresh what we just wrote
+ }
- m->put();
+ if (m->op == MMonSync::OP_CHUNK) {
+ sync_reset_timeout();
+ sync_get_next_chunk();
+ } else if (m->op == MMonSync::OP_LAST_CHUNK) {
+ sync_finish(m->last_committed);
+ }
+}
+void Monitor::handle_sync_no_cookie(MMonSync *m)
+{
+ dout(10) << __func__ << dendl;
+ sync_reset();
bootstrap();
}
-void Monitor::handle_sync_abort(MMonSync *m)
+void Monitor::sync_trim_providers()
{
- dout(10) << __func__ << " " << *m << dendl;
- /* This function's responsabilities are manifold, and they depend on
- * who we (the monitor) are and what is our role in the sync.
- *
- * If we are the sync requester (i.e., if we are synchronizing), it
- * means that we *must* abort the current sync and bootstrap. This may
- * be required if there was a leader change and we are talking to the
- * wrong leader, which makes continuing with the current sync way too
- * risky, given that a Paxos trim may be underway and we certainly incur
- * in the chance of ending up with an inconsistent store state.
- *
- * If we are the sync provider, it means that the requester wants to
- * abort his sync, either because he lost connectivity to the leader
- * (i.e., his heartbeat timeout was triggered) or he became aware of a
- * leader change.
- *
- * As a leader, we should never receive such a message though, unless we
- * have just won an election, in which case we should have been a sync
- * provider before. In such a case, we should behave as if we were a sync
- * provider and clean up the requester's state.
- */
- entity_inst_t other = m->get_source_inst();
-
- if ((sync_role == SYNC_ROLE_REQUESTER)
- && (sync_leader.get() != NULL)
- && (sync_leader->entity == other)) {
-
- sync_requester_abort();
- } else if ((sync_role & SYNC_ROLE_PROVIDER)
- && (sync_entities.count(other) > 0)
- && (sync_entities_states[other] == SYNC_STATE_START)) {
+ dout(20) << __func__ << dendl;
- sync_provider_cleanup(other);
- } else {
- dout(1) << __func__ << " stray message -- drop it." << dendl;
+ utime_t now = ceph_clock_now(g_ceph_context);
+ map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
+ while (p != sync_providers.end()) {
+ if (now > p->second.timeout) {
+ dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
+ sync_providers.erase(p++);
+ } else {
+ ++p;
+ }
}
- m->put();
}
-void Monitor::handle_sync(MMonSync *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
- switch (m->op) {
- case MMonSync::OP_START:
- handle_sync_start(m);
- break;
- case MMonSync::OP_START_REPLY:
- handle_sync_start_reply(m);
- break;
- case MMonSync::OP_HEARTBEAT:
- handle_sync_heartbeat(m);
- break;
- case MMonSync::OP_HEARTBEAT_REPLY:
- handle_sync_heartbeat_reply(m);
- break;
- case MMonSync::OP_FINISH:
- handle_sync_finish(m);
- break;
- case MMonSync::OP_START_CHUNKS:
- handle_sync_start_chunks(m);
- break;
- case MMonSync::OP_CHUNK:
- handle_sync_chunk(m);
- break;
- case MMonSync::OP_CHUNK_REPLY:
- handle_sync_chunk_reply(m);
- break;
- case MMonSync::OP_FINISH_REPLY:
- handle_sync_finish_reply(m);
- break;
- case MMonSync::OP_ABORT:
- handle_sync_abort(m);
- break;
- default:
- dout(0) << __func__ << " unknown op " << m->op << dendl;
- m->put();
- assert(0 == "unknown op");
- break;
- }
-}
+// ---------------------------------------------------
+// probe
void Monitor::cancel_probe_timeout()
{
@@ -2001,27 +1317,47 @@ void Monitor::handle_probe_reply(MMonProbe *m)
assert(paxos != NULL);
if (is_synchronizing()) {
- dout(10) << " we are currently synchronizing, so that will continue."
- << dendl;
+ dout(10) << " currently syncing" << dendl;
m->put();
return;
}
entity_inst_t other = m->get_source_inst();
- // is there an existing quorum?
- if (m->quorum.size()) {
- dout(10) << " existing quorum " << m->quorum << dendl;
- if (paxos->get_version() < m->paxos_first_version) {
+ if (m->paxos_last_version < sync_last_committed_floor) {
+ dout(10) << " peer paxos versions [" << m->paxos_first_version
+ << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
+ << sync_last_committed_floor << ", ignoring"
+ << dendl;
+ } else {
+ if (paxos->get_version() < m->paxos_first_version &&
+ m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
dout(10) << " peer paxos versions [" << m->paxos_first_version
- << "," << m->paxos_last_version << "]"
+ << "," << m->paxos_last_version << "]"
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
- sync_start(other);
+ cancel_probe_timeout();
+ sync_start(other, true);
m->put();
return;
}
+ if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+ dout(10) << " peer paxos version " << m->paxos_last_version
+ << " vs my version " << paxos->get_version()
+ << " (too far ahead)"
+ << dendl;
+ cancel_probe_timeout();
+ sync_start(other, false);
+ m->put();
+ return;
+ }
+ }
+
+ // is there an existing quorum?
+ if (m->quorum.size()) {
+ dout(10) << " existing quorum " << m->quorum << dendl;
+
dout(10) << " peer paxos version " << m->paxos_last_version
<< " vs my version " << paxos->get_version()
<< " (ok)"
@@ -2046,16 +1382,6 @@ void Monitor::handle_probe_reply(MMonProbe *m)
return;
}
- if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
- dout(10) << " peer paxos version " << m->paxos_last_version
- << " vs my version " << paxos->get_version()
- << " (too far ahead)"
- << dendl;
- sync_start(other);
- m->put();
- return;
- }
-
unsigned need = monmap->size() / 2 + 1;
dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
if (outside_quorum.size() >= need) {
@@ -2066,7 +1392,7 @@ void Monitor::handle_probe_reply(MMonProbe *m)
dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
}
} else {
- dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
+ dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
}
}
m->put();
@@ -2149,19 +1475,6 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features
dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
<< " quorum is " << quorum << " features are " << quorum_features << dendl;
- // let everyone currently syncing know that we are no longer the leader and
- // that they should all abort their on-going syncs
- for (map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
- iter != trim_timeouts.end();
- ++iter) {
- timer.cancel_event((*iter).second);
- entity_inst_t entity = (*iter).first;
- MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
- messenger->send_message(msg, entity);
- }
- trim_timeouts.clear();
- sync_role &= ~SYNC_ROLE_LEADER;
-
paxos->peon_init();
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
(*p)->election_finished();
@@ -2216,79 +1529,7 @@ bool Monitor::_allowed_command(MonSession *s, map<string, cmd_vartype>& cmd)
return retval;
}
-void Monitor::_sync_status(ostream& ss)
-{
- JSONFormatter jf(true);
- jf.open_object_section("sync_status");
- jf.dump_string("state", get_state_name());
- jf.dump_unsigned("paxos_version", paxos->get_version());
-
- if (is_leader() || (sync_role == SYNC_ROLE_LEADER)) {
- Mutex::Locker l(trim_lock);
- jf.open_object_section("trim");
- jf.dump_int("disabled", paxos->is_trim_disabled());
- jf.dump_int("should_trim", paxos->should_trim());
- if (!trim_timeouts.empty()) {
- jf.open_array_section("mons");
- for (map<entity_inst_t,Context*>::iterator it = trim_timeouts.begin();
- it != trim_timeouts.end();
- ++it) {
- entity_inst_t e = (*it).first;
- jf.dump_stream("mon") << e;
- int s = -1;
- if (trim_entities_states.count(e))
- s = trim_entities_states[e];
- jf.dump_stream("sync_state") << get_sync_state_name(s);
- }
- }
- jf.close_section();
- }
-
- if (!sync_entities.empty() || (sync_role == SYNC_ROLE_PROVIDER)) {
- jf.open_array_section("on_going");
- for (map<entity_inst_t,SyncEntity>::iterator it = sync_entities.begin();
- it != sync_entities.end();
- ++it) {
- entity_inst_t e = (*it).first;
- jf.open_object_section("mon");
- jf.dump_stream("addr") << e;
- jf.dump_string("state", (*it).second->get_state());
- int s = -1;
- if (sync_entities_states.count(e))
- s = sync_entities_states[e];
- jf.dump_stream("sync_state") << get_sync_state_name(s);
-
- jf.close_section();
- }
- jf.close_section();
- }
-
- if (is_synchronizing() || (sync_role == SYNC_ROLE_REQUESTER)) {
- jf.open_object_section("leader");
- SyncEntity sync_entity = sync_leader;
- if (sync_entity.get() != NULL)
- jf.dump_stream("addr") << sync_entity->entity;
- jf.close_section();
-
- jf.open_object_section("provider");
- sync_entity = sync_provider;
- if (sync_entity.get() != NULL)
- jf.dump_stream("addr") << sync_entity->entity;
- jf.close_section();
- }
-
- if (g_conf->mon_sync_leader_kill_at > 0)
- jf.dump_int("leader_kill_at", g_conf->mon_sync_leader_kill_at);
- if (g_conf->mon_sync_provider_kill_at > 0)
- jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
- if (g_conf->mon_sync_requester_kill_at > 0)
- jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
-
- jf.close_section();
- jf.flush(ss);
-}
-
-void Monitor::_sync_force(ostream& ss)
+void Monitor::sync_force(ostream& ss)
{
MonitorDBStore::Transaction tx;
tx.put("mon_sync", "force_sync", 1);
@@ -2347,17 +1588,31 @@ void Monitor::_mon_status(ostream& ss)
jf.dump_stream("peer") << *p;
jf.close_section();
+ jf.open_array_section("sync_provider");
+ for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
+ p != sync_providers.end();
+ ++p) {
+ jf.dump_unsigned("cookie", p->second.cookie);
+ jf.dump_stream("entity") << p->second.entity;
+ jf.dump_stream("timeout") << p->second.timeout;
+ jf.dump_unsigned("last_committed", p->second.last_committed);
+ jf.dump_stream("last_key") << p->second.last_key;
+ }
+ jf.close_section();
+
if (is_synchronizing()) {
- if (sync_leader)
- jf.dump_stream("sync_leader") << sync_leader->entity;
- else
- jf.dump_string("sync_leader", "");
- if (sync_provider)
- jf.dump_stream("sync_provider") << sync_provider->entity;
- else
- jf.dump_string("sync_provider", "");
+ jf.open_object_section("sync");
+ jf.dump_stream("sync_provider") << sync_provider;
+ jf.dump_unsigned("sync_cookie", sync_cookie);
+ jf.dump_unsigned("sync_start_version", sync_start_version);
+ jf.close_section();
}
+ if (g_conf->mon_sync_provider_kill_at > 0)
+ jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
+ if (g_conf->mon_sync_requester_kill_at > 0)
+ jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
+
jf.open_object_section("monmap");
monmap->dump(&jf);
jf.close_section();
@@ -2843,16 +2098,6 @@ void Monitor::handle_command(MMonCommand *m)
rdata.append(ds);
rs = "";
r = 0;
- } else if (prefix == "sync status") {
- if (!access_r) {
- r = -EACCES;
- rs = "access denied";
- goto out;
- }
- _sync_status(ds);
- rdata.append(ds);
- rs = "";
- r = 0;
} else if (prefix == "sync force") {
string validate1, validate2;
cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
@@ -2865,7 +2110,7 @@ void Monitor::handle_command(MMonCommand *m)
"--i-know-what-i-am-doing' if you really do.";
goto out;
}
- _sync_force(ds);
+ sync_force(ds);
rs = ds.str();
r = 0;
} else if (prefix == "heap") {
@@ -4174,6 +3419,8 @@ void Monitor::tick()
}
}
+ sync_trim_providers();
+
if (!maybe_wait_for_quorum.empty()) {
finish_contexts(g_ceph_context, maybe_wait_for_quorum);
}
diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h
index 004292b9778..cbc91529c21 100644
--- a/src/mon/Monitor.h
+++ b/src/mon/Monitor.h
@@ -54,7 +54,7 @@
#include <errno.h>
-#define CEPH_MON_PROTOCOL 12 /* cluster internal */
+#define CEPH_MON_PROTOCOL 13 /* cluster internal */
enum {
@@ -171,11 +171,8 @@ public:
default: return "???";
}
}
- const string get_state_name() const {
- string sn(get_state_name(state));
- string sync_name(get_sync_state_name());
- sn.append(sync_name);
- return sn;
+ const char *get_state_name() const {
+ return get_state_name(state);
}
bool is_shutdown() const { return state == STATE_SHUTDOWN; }
@@ -224,878 +221,165 @@ private:
* @{
*/
/**
- * Obtain the synchronization target prefixes in set form.
- *
- * We consider a target prefix all those that are relevant when
- * synchronizing two stores. That is, all those that hold paxos service's
- * versions, as well as paxos versions, or any control keys such as the
- * first or last committed version.
- *
- * Given the current design, this function should return the name of all and
- * any available paxos service, plus the paxos name.
- *
- * @returns a set of strings referring to the prefixes being synchronized
- */
- set<string> get_sync_targets_names();
- /**
- * Handle a sync-related message
- *
- * This function will call the appropriate handling functions for each
- * operation type.
- *
- * @param m A sync-related message (i.e., of type MMonSync)
- */
- void handle_sync(MMonSync *m);
- /**
- * Handle a sync-related message of operation type OP_ABORT.
- *
- * @param m A sync-related message of type OP_ABORT
- */
- void handle_sync_abort(MMonSync *m);
- /**
- * Reset the monitor's sync-related data structures and state.
- */
- void reset_sync(bool abort = false);
-
- /**
- * @defgroup Synchronization_Roles
- * @{
- */
- /**
- * The monitor has no role in any on-going synchronization.
- */
- static const uint8_t SYNC_ROLE_NONE = 0x0;
- /**
- * The monitor is the Leader in at least one synchronization.
- */
- static const uint8_t SYNC_ROLE_LEADER = 0x1;
- /**
- * The monitor is the Provider in at least one synchronization.
- */
- static const uint8_t SYNC_ROLE_PROVIDER = 0x2;
- /**
- * The monitor is a requester in the on-going synchronization.
- */
- static const uint8_t SYNC_ROLE_REQUESTER = 0x4;
-
- /**
- * The monitor's current role in on-going synchronizations, if any.
- *
- * A monitor can either be part of no synchronization at all, in which case
- * @p sync_role shall hold the value @p SYNC_ROLE_NONE, or it can be part of
- * an on-going synchronization, in which case it may be playing either one or
- * two roles at the same time:
- *
- * - If the monitor is the sync requester (i.e., be the one synchronizing
- * against some other monitor), the @p sync_role field will hold only the
- * @p SYNC_ROLE_REQUESTER value.
- * - Otherwise, the monitor can be either a sync leader, or a sync provider,
- * or both, in which case @p sync_role will hold a binary OR of both
- * @p SYNC_ROLE_LEADER and @p SYNC_ROLE_PROVIDER.
- */
- uint8_t sync_role;
- /**
- * @}
- */
- /**
- * @defgroup Leader-specific
- * @{
- */
- /**
- * Guarantee mutual exclusion access to the @p trim_timeouts map.
- *
- * We need this mutex specially when we have a monitor starting a sync with
- * the leader and another one finishing or aborting an on-going sync, that
- * happens to be the last on-going trim on the map. Given that we will
- * enable the Paxos trim once we deplete the @p trim_timeouts map, we must
- * then ensure that we either add the new sync start to the map before
- * removing the one just finishing, or that we remove the finishing one
- * first and enable the trim before we add the new one. If we fail to do
- * this, nasty repercussions could follow.
- */
- Mutex trim_lock;
- /**
- * Map holding all on-going syncs' timeouts.
- *
- * An on-going sync leads to the Paxos trim to be suspended, and this map
- * will associate entities to the timeouts to be triggered if the monitor
- * being synchronized fails to check-in with the leader, letting him know
- * that the sync is still in effect and that in no circumstances should the
- * Paxos trim be enabled.
- */
- map<entity_inst_t, Context*> trim_timeouts;
- map<entity_inst_t, uint8_t> trim_entities_states;
- /**
- * Map associating monitors to a sync state.
- *
- * This map is used by both the Leader and the Sync Provider, and has the
- * sole objective of keeping track of the state each monitor's sync process
- * is in.
- */
- map<entity_inst_t, uint8_t> sync_entities_states;
- /**
- * Timer that will enable the Paxos trim.
- *
- * This timer is set after the @p trim_timeouts map is depleted, and once
- * fired it will enable the Paxos trim (if still disabled). By setting
- * this timer, we avoid a scenario in which a monitor has just finished
- * synchronizing, but because the Paxos trim had been disabled for a long,
- * long time and a lot of trims were proposed in the timespan of the monitor
- * finishing its sync and actually joining the cluster, the monitor happens
- * to be out-of-sync yet again. Backing off enabling the Paxos trim will
- * allow the other monitor to join the cluster before actually trimming.
- */
- Context *trim_enable_timer;
-
- /**
- * Callback class responsible for finishing a monitor's sync session on the
- * leader's side, because the said monitor failed to acknowledge its
- * liveliness in a timely manner, thus being assumed as failed.
+ * @} // provider state
*/
- struct C_TrimTimeout : public Context {
- Monitor *mon;
- entity_inst_t entity;
-
- C_TrimTimeout(Monitor *m, entity_inst_t& entity)
- : mon(m), entity(entity) { }
- void finish(int r) {
- mon->sync_finish(entity);
- }
- };
+ struct SyncProvider {
+ entity_inst_t entity; ///< who
+ uint64_t cookie; ///< unique cookie for this sync attempt
+ utime_t timeout; ///< when we give up and expire this attempt
+ version_t last_committed; ///< last paxos version on peer
+ pair<string,string> last_key; ///< last key sent to (or on) peer
+ bool full; ///< full scan?
+ MonitorDBStore::Synchronizer synchronizer; ///< iterator
- /**
- * Callback class responsible for enabling the Paxos trim if there are no
- * more on-going syncs.
- */
- struct C_TrimEnable : public Context {
- Monitor *mon;
+ SyncProvider() : cookie(0), last_committed(0), full(false) {}
- C_TrimEnable(Monitor *m) : mon(m) { }
- void finish(int r) {
- Mutex::Locker(mon->trim_lock);
- // even if we are no longer the leader, we should re-enable trim if
- // we have disabled it in the past. It doesn't mean we are going to
- // do anything about it, but if we happen to become the leader
- // sometime down the future, we sure want to have the trim enabled.
- if (mon->trim_timeouts.empty())
- mon->paxos->trim_enable();
- mon->trim_enable_timer = NULL;
+ void reset_timeout(CephContext *cct, int grace) {
+ timeout = ceph_clock_now(cct);
+ timeout += grace;
}
};
- void sync_obtain_latest_monmap(bufferlist &bl);
- void sync_store_init();
- void sync_store_cleanup();
- bool is_sync_on_going();
+ map<uint64_t, SyncProvider> sync_providers; ///< cookie -> SyncProvider for those syncing from us
+ uint64_t sync_provider_count; ///< counter for issued cookies to keep them unique
/**
- * Send a heartbeat message to another entity.
- *
- * The sent message may be a heartbeat reply if the @p reply parameter is
- * set to true.
- *
- * This function is used both by the leader (always with @p reply = true),
- * and by the sync requester (always with @p reply = false).
- *
- * @param other The target monitor's entity instance.
- * @param reply Whether the message to be sent should be a heartbeat reply.
- */
- void sync_send_heartbeat(entity_inst_t &other, bool reply = false);
- /**
- * Handle a Sync Start request.
- *
- * Monitors wanting to synchronize with the cluster will have to first ask
- * the leader to do so. The only objective with this is so that the we can
- * gurantee that the leader won't trim the paxos state.
- *
- * The leader may not be the only one receiving this request. A sync provider
- * may also receive it when it is taken as the point of entry onto the
- * cluster. In this scenario, the provider must then forward this request to
- * the leader, if he know of one, or assume himself as the leader for this
- * sync purpose (this may happen if there is no formed quorum).
- *
- * @param m Sync message with operation type MMonSync::OP_START
- */
- void handle_sync_start(MMonSync *m);
- /**
- * Handle a Heartbeat sent by a sync requester.
- *
- * We use heartbeats as a way to guarantee that both the leader and the sync
- * requester are still alive. Receiving this message means that the requester
- * if still working on getting his store synchronized.
- *
- * @param m Sync message with operation type MMonSync::OP_HEARTBEAT
- */
- void handle_sync_heartbeat(MMonSync *m);
- /**
- * Handle a Sync Finish.
- *
- * A MMonSync::OP_FINISH is the way the sync requester has to inform the
- * leader that he finished synchronizing his store.
- *
- * @param m Sync message with operation type MMonSync::OP_FINISH
+ * @} // requester state
*/
- void handle_sync_finish(MMonSync *m);
+ entity_inst_t sync_provider; ///< who we are syncing from
+ uint64_t sync_cookie; ///< 0 if we are starting, non-zero otherwise
+ bool sync_full; ///< true if we are a full sync, false for recent catch-up
+ version_t sync_start_version; ///< last_committed at sync start
+ Context *sync_timeout_event; ///< timeout event
+
/**
- * Finish a given monitor's sync process on the leader's side.
+ * floor for sync source
*
- * This means cleaning up the state referring to the monitor whose sync has
- * finished (may it have been finished successfully, by receiving a message
- * with type MMonSync::OP_FINISH, or due to the assumption that the said
- * monitor failed).
+ * When we sync we forget about our old last_committed value which
+ * can be dangerous. For example, if we have a cluster of:
*
- * If we happen to know of no other monitor synchronizing, we may then enable
- * the paxos trim.
- *
- * @param entity Entity instance of the monitor whose sync we are considering
- * as finished.
- * @param abort If true, we consider this sync has finished due to an abort.
- */
- void sync_finish(entity_inst_t &entity, bool abort = false);
- /**
- * Abort a given monitor's sync process on the leader's side.
+ * mon.a: lc 100
+ * mon.b: lc 80
+ * mon.c: lc 100 (us)
*
- * This function is a wrapper for Monitor::sync_finish().
+ * If something forces us to sync (say, corruption, or manual
+ * intervention, or bug), we forget last_committed, and might abort.
+ * If mon.a happens to be down when we come back, we will see:
*
- * @param entity Entity instance of the monitor whose sync we are aborting.
- */
- void sync_finish_abort(entity_inst_t &entity) {
- sync_finish(entity, true);
- }
- /**
- * @} // Leader-specific
- */
- /**
- * @defgroup Synchronization Provider-specific
- * @{
- */
- /**
- * Represents a participant in a synchronization, along with its state.
+ * mon.b: lc 80
+ * mon.c: lc 0 (us)
*
- * This class is used to track down all the sync requesters we are providing
- * to. In such scenario, it won't be uncommon to have the @p synchronizer
- * field set with a connection to the MonitorDBStore, the @p timeout field
- * containing a timeout event and @p entity containing the entity instance
- * of the monitor we are here representing.
+ * and sync from mon.b, at which point a+b will both have lc 80 and
+ * come online with a majority holding out of date commits.
*
- * The sync requester will also use this class to represent both the sync
- * leader and the sync provider.
+ * Avoid this by preserving our old last_committed value prior to
+ * sync and never going backwards.
*/
- struct SyncEntityImpl {
-
- /**
- * Store synchronization related Sync state.
- */
- enum {
- /**
- * No state whatsoever. We are not providing any sync suppport.
- */
- STATE_NONE = 0,
- /**
- * This entity's sync effort is currently focused on reading and sharing
- * our whole store state with @p entity. This means all the entries in
- * the key/value space.
- */
- STATE_WHOLE = 1,
- /**
- * This entity's sync effor is currently focused on reading and sharing
- * our Paxos state with @p entity. This means all the Paxos-related
- * key/value entries, such as the Paxos versions.
- */
- STATE_PAXOS = 2
- };
-
- /**
- * The entity instace of the monitor whose sync effort we are representing.
- */
- entity_inst_t entity;
- /**
- * Our Monitor.
- */
- Monitor *mon;
- /**
- * The Paxos version we are focusing on.
- *
- * @note This is not used at the moment. We are still assessing whether we
- * need it.
- */
- version_t version;
- /**
- * Timeout event. Its type and purpose varies depending on the situation.
- */
- Context *timeout;
- /**
- * Last key received during a sync effort.
- *
- * This field is mainly used by the sync requester to track the last
- * received key, in case he needs to switch providers due to failure. The
- * sync provider will also use this field whenever the requester specifies
- * a last received key when requesting the provider to start sending his
- * store chunks.
- */
- pair<string,string> last_received_key;
- /**
- * Hold the Store Synchronization related Sync State.
- */
- int sync_state;
- /**
- * The MonitorDBStore's chunk iterator instance we are currently using
- * to obtain the store's chunks and pack them to the sync requester.
- */
- MonitorDBStore::Synchronizer synchronizer;
- MonitorDBStore::Synchronizer paxos_synchronizer;
- /* Should only be used for debugging purposes */
- /**
- * crc of the contents read from the store.
- *
- * @note may not always be available, as it is used only on specific
- * points in time during the sync process.
- * @note depends on '--mon-sync-debug' being set.
- */
- __u32 crc;
- /**
- * Should be true if @p crc has been set.
- */
- bool crc_available;
- /**
- * Total synchronization attempts.
- */
- int attempts;
-
- SyncEntityImpl(entity_inst_t &entity, Monitor *mon)
- : entity(entity),
- mon(mon),
- version(0),
- timeout(NULL),
- sync_state(STATE_NONE),
- crc(0),
- crc_available(false),
- attempts(0)
- { }
-
- /**
- * Obtain current Sync State name.
- *
- * @returns Name of current sync state.
- */
- string get_state() {
- switch (sync_state) {
- case STATE_NONE: return "none";
- case STATE_WHOLE: return "whole";
- case STATE_PAXOS: return "paxos";
- default: return "unknown";
- }
- }
- /**
- * Obtain the paxos version at which this sync started.
- *
- * @returns Paxos version at which this sync started
- */
- version_t get_version() {
- return version;
- }
- /**
- * Set a timeout event for this sync entity.
- *
- * @param event Timeout class to be called after @p fire_after seconds.
- * @param fire_after Number of seconds until we fire the @p event event.
- */
- void set_timeout(Context *event, double fire_after) {
- cancel_timeout();
- timeout = event;
- mon->timer.add_event_after(fire_after, timeout);
- }
- /**
- * Cancel the currently set timeout, if any.
- */
- void cancel_timeout() {
- if (timeout)
- mon->timer.cancel_event(timeout);
- timeout = NULL;
- }
- /**
- * Initiate the required fields for obtaining chunks out of the
- * MonitorDBStore.
- *
- * This function will initiate @p synchronizer with a chunk iterator whose
- * scope is all the keys/values that belong to one of the sync targets
- * (i.e., paxos services or paxos).
- *
- * Calling @p Monitor::sync_update() will be essential during the efforts
- * of providing a correct store state to the requester, since we will need
- * to eventually update the iterator in order to start packing the Paxos
- * versions.
- */
- void sync_init() {
- sync_state = STATE_WHOLE;
- set<string> sync_targets = mon->get_sync_targets_names();
-
- string prefix("paxos");
- paxos_synchronizer = mon->store->get_synchronizer(prefix);
- version = mon->paxos->get_version();
- generic_dout(10) << __func__ << " version " << version << dendl;
-
- synchronizer = mon->store->get_synchronizer(last_received_key,
- sync_targets);
- sync_update();
- assert(synchronizer->has_next_chunk());
- }
- /**
- * Update the @p synchronizer chunk iterator, if needed.
- *
- * Whenever we reach the end of the iterator during @p STATE_WHOLE, we
- * must update the @p synchronizer to an iterator focused on reading only
- * Paxos versions. This is an essential part of the sync store approach,
- * and it will guarantee that we end up with a consistent store.
- */
- void sync_update() {
- assert(sync_state != STATE_NONE);
- assert(synchronizer.use_count() != 0);
-
- if (!synchronizer->has_next_chunk()) {
- crc_set(synchronizer->crc());
- if (sync_state == STATE_WHOLE) {
- assert(paxos_synchronizer.use_count() != 0);
- sync_state = STATE_PAXOS;
- synchronizer = paxos_synchronizer;
- }
- }
- }
+ version_t sync_last_committed_floor;
- /* For debug purposes only */
- /**
- * Check if we have a CRC available.
- *
- * @returns true if crc is available; false otherwise.
- */
- bool has_crc() {
- return (g_conf->mon_sync_debug && crc_available);
- }
- /**
- * Set @p crc to @p to_set
- *
- * @param to_set a crc value to set.
- */
- void crc_set(__u32 to_set) {
- crc = to_set;
- crc_available = true;
- }
- /**
- * Get the current CRC value from @p crc
- *
- * @returns the currenct CRC value from @p crc
- */
- __u32 crc_get() {
- return crc;
- }
- /**
- * Clear the current CRC.
- */
- void crc_clear() {
- crc_available = false;
- }
- };
- typedef std::tr1::shared_ptr< SyncEntityImpl > SyncEntity;
- /**
- * Get a Monitor::SyncEntity instance.
- *
- * @param entity The monitor's entity instance that we want to associate
- * with this Monitor::SyncEntity.
- * @param mon The Monitor.
- *
- * @returns A Monitor::SyncEntity
- */
- SyncEntity get_sync_entity(entity_inst_t &entity, Monitor *mon) {
- return std::tr1::shared_ptr<SyncEntityImpl>(
- new SyncEntityImpl(entity, mon));
- }
- /**
- * Callback class responsible for dealing with the consequences of a sync
- * process timing out.
- */
struct C_SyncTimeout : public Context {
Monitor *mon;
- entity_inst_t entity;
-
- C_SyncTimeout(Monitor *mon, entity_inst_t &entity)
- : mon(mon), entity(entity)
- { }
-
+ C_SyncTimeout(Monitor *m) : mon(m) {}
void finish(int r) {
- mon->sync_timeout(entity);
+ mon->sync_timeout();
}
};
+
/**
- * Map containing all the monitor entities to whom we are acting as sync
- * providers.
- */
- map<entity_inst_t, SyncEntity> sync_entities;
- /**
- * RNG used for the sync (currently only used to pick random monitors)
- */
- SimpleRNG sync_rng;
- /**
- * Obtain random monitor from the monmap.
- *
- * @param other Any monitor other than the one with rank @p other
- * @returns The picked monitor's name.
- */
- int _pick_random_mon(int other = -1);
- int _pick_random_quorum_mon(int other = -1);
- /**
- * Deal with the consequences of @p entity's sync timing out.
- *
- * @note Both the sync provider and the sync requester make use of this
- * function, since both use the @p Monitor::C_SyncTimeout callback.
- *
- * Being the sync provider, whenever a Monitor::C_SyncTimeout is triggered,
- * we only have to clean up the sync requester's state we are maintaining.
- *
- * Being the sync requester, we will have to choose a new sync provider, and
- * resume our sync from where it was left.
- *
- * @param entity Entity instance of the monitor whose sync has timed out.
- */
- void sync_timeout(entity_inst_t &entity);
- /**
- * Cleanup the state we, the provider, are keeping during @p entity's sync.
- *
- * @param entity Entity instance of the monitor whose sync state we are
- * cleaning up.
- */
- void sync_provider_cleanup(entity_inst_t &entity);
- /**
- * Handle a Sync Start Chunks request from a sync requester.
- *
- * This request will create the necessary state our the provider's end, and
- * the provider will then be able to send chunks of his own store to the
- * requester.
- *
- * @param m Sync message with operation type MMonSync::OP_START_CHUNKS
- */
- void handle_sync_start_chunks(MMonSync *m);
- /**
- * Handle a requester's reply to the last chunk we sent him.
- *
- * We will only send a new chunk to the sync requester once he has acked the
- * reception of the last chunk we sent them.
- *
- * That's also how we will make sure that, on their end, they became aware
- * that there are no more chunks to send (since we shall tag a message with
- * MMonSync::FLAG_LAST when we are sending them the last chunk of all),
- * allowing us to clean up the requester's state.
- *
- * @param m Sync message with operation type MMonSync::OP_CHUNK_REPLY
- */
- void handle_sync_chunk_reply(MMonSync *m);
- /**
- * Send a chunk to the sync entity represented by @p sync.
+ * Obtain the synchronization target prefixes in set form.
*
- * This function will send the next chunk available on the synchronizer. If
- * it happens to be the last chunk, then the message shall be marked as
- * such using MMonSync::FLAG_LAST.
+ * We consider a target prefix all those that are relevant when
+ * synchronizing two stores. That is, all those that hold paxos service's
+ * versions, as well as paxos versions, or any control keys such as the
+ * first or last committed version.
*
- * @param sync A Monitor::SyncEntity representing a sync requester monitor.
- */
- void sync_send_chunks(SyncEntity sync);
- /**
- * @} // Synchronization Provider-specific
- */
- /**
- * @defgroup Synchronization Requester-specific
- * @{
- */
- /**
- * The state in which we (the sync leader, provider or requester) are in
- * regard to our sync process (if we are the requester) or any entity that
- * we may be leading or providing to.
- */
- enum {
- /**
- * We are not part of any synchronization effort, or it has not began yet.
- */
- SYNC_STATE_NONE = 0,
- /**
- * We have started our role in the synchronization.
- *
- * This state may have multiple meanings, depending on which entity is
- * employing it and within which context.
- *
- * For instance, the leader will consider a sync requester to enter
- * SYNC_STATE_START whenever it receives a MMonSync::OP_START from the
- * said requester. On the other hand, the provider will consider that the
- * requester enters this state after receiving a MMonSync::OP_START_CHUNKS.
- * The sync requester will enter this state as soon as it begins its sync
- * efforts.
- */
- SYNC_STATE_START = 1,
- /**
- * We are synchronizing chunks.
- *
- * This state is not used by the sync leader; only the sync requester and
- * the sync provider will.
- */
- SYNC_STATE_CHUNKS = 2,
- /**
- * We are stopping the sync effort.
- */
- SYNC_STATE_STOP = 3
- };
- /**
- * The current sync state.
+ * Given the current design, this function should return the name of all and
+ * any available paxos service, plus the paxos name.
*
- * This field is only used by the sync requester, being the only one that
- * will take this state as part of its global state. The sync leader and the
- * sync provider will only associate sync states to other entities (i.e., to
- * sync requesters), and those shall be kept in the @p sync_entities_states
- * map.
- */
- int sync_state;
- /**
- * Callback class responsible for dealing with the consequences of the sync
- * requester not receiving a MMonSync::OP_START_REPLY in a timely manner.
- */
- struct C_SyncStartTimeout : public Context {
- Monitor *mon;
-
- C_SyncStartTimeout(Monitor *mon)
- : mon(mon)
- { }
-
- void finish(int r) {
- mon->sync_start_reply_timeout();
- }
- };
- /**
- * Callback class responsible for retrying a Sync Start after a given
- * backoff period, whenever the Sync Leader flags a MMonSync::OP_START_REPLY
- * with the MMonSync::FLAG_RETRY flag.
+ * @returns a set of strings referring to the prefixes being synchronized
*/
- struct C_SyncStartRetry : public Context {
- Monitor *mon;
- entity_inst_t entity;
-
- C_SyncStartRetry(Monitor *mon, entity_inst_t &entity)
- : mon(mon), entity(entity)
- { }
+ set<string> get_sync_targets_names();
- void finish(int r) {
- mon->bootstrap();
- }
- };
/**
- * We use heartbeats to check if both the Leader and the Synchronization
- * Requester are both still alive, so we can determine if we should continue
- * with the synchronization process, granted that trim is disabled.
+ * Reset the monitor's sync-related data structures and state, both
+ * for the requester- and provider-side.
*/
- struct C_HeartbeatTimeout : public Context {
- Monitor *mon;
-
- C_HeartbeatTimeout(Monitor *mon)
- : mon(mon)
- { }
+ void sync_reset();
- void finish(int r) {
- mon->sync_requester_abort();
- }
- };
/**
- * Callback class responsible for sending a heartbeat message to the sync
- * leader. We use this callback to keep an assynchronous heartbeat with
- * the sync leader at predefined intervals.
+ * Caled when a sync attempt times out (requester-side)
*/
- struct C_HeartbeatInterval : public Context {
- Monitor *mon;
- entity_inst_t entity;
+ void sync_timeout();
- C_HeartbeatInterval(Monitor *mon, entity_inst_t &entity)
- : mon(mon), entity(entity)
- { }
-
- void finish(int r) {
- mon->sync_leader->set_timeout(new C_HeartbeatTimeout(mon),
- g_conf->mon_sync_heartbeat_timeout);
- mon->sync_send_heartbeat(entity);
- }
- };
/**
- * Callback class responsible for dealing with the consequences of never
- * receiving a reply to a MMonSync::OP_FINISH sent to the sync leader.
+ * Get the latest monmap for backup purposes during sync
*/
- struct C_SyncFinishReplyTimeout : public Context {
- Monitor *mon;
-
- C_SyncFinishReplyTimeout(Monitor *mon)
- : mon(mon)
- { }
+ void sync_obtain_latest_monmap(bufferlist &bl);
- void finish(int r) {
- mon->sync_finish_reply_timeout();
- }
- };
- /**
- * The entity we, the sync requester, consider to be our sync leader. If
- * there is a formed quorum, the @p sync_leader should represent the actual
- * cluster Leader; otherwise, it can be any monitor and will likely be the
- * same as @p sync_provider.
- */
- SyncEntity sync_leader;
- /**
- * The entity we, the sync requester, are synchronizing against. This entity
- * will be our source of store chunks, and we will ultimately obtain a store
- * state equal (or very similar, maybe off by a couple of versions) as their
- * own.
- */
- SyncEntity sync_provider;
- /**
- * Clean up the Sync Requester's state (both in-memory and in-store).
- */
- void sync_requester_cleanup();
- /**
- * Abort the current sync effort.
- *
- * This will be translated into a MMonSync::OP_ABORT sent to the sync leader
- * and to the sync provider, and ultimately it will also involve calling
- * @p Monitor::sync_requester_cleanup() to clean up our current sync state.
- */
- void sync_requester_abort();
- /**
- * Deal with a timeout while waiting for a MMonSync::OP_FINISH_REPLY.
- *
- * This will be assumed as a leader failure, and having been exposed to the
- * side-effects of a new Leader being elected, we have no other choice but
- * to abort our sync process and start fresh.
- */
- void sync_finish_reply_timeout();
- /**
- * Deal with a timeout while waiting for a MMonSync::OP_START_REPLY.
- *
- * This will be assumed as a leader failure. Since we didn't get to do
- * much work (as we haven't even started our sync), we will simply bootstrap
- * and start off fresh with a new sync leader.
- */
- void sync_start_reply_timeout();
/**
* Start the synchronization efforts.
*
* This function should be called whenever we find the need to synchronize
* our store state with the remaining cluster.
*
- * Starting the sync process means that we will have to request the cluster
- * Leader (if there is a formed quorum) to stop trimming the Paxos state and
- * allow us to start synchronizing with the sync provider we picked.
*
* @param entity An entity instance referring to the sync provider we picked.
+ * @param whether to sycn the full store, or just pull recent paxos commits
*/
- void sync_start(entity_inst_t &entity);
- /**
- * Request the provider to start sending the chunks of his store, in order
- * for us to obtain a consistent store state similar to the one shared by
- * the cluster.
- *
- * @param provider The SyncEntity representing the Sync Provider.
- */
- void sync_start_chunks(SyncEntity provider);
+ void sync_start(entity_inst_t &entity, bool full);
+
+public:
/**
- * Handle a MMonSync::OP_START_REPLY sent by the Sync Leader.
- *
- * Reception of this message may be twofold: if it was marked with the
- * MMonSync::FLAG_RETRY flag, we must backoff for a while and retry starting
- * the sync at a later time; otherwise, we have the green-light to request
- * the Sync Provider to start sharing his chunks with us.
- *
- * @param m Sync message with operation type MMonSync::OP_START_REPLY
+ * force a sync on next mon restart
*/
- void handle_sync_start_reply(MMonSync *m);
+ void sync_force(ostream& ss);
+
+private:
/**
- * Handle a Heartbeat reply sent by the Sync Leader.
+ * reset the sync timeout
*
- * We use heartbeats to keep the Sync Leader aware that we are keeping our
- * sync efforts alive. We also use them to make sure our Sync Leader is
- * still alive. If the Sync Leader fails, we will have to abort our on-going
- * sync, or we could incurr in an inconsistent store state due to a trim on
- * the Paxos state of the monitor provinding us with his store chunks.
- *
- * @param m Sync message with operation type MMonSync::OP_HEARTBEAT_REPLY
+ * This is used on the client to restart if things aren't progressing
*/
- void handle_sync_heartbeat_reply(MMonSync *m);
+ void sync_reset_timeout();
+
/**
- * Handle a chunk sent by the Sync Provider.
+ * trim stale sync provider state
*
- * We will receive the Sync Provider's store in chunks. These are encoded
- * in bufferlists containing a transaction that will be directly applied
- * onto our MonitorDBStore.
- *
- * Whenever we receive such a message, we must reply to the Sync Provider,
- * as a way of acknowledging the reception of its last chunk. If the message
- * is tagged with a MMonSync::FLAG_LAST, we can then consider we have
- * received all the chunks the Sync Provider had to offer, and finish our
- * sync efforts with the Sync Leader.
- *
- * @param m Sync message with operation type MMonSync::OP_CHUNK
+ * If someone is syncing from us and hasn't talked to us recently, expire their state.
*/
- void handle_sync_chunk(MMonSync *m);
+ void sync_trim_providers();
+
/**
- * Handle a reply sent by the Sync Leader to a MMonSync::OP_FINISH.
- *
- * As soon as we receive this message, we know we finally have a store state
- * consistent with the remaining cluster (give or take a couple of versions).
- * We may then bootstrap and attempt to join the other monitors in the
- * cluster.
+ * Complete a sync
*
- * @param m Sync message with operation type MMonSync::OP_FINISH_REPLY
- */
- void handle_sync_finish_reply(MMonSync *m);
- /**
- * Stop our synchronization effort by sending a MMonSync::OP_FINISH to the
- * Sync Leader.
+ * Finish up a sync after we've gotten all of the chunks.
*
- * Once we receive the last chunk from the Sync Provider, we are in
- * conditions of officially finishing our sync efforts. With that purpose in
- * mind, we must then send a MMonSync::OP_FINISH to the Leader, letting him
- * know that we no longer require the Paxos state to be preserved.
+ * @param last_committed final last_committed value from provider
*/
- void sync_stop();
+ void sync_finish(version_t last_committed);
+
/**
- * @} // Synchronization Requester-specific
+ * request the next chunk from the provider
*/
- const string get_sync_state_name(int s) const {
- switch (s) {
- case SYNC_STATE_NONE: return "none";
- case SYNC_STATE_START: return "start";
- case SYNC_STATE_CHUNKS: return "chunks";
- case SYNC_STATE_STOP: return "stop";
- }
- return "???";
- }
+ void sync_get_next_chunk();
+
/**
- * Obtain a string describing the current Sync State.
+ * handle sync message
*
- * @returns A string describing the current Sync State, if any, or an empty
- * string if no sync (or sync effort we know of) is in progress.
+ * @param m Sync message with operation type MMonSync::OP_START_CHUNKS
*/
- const string get_sync_state_name() const {
- string sn;
-
- if (sync_role == SYNC_ROLE_NONE)
- return "";
-
- sn.append(" sync(");
-
- if (sync_role & SYNC_ROLE_LEADER)
- sn.append(" leader");
- if (sync_role & SYNC_ROLE_PROVIDER)
- sn.append(" provider");
- if (sync_role & SYNC_ROLE_REQUESTER)
- sn.append(" requester");
+ void handle_sync(MMonSync *m);
- sn.append(" state ");
- sn.append(get_sync_state_name(sync_state));
+ void _sync_reply_no_cookie(MMonSync *m);
- sn.append(" )");
+ void handle_sync_get_cookie(MMonSync *m);
+ void handle_sync_get_chunk(MMonSync *m);
+ void handle_sync_finish(MMonSync *m);
- return sn;
- }
+ void handle_sync_cookie(MMonSync *m);
+ void handle_sync_forward(MMonSync *m);
+ void handle_sync_chunk(MMonSync *m);
+ void handle_sync_no_cookie(MMonSync *m);
/**
* @} // Synchronization
@@ -1288,8 +572,6 @@ public:
bool _allowed_command(MonSession *s, map<std::string, cmd_vartype>& cmd);
void _mon_status(ostream& ss);
void _quorum_status(ostream& ss);
- void _sync_status(ostream& ss);
- void _sync_force(ostream& ss);
void _add_bootstrap_peer_hint(string cmd, string args, ostream& ss);
void handle_command(class MMonCommand *m);
void handle_route(MRoute *m);
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h
index 8405dbed2af..60a648de19b 100644
--- a/src/mon/MonitorDBStore.h
+++ b/src/mon/MonitorDBStore.h
@@ -251,7 +251,8 @@ class MonitorDBStore
bool add_chunk_entry(Transaction &tx,
string &prefix,
string &key,
- bufferlist &value) {
+ bufferlist &value,
+ uint64_t max) {
Transaction tmp;
bufferlist tmp_bl;
tmp.put(prefix, key, value);
@@ -262,7 +263,7 @@ class MonitorDBStore
size_t len = tx_bl.length() + tmp_bl.length();
- if (!tx.empty() && (len > g_conf->mon_sync_max_payload_size)) {
+ if (!tx.empty() && (len > max)) {
return false;
}
@@ -279,7 +280,6 @@ class MonitorDBStore
return true;
}
- virtual void _get_chunk(Transaction &tx) = 0;
virtual bool _is_valid() = 0;
public:
@@ -294,12 +294,7 @@ class MonitorDBStore
virtual bool has_next_chunk() {
return !done && _is_valid();
}
- virtual void get_chunk(bufferlist &bl) {
- Transaction tx;
- _get_chunk(tx);
- if (!tx.empty())
- tx.encode(bl);
- }
+ virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0;
virtual pair<string,string> get_next_key() = 0;
};
typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
@@ -327,7 +322,7 @@ class MonitorDBStore
* differ from the one passed on to the function)
* @param last_key[out] Last key in the chunk
*/
- virtual void _get_chunk(Transaction &tx) {
+ virtual void get_chunk_tx(Transaction &tx, uint64_t max) {
assert(done == false);
assert(iter->valid() == true);
@@ -336,7 +331,7 @@ class MonitorDBStore
string key(iter->raw_key().second);
if (sync_prefixes.count(prefix)) {
bufferlist value = iter->value();
- if (!add_chunk_entry(tx, prefix, key, value))
+ if (!add_chunk_entry(tx, prefix, key, value, max))
return;
}
iter->next();
@@ -359,49 +354,6 @@ class MonitorDBStore
}
};
- class SinglePrefixStoreIteratorImpl : public StoreIteratorImpl {
- KeyValueDB::Iterator iter;
- string prefix;
-
- public:
- SinglePrefixStoreIteratorImpl(KeyValueDB::Iterator iter, string prefix)
- : StoreIteratorImpl(),
- iter(iter),
- prefix(prefix)
- { }
-
- virtual ~SinglePrefixStoreIteratorImpl() { }
-
- private:
- virtual void _get_chunk(Transaction &tx) {
- assert(done == false);
- assert(iter->valid() == true);
-
- while (iter->valid()) {
- string key(iter->key());
- bufferlist value = iter->value();
- if (!add_chunk_entry(tx, prefix, key, value))
- return;
- iter->next();
- }
- assert(iter->valid() == false);
- done = true;
- }
-
- virtual pair<string,string> get_next_key() {
- // this method is only used by scrub on the whole store
- // iterator. also, the single prefix iterator has been dropped
- // in later code. we leave this here only for the benefit of
- // backporting.
- assert(0 == "this should not get called");
- return make_pair(string(), string());
- }
-
- virtual bool _is_valid() {
- return iter->valid();
- }
- };
-
Synchronizer get_synchronizer(pair<string,string> &key,
set<string> &prefixes) {
KeyValueDB::WholeSpaceIterator iter;
@@ -417,18 +369,6 @@ class MonitorDBStore
);
}
- Synchronizer get_synchronizer(string &prefix) {
- assert(!prefix.empty());
-
- KeyValueDB::Iterator iter;
- iter = db->get_snapshot_iterator(prefix);
- iter->seek_to_first();
-
- return std::tr1::shared_ptr<StoreIteratorImpl>(
- new SinglePrefixStoreIteratorImpl(iter, prefix)
- );
- }
-
KeyValueDB::Iterator get_iterator(const string &prefix) {
assert(!prefix.empty());
KeyValueDB::Iterator iter = db->get_snapshot_iterator(prefix);
diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc
index efe41e1cb74..6c9effccd6e 100644
--- a/src/mon/PGMonitor.cc
+++ b/src/mon/PGMonitor.cc
@@ -316,7 +316,7 @@ void PGMonitor::read_pgmap_meta()
{
dout(10) << __func__ << dendl;
- string prefix = "pgmap_meta";
+ string prefix = pgmap_meta_prefix;
version_t version = mon->store->get(prefix, "version");
epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch");
@@ -358,7 +358,7 @@ void PGMonitor::read_pgmap_full()
{
read_pgmap_meta();
- string prefix = "pgmap_pg";
+ string prefix = pgmap_pg_prefix;
for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
string key = i->key();
pg_t pgid;
@@ -371,7 +371,7 @@ void PGMonitor::read_pgmap_full()
dout(20) << " got " << pgid << dendl;
}
- prefix = "pgmap_osd";
+ prefix = pgmap_osd_prefix;
for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
string key = i->key();
int osd = atoi(key.c_str());
@@ -403,7 +403,7 @@ void PGMonitor::apply_pgmap_delta(bufferlist& bl)
::decode(pgid, p);
dout(20) << " refreshing pg " << pgid << dendl;
bufferlist bl;
- int r = mon->store->get("pgmap_pg", stringify(pgid), bl);
+ int r = mon->store->get(pgmap_pg_prefix, stringify(pgid), bl);
if (r >= 0) {
pg_map.update_pg(pgid, bl);
} else {
@@ -418,7 +418,7 @@ void PGMonitor::apply_pgmap_delta(bufferlist& bl)
::decode(osd, p);
dout(20) << " refreshing osd." << osd << dendl;
bufferlist bl;
- int r = mon->store->get("pgmap_osd", stringify(osd), bl);
+ int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl);
if (r >= 0) {
pg_map.update_osd(osd, bl);
} else {
@@ -442,7 +442,7 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)
uint64_t features = mon->get_quorum_features();
- string prefix = "pgmap_meta";
+ string prefix = pgmap_meta_prefix;
t->put(prefix, "version", pending_inc.version);
{
@@ -470,7 +470,7 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)
::encode(pending_inc.stamp, incbl);
{
bufferlist dirty;
- string prefix = "pgmap_pg";
+ string prefix = pgmap_pg_prefix;
for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin();
p != pending_inc.pg_stat_updates.end();
++p) {
@@ -487,7 +487,7 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)
}
{
bufferlist dirty;
- string prefix = "pgmap_osd";
+ string prefix = pgmap_osd_prefix;
for (map<int32_t,osd_stat_t>::const_iterator p = pending_inc.osd_stat_updates.begin();
p != pending_inc.osd_stat_updates.end();
++p) {
diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h
index e9b8f217255..271d0e1161d 100644
--- a/src/mon/PGMonitor.h
+++ b/src/mon/PGMonitor.h
@@ -53,6 +53,10 @@ public:
private:
PGMap::Incremental pending_inc;
+ const char *pgmap_meta_prefix;
+ const char *pgmap_pg_prefix;
+ const char *pgmap_osd_prefix;
+
void create_initial();
void update_from_paxos(bool *need_bootstrap);
void upgrade_format();
@@ -146,10 +150,20 @@ public:
PGMonitor(Monitor *mn, Paxos *p, const string& service_name)
: PaxosService(mn, p, service_name),
need_check_down_pgs(false),
- last_map_pg_create_osd_epoch(0)
+ last_map_pg_create_osd_epoch(0),
+ pgmap_meta_prefix("pgmap_meta"),
+ pgmap_pg_prefix("pgmap_pg"),
+ pgmap_osd_prefix("pgmap_osd")
{ }
~PGMonitor() { }
+ virtual void get_store_prefixes(set<string>& s) {
+ s.insert(get_service_name());
+ s.insert(pgmap_meta_prefix);
+ s.insert(pgmap_pg_prefix);
+ s.insert(pgmap_osd_prefix);
+ }
+
virtual void on_restart();
/* Courtesy function provided by PaxosService, called when an election
diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc
index da00e400113..7779360ca33 100644
--- a/src/mon/Paxos.cc
+++ b/src/mon/Paxos.cc
@@ -43,36 +43,18 @@ MonitorDBStore *Paxos::get_store()
return mon->store;
}
-
-void Paxos::apply_version(MonitorDBStore::Transaction &tx, version_t v)
-{
- bufferlist bl;
- int err = get_store()->get(get_name(), v, bl);
- assert(err == 0);
- assert(bl.length());
- decode_append_transaction(tx, bl);
-}
-
-void Paxos::reapply_all_versions()
+void Paxos::read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t first, version_t last)
{
- version_t first = get_store()->get(get_name(), "first_committed");
- version_t last = get_store()->get(get_name(), "last_committed");
dout(10) << __func__ << " first " << first << " last " << last << dendl;
-
- MonitorDBStore::Transaction tx;
for (version_t v = first; v <= last; ++v) {
dout(30) << __func__ << " apply version " << v << dendl;
- apply_version(tx, v);
+ bufferlist bl;
+ int err = get_store()->get(get_name(), v, bl);
+ assert(err == 0);
+ assert(bl.length());
+ decode_append_transaction(*tx, bl);
}
dout(15) << __func__ << " total versions " << (last-first) << dendl;
-
- dout(30) << __func__ << " tx dump:\n";
- JSONFormatter f(true);
- tx.dump(&f);
- f.flush(*_dout);
- *_dout << dendl;
-
- get_store()->apply_transaction(tx);
}
void Paxos::init()
@@ -88,6 +70,7 @@ void Paxos::init()
<< " first_committed: " << first_committed << dendl;
dout(10) << "init" << dendl;
+ assert(is_consistent());
}
// ---------------------------------
@@ -970,8 +953,8 @@ void Paxos::lease_renew_timeout()
void Paxos::trim()
{
assert(should_trim());
- version_t end = MIN(get_version() - g_conf->paxos_max_join_drift,
- get_first_committed() + g_conf->paxos_trim_max);
+ version_t end = MIN(get_version() - g_conf->paxos_min,
+ get_first_committed() + g_conf->paxos_trim_max);
if (first_committed >= end)
return;
@@ -1003,17 +986,6 @@ void Paxos::trim()
queue_proposal(bl, new C_Trimmed(this));
}
-void Paxos::trim_enable()
-{
- trim_disabled_version = 0;
- // We may not be the leader when we reach this function. We sure must
- // have been the leader at some point, but we may have been demoted and
- // we really should reset 'trim_disabled_version' if that was the case.
- // So, make sure we only trim() iff we are the leader.
- if (mon->is_leader() && should_trim())
- trim();
-}
-
/*
* return a globally unique, monotonically increasing proposal number
*/
@@ -1027,7 +999,7 @@ version_t Paxos::get_new_proposal_number(version_t gt)
last_pn++;
last_pn *= 100;
last_pn += (version_t)mon->rank;
-
+
// write
MonitorDBStore::Transaction t;
t.put(get_name(), "last_pn", last_pn);
diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h
index 9be4713a282..1cdad50e5bb 100644
--- a/src/mon/Paxos.h
+++ b/src/mon/Paxos.h
@@ -530,12 +530,6 @@ private:
* trimming; false otherwise.
*/
bool trimming;
- /**
- * If we have disabled trimming our state, this variable should have a
- * value greater than zero, corresponding to the version we had at the time
- * we disabled the trim.
- */
- version_t trim_disabled_version;
/**
* @defgroup Paxos_h_callbacks Callback classes.
@@ -1013,8 +1007,7 @@ public:
lease_timeout_event(0),
accept_timeout_event(0),
clock_drift_warned(0),
- trimming(false),
- trim_disabled_version(0) { }
+ trimming(false) { }
const string get_name() const {
return paxos_name;
@@ -1022,8 +1015,7 @@ public:
void dispatch(PaxosServiceMessage *m);
- void reapply_all_versions();
- void apply_version(MonitorDBStore::Transaction &tx, version_t v);
+ void read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t from, version_t last);
void init();
/**
@@ -1137,26 +1129,6 @@ public:
void trim();
/**
- * Disable trimming
- *
- * This is required by the Monitor's store synchronization mechanisms
- * to guarantee a consistent store state.
- */
- void trim_disable() {
- if (!trim_disabled_version)
- trim_disabled_version = get_version();
- }
- /**
- * Enable trimming
- */
- void trim_enable();
- /**
- * Check if trimming has been disabled
- *
- * @returns true if trim has been disabled; false otherwise.
- */
- bool is_trim_disabled() { return (trim_disabled_version > 0); }
- /**
* Check if we should trim.
*
* If trimming is disabled, we must take that into consideration and only
@@ -1165,18 +1137,12 @@ public:
* @returns true if we should trim; false otherwise.
*/
bool should_trim() {
- int available_versions = (get_version() - get_first_committed());
- int maximum_versions =
- (g_conf->paxos_max_join_drift + g_conf->paxos_trim_min);
+ int available_versions = get_version() - get_first_committed();
+ int maximum_versions = g_conf->paxos_min + g_conf->paxos_trim_min;
if (trimming || (available_versions <= maximum_versions))
return false;
- if (trim_disabled_version > 0) {
- int disabled_versions = (get_version() - trim_disabled_version);
- if (disabled_versions < g_conf->paxos_trim_disabled_max_versions)
- return false;
- }
return true;
}
diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h
index 90b7e67c411..6c8d9c0bcc4 100644
--- a/src/mon/PaxosService.h
+++ b/src/mon/PaxosService.h
@@ -210,6 +210,13 @@ public:
* @returns The service's name.
*/
string get_service_name() { return service_name; }
+
+ /**
+ * Get the store prefixes we utilize
+ */
+ virtual void get_store_prefixes(set<string>& s) {
+ s.insert(service_name);
+ }
// i implement and you ignore
/**