diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ceph_mon.cc | 20 | ||||
-rw-r--r-- | src/common/config_opts.h | 13 | ||||
-rw-r--r-- | src/messages/MMonSync.h | 184 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 1461 | ||||
-rw-r--r-- | src/mon/Monitor.h | 908 | ||||
-rw-r--r-- | src/mon/MonitorDBStore.h | 72 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 16 | ||||
-rw-r--r-- | src/mon/PGMonitor.h | 16 | ||||
-rw-r--r-- | src/mon/Paxos.cc | 48 | ||||
-rw-r--r-- | src/mon/Paxos.h | 42 | ||||
-rw-r--r-- | src/mon/PaxosService.h | 7 |
11 files changed, 558 insertions, 2229 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 7d8d5c4db29..b62443aa035 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -107,6 +107,8 @@ void usage() cerr << " debug monitor level (e.g. 10)\n"; cerr << " --mkfs\n"; cerr << " build fresh monitor fs\n"; + cerr << " --force-sync\n"; + cerr << " force a sync from another mon by wiping local data (BE CAREFUL)\n"; generic_server_usage(); } @@ -116,6 +118,8 @@ int main(int argc, const char **argv) bool mkfs = false; bool compact = false; + bool force_sync = false; + bool yes_really = false; std::string osdmapfn, inject_monmap, extract_monmap; vector<const char*> args; @@ -136,6 +140,10 @@ int main(int argc, const char **argv) mkfs = true; } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) { compact = true; + } else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) { + force_sync = true; + } else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) { + yes_really = true; } else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) { osdmapfn = val; } else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) { @@ -151,6 +159,12 @@ int main(int argc, const char **argv) usage(); } + if (force_sync && !yes_really) { + cerr << "are you SURE you want to force a sync? this will erase local data and may\n" + << "break your mon cluster. pass --yes-i-really-mean-it if you do." << std::endl; + exit(1); + } + if (g_conf->mon_data.empty()) { cerr << "must specify '--mon-data=foo' data path" << std::endl; usage(); @@ -406,7 +420,6 @@ int main(int argc, const char **argv) } } - // this is what i will bind to entity_addr_t ipaddr; @@ -510,6 +523,11 @@ int main(int argc, const char **argv) mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store, messenger, &monmap); + if (force_sync) { + derr << "flagging a forced sync ..." << dendl; + mon->sync_force(cerr); + } + err = mon->preinit(); if (err < 0) prefork.exit(1); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 2e386f9d732..01ec8375cd2 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -171,17 +171,13 @@ OPTION(mon_health_data_update_interval, OPT_FLOAT, 60.0) OPTION(mon_data_avail_crit, OPT_INT, 5) OPTION(mon_data_avail_warn, OPT_INT, 30) OPTION(mon_config_key_max_entry_size, OPT_INT, 4096) // max num bytes per config-key entry -OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0) -OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0) -OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0) -OPTION(mon_sync_backoff_timeout, OPT_DOUBLE, 30.0) -OPTION(mon_sync_timeout, OPT_DOUBLE, 30.0) -OPTION(mon_sync_max_retries, OPT_INT, 5) +OPTION(mon_sync_timeout, OPT_DOUBLE, 60.0) OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB) OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug OPTION(mon_sync_debug_leader, OPT_INT, -1) // monitor to be used as the sync leader OPTION(mon_sync_debug_provider, OPT_INT, -1) // monitor to be used as the sync provider OPTION(mon_sync_debug_provider_fallback, OPT_INT, -1) // monitor to be used as fallback if sync provider fails +OPTION(mon_inject_sync_get_chunk_delay, OPT_DOUBLE, 0) // inject N second delay on each get_chunk request OPTION(mon_osd_min_down_reporters, OPT_INT, 1) // number of OSDs who need to report a down OSD for it to count OPTION(mon_osd_min_down_reports, OPT_INT, 3) // number of times a down OSD must be reported for it to count OPTION(mon_osd_force_trim_to, OPT_INT, 0) // force mon to trim maps to this point, regardless of min_last_epoch_clean (dangerous, use with care) @@ -190,8 +186,7 @@ OPTION(mon_osd_force_trim_to, OPT_INT, 0) // force mon to trim maps to this po OPTION(mon_debug_dump_transactions, OPT_BOOL, false) OPTION(mon_debug_dump_location, OPT_STR, "/var/log/ceph/$cluster-$name.tdump") -OPTION(mon_sync_leader_kill_at, OPT_INT, 0) // kill the sync leader at a specifc point in the work flow -OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a specific point in the work flow +OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a specific point in the work flow OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow OPTION(mon_leveldb_write_buffer_size, OPT_U64, 32*1024*1024) // monitor's leveldb write buffer size OPTION(mon_leveldb_cache_size, OPT_U64, 512*1024*1024) // monitor's leveldb cache size @@ -205,9 +200,9 @@ OPTION(paxos_stash_full_interval, OPT_INT, 25) // how often (in commits) to st OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first sync the monitor stores OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity +OPTION(paxos_min, OPT_INT, 500) // minimum number of paxos states to keep around OPTION(paxos_trim_min, OPT_INT, 250) // number of extra proposals tolerated before trimming OPTION(paxos_trim_max, OPT_INT, 500) // max number of extra proposals to trim at a time -OPTION(paxos_trim_disabled_max_versions, OPT_INT, 108000) // maximum amount of versions we shall allow passing by without trimming OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it) OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it) OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc diff --git a/src/messages/MMonSync.h b/src/messages/MMonSync.h index 6183578e167..a5415a8f451 100644 --- a/src/messages/MMonSync.h +++ b/src/messages/MMonSync.h @@ -17,85 +17,24 @@ class MMonSync : public Message { - static const int HEAD_VERSION = 1; - static const int COMPAT_VERSION = 1; + static const int HEAD_VERSION = 2; + static const int COMPAT_VERSION = 2; public: /** * Operation types */ enum { - /** - * Start synchronization request - * (mon.X -> Leader) - */ - OP_START = 1, - /** - * Reply to an OP_START - * (Leader -> mon.X) - */ - OP_START_REPLY = 2, - /** - * Let the Leader know we are still synchronizing - * (mon.X -> Leader) - */ - OP_HEARTBEAT = 3, - /** - * Reply to a hearbeat - * (Leader -> mon.X) - */ - OP_HEARTBEAT_REPLY = 4, - /** - * Let the Leader know we finished synchronizing - * (mon.X -> Leader) - */ - OP_FINISH = 5, - /** - * Request a given monitor (mon.Y) to start synchronizing with us, hence - * sending us chunks. - * (mon.X -> mon.Y) - */ - OP_START_CHUNKS = 6, - /** - * Send a chunk to a given monitor (mon.X) - * (mon.Y -> mon.X) - */ - OP_CHUNK = 7, - /** - * Acknowledge that we received the last chunk sent - * (mon.X -> mon.Y) - */ - OP_CHUNK_REPLY = 8, - /** - * Reply to an OP_FINISH - * (Leader -> mon.X) - */ - OP_FINISH_REPLY = 9, - /** - * Let the receiver know that he should abort whatever he is in the middle - * of doing with the sender. - */ - OP_ABORT = 10, + OP_GET_COOKIE_FULL = 1, // -> start a session (full scan) + OP_GET_COOKIE_RECENT = 2, // -> start a session (only recent paxos events) + OP_COOKIE = 3, // <- pass the iterator cookie, or + OP_GET_CHUNK = 4, // -> get some keys + OP_CHUNK = 5, // <- return some keys + OP_LAST_CHUNK = 6, // <- return the last set of keys + OP_NO_COOKIE = 8, // <- sorry, no cookie }; /** - * Chunk is the last available - */ - const static uint8_t FLAG_LAST = 0x01; - /** - * Let the other monitor it should retry again its last operation. - */ - const static uint8_t FLAG_RETRY = 0x02; - /** - * This message contains a crc - */ - const static uint8_t FLAG_CRC = 0x04; - /** - * Do not reply to this message to the sender, but to @p reply_to. - */ - const static uint8_t FLAG_REPLY_TO = 0x08; - - /** * Obtain a string corresponding to the operation type @p op * * @param op Operation type @@ -103,119 +42,68 @@ public: */ static const char *get_opname(int op) { switch (op) { - case OP_START: return "start"; - case OP_START_REPLY: return "start_reply"; - case OP_HEARTBEAT: return "heartbeat"; - case OP_HEARTBEAT_REPLY: return "heartbeat_reply"; - case OP_FINISH: return "finish"; - case OP_FINISH_REPLY: return "finish_reply"; - case OP_START_CHUNKS: return "start_chunks"; + case OP_GET_COOKIE_FULL: return "get_cookie_full"; + case OP_GET_COOKIE_RECENT: return "get_cookie_recent"; + case OP_COOKIE: return "cookie"; + case OP_GET_CHUNK: return "get_chunk"; case OP_CHUNK: return "chunk"; - case OP_CHUNK_REPLY: return "chunk_reply"; - case OP_ABORT: return "abort"; + case OP_LAST_CHUNK: return "last_chunk"; + case OP_NO_COOKIE: return "no_cookie"; default: assert("unknown op type"); return NULL; } } uint32_t op; - uint8_t flags; - version_t version; - bufferlist chunk_bl; + uint64_t cookie; + version_t last_committed; pair<string,string> last_key; - __u32 crc; + bufferlist chunk_bl; entity_inst_t reply_to; MMonSync() : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION) { } - MMonSync(uint32_t op) - : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION), - op(op), flags(0), version(0), crc(0) - { } - - MMonSync(uint32_t op, bufferlist bl, uint8_t flags = 0) - : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION), - op(op), flags(flags), version(0), chunk_bl(bl), crc(0) - { } - - MMonSync(MMonSync *m) + MMonSync(uint32_t op, uint64_t c = 0) : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION), - op(m->op), flags(m->flags), version(m->version), - chunk_bl(m->chunk_bl), last_key(m->last_key), - crc(m->crc), reply_to(m->reply_to) + op(op), + cookie(c), + last_committed(0) { } - /** - * Obtain this message type's name */ const char *get_type_name() const { return "mon_sync"; } - void set_reply_to(entity_inst_t other) { - reply_to = other; - flags |= FLAG_REPLY_TO; - } - - /** - * Print this message in a pretty format to @p out - * - * @param out The output stream to output to - */ void print(ostream& out) const { - out << "mon_sync( " << get_opname(op); - - if (version > 0) - out << " v " << version; - - if (flags) { - out << " flags( "; - if (flags & FLAG_LAST) - out << "last "; - if (flags & FLAG_RETRY) - out << "retry "; - if (flags & FLAG_CRC) - out << "crc(" << crc << ") "; - if (flags & FLAG_REPLY_TO) - out << "reply-to(" << reply_to << ") "; - out << ")"; - } - + out << "mon_sync(" << get_opname(op); + if (cookie) + out << " cookie " << cookie; + if (last_committed > 0) + out << " lc " << last_committed; if (chunk_bl.length()) out << " bl " << chunk_bl.length() << " bytes"; - - if (!last_key.first.empty() || !last_key.second.empty()) { - out << " last_key ( " << last_key.first << "," - << last_key.second << " )"; - } - - out << " )"; + if (!last_key.first.empty() || !last_key.second.empty()) + out << " last_key " << last_key.first << "," << last_key.second; + out << ")"; } - /** - * Encode this message into the Message's payload - */ void encode_payload(uint64_t features) { ::encode(op, payload); - ::encode(flags, payload); - ::encode(version, payload); - ::encode(chunk_bl, payload); + ::encode(cookie, payload); + ::encode(last_committed, payload); ::encode(last_key.first, payload); ::encode(last_key.second, payload); - ::encode(crc, payload); + ::encode(chunk_bl, payload); ::encode(reply_to, payload); } - /** - * Decode the message's payload into this message - */ void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(op, p); - ::decode(flags, p); - ::decode(version, p); - ::decode(chunk_bl, p); + ::decode(cookie, p); + ::decode(last_committed, p); ::decode(last_key.first, p); ::decode(last_key.second, p); - ::decode(crc, p); + ::decode(chunk_bl, p); ::decode(reply_to, p); } }; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index b2676ec4357..a78ad741ede 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -144,14 +144,13 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, leader(0), quorum_features(0), - // trim & store sync - sync_role(SYNC_ROLE_NONE), - trim_lock("Monitor::trim_lock"), - trim_enable_timer(NULL), - sync_rng(getpid()), - sync_state(SYNC_STATE_NONE), - sync_leader(), - sync_provider(), + // sync state + sync_provider_count(0), + sync_cookie(0), + sync_full(false), + sync_start_version(0), + sync_timeout_event(NULL), + sync_last_committed_floor(0), timecheck_round(0), timecheck_acks(0), @@ -240,8 +239,6 @@ void Monitor::do_admin_command(string command, string args, ostream& ss) _mon_status(ss); else if (command == "quorum_status") _quorum_status(ss); - else if (command == "sync_status") - _sync_status(ss); else if (command == "sync_force") { if (args != "--yes-i-really-mean-it") { ss << "are you SURE? this will mean the monitor store will be erased " @@ -249,7 +246,7 @@ void Monitor::do_admin_command(string command, string args, ostream& ss) "'--yes-i-really-mean-it' if you really do."; return; } - _sync_force(ss); + sync_force(ss); } else if (command.find("add_bootstrap_peer_hint") == 0) _add_bootstrap_peer_hint(command, args, ss); else @@ -416,7 +413,7 @@ int Monitor::preinit() // We have a potentially inconsistent store state in hands. Get rid of it // and start fresh. bool clear_store = false; - if (is_sync_on_going()) { + if (store->exists("mon_sync", "in_sync")) { dout(1) << __func__ << " clean up potentially inconsistent store state" << dendl; clear_store = true; @@ -430,14 +427,12 @@ int Monitor::preinit() if (clear_store) { set<string> sync_prefixes = get_sync_targets_names(); store->clear(sync_prefixes); - - MonitorDBStore::Transaction t; - t.erase("mon_sync", "in_sync"); - t.erase("mon_sync", "force_sync"); - store->apply_transaction(t); } } + sync_last_committed_floor = store->get("mon_sync", "last_committed_floor"); + dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl; + init_paxos(); health_monitor->init(); @@ -484,9 +479,6 @@ int Monitor::preinit() r = admin_socket->register_command("quorum_status", "quorum_status", admin_hook, "show current quorum status"); assert(r == 0); - r = admin_socket->register_command("sync_status", "sync_status", admin_hook, - "show current synchronization status"); - assert(r == 0); r = admin_socket->register_command("add_bootstrap_peer_hint", "add_bootstrap_peer_hint name=addr,type=CephIPAddr", admin_hook, @@ -526,10 +518,7 @@ void Monitor::init_paxos() paxos_service[i]->init(); } - // update paxos - if (paxos->is_consistent()) { - refresh_from_paxos(NULL); - } + refresh_from_paxos(NULL); } void Monitor::refresh_from_paxos(bool *need_bootstrap) @@ -582,7 +571,6 @@ void Monitor::shutdown() AdminSocket* admin_socket = cct->get_admin_socket(); admin_socket->unregister_command("mon_status"); admin_socket->unregister_command("quorum_status"); - admin_socket->unregister_command("sync_status"); admin_socket->unregister_command("add_bootstrap_peer_hint"); delete admin_hook; admin_hook = NULL; @@ -625,6 +613,7 @@ void Monitor::bootstrap() { dout(10) << "bootstrap" << dendl; + sync_reset(); unregister_cluster_logger(); cancel_probe_timeout(); @@ -646,8 +635,6 @@ void Monitor::bootstrap() messenger->mark_down_all(); } - reset_sync(); - // reset state = STATE_PROBING; @@ -738,615 +725,25 @@ void Monitor::reset() health_monitor->finish(); } -set<string> Monitor::get_sync_targets_names() { + +// ----------------------------------------------------------- +// sync + +set<string> Monitor::get_sync_targets_names() +{ set<string> targets; targets.insert(paxos->get_name()); for (int i = 0; i < PAXOS_NUM; ++i) - targets.insert(paxos_service[i]->get_service_name()); + paxos_service[i]->get_store_prefixes(targets); return targets; } -/** - * Reset any lingering sync/trim informations we might have. - */ -void Monitor::reset_sync(bool abort) -{ - dout(10) << __func__ << dendl; - // clear everything trim/sync related - { - map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin(); - for (; iter != trim_timeouts.end(); ++iter) { - if (!iter->second) - continue; - - timer.cancel_event(iter->second); - if (abort) { - MMonSync *msg = new MMonSync(MMonSync::OP_ABORT); - entity_inst_t other = iter->first; - messenger->send_message(msg, other); - } - } - trim_timeouts.clear(); - } - { - map<entity_inst_t,SyncEntity>::iterator iter = sync_entities.begin(); - for (; iter != sync_entities.end(); ++iter) { - (*iter).second->cancel_timeout(); - } - sync_entities.clear(); - } - - sync_entities_states.clear(); - trim_entities_states.clear(); - - sync_leader.reset(); - sync_provider.reset(); - - sync_state = SYNC_STATE_NONE; - sync_role = SYNC_ROLE_NONE; -} - -// leader - -void Monitor::sync_send_heartbeat(entity_inst_t &other, bool reply) -{ - dout(10) << __func__ << " " << other << " reply(" << reply << ")" << dendl; - uint32_t op = (reply ? MMonSync::OP_HEARTBEAT_REPLY : MMonSync::OP_HEARTBEAT); - MMonSync *msg = new MMonSync(op); - messenger->send_message(msg, other); -} - -void Monitor::handle_sync_start(MMonSync *m) -{ - dout(10) << __func__ << " " << *m << dendl; - - /* If we are not the leader, then some monitor picked us as the point of - * entry to the quorum during its synchronization process. Therefore, we - * have an obligation of forwarding this message to leader, so the sender - * can start synchronizing. - */ - if (!is_leader() && !quorum.empty()) { - assert(!(sync_role & SYNC_ROLE_REQUESTER)); - assert(!(sync_role & SYNC_ROLE_LEADER)); - assert(!is_synchronizing()); - - entity_inst_t leader = monmap->get_inst(get_leader()); - MMonSync *msg = new MMonSync(m); - // keep forwarding the message up the chain if it has already been - // forwarded. - if (!(m->flags & MMonSync::FLAG_REPLY_TO)) { - msg->set_reply_to(m->get_source_inst()); - } - dout(10) << __func__ << " forward " << *m - << " to leader at " << leader << dendl; - assert(g_conf->mon_sync_provider_kill_at != 1); - messenger->send_message(msg, leader); - assert(g_conf->mon_sync_provider_kill_at != 2); - m->put(); - return; - } - - // If we are synchronizing, then it means that we know someone who has a - // higher version than the one we have; and if someone attempted to sync - // from us, then that must mean they have a lower version than us. - // Therefore, they must be much more interested in synchronizing from the - // one we are trying to synchronize from than they are from us. - // Moreover, if we are already synchronizing under the REQUESTER role, then - // we must know someone who is in the quorum, either because we were lucky - // enough to contact them in the first place or we managed to contact - // someone who knew who they were. Therefore, just forward this request to - // our sync leader. - if (is_synchronizing()) { - assert(!(sync_role & SYNC_ROLE_LEADER)); - assert(!(sync_role & SYNC_ROLE_PROVIDER)); - assert(quorum.empty()); - assert(sync_leader.get() != NULL); - - /** - * This looks a bit odd, but we've seen cases where sync start messages - * get bounced around and end up at the originator without anybody - * noticing!* If it happens, just drop the message and the timeouts - * will clean everything up -- eventually. - * [*] If a leader gets elected who is too far behind, he'll drop into - * bootstrap and sync, but the person he sends his sync to thinks he's - * still the leader and forwards the reply back. - */ - if (m->reply_to == messenger->get_myinst()) { - m->put(); - return; - } - - dout(10) << __func__ << " forward " << *m - << " to our sync leader at " - << sync_leader->entity << dendl; - - MMonSync *msg = new MMonSync(m); - // keep forwarding the message up the chain if it has already been - // forwarded. - if (!(m->flags & MMonSync::FLAG_REPLY_TO)) { - msg->set_reply_to(m->get_source_inst()); - } - messenger->send_message(msg, sync_leader->entity); - m->put(); - return; - } - - // At this point we may or may not be the leader. If we are not the leader, - // it means that we are not in the quorum, but someone would still very much - // like to synchronize with us. In certain circumstances, letting them - // synchronize with us is by far our only option -- for instance, when we - // need them to form a quorum and they have started fresh or are severely - // out of date --, but it won't hurt to let them sync from us anyway: if - // they chose us, then they must have noticed that we had a higher version - // than they do, so it makes sense to let them try their luck and join the - // party. - - Mutex::Locker l(trim_lock); - entity_inst_t other = - (m->flags & MMonSync::FLAG_REPLY_TO ? m->reply_to : m->get_source_inst()); - - assert(g_conf->mon_sync_leader_kill_at != 1); - - if (trim_timeouts.count(other) > 0) { - dout(1) << __func__ << " sync session already in progress for " << other - << dendl; - - if (trim_entities_states[other] != SYNC_STATE_NONE) { - dout(1) << __func__ << " ignore stray message" << dendl; - m->put(); - return; - } - - dout(1) << __func__<< " destroying current state and creating new" - << dendl; - - if (trim_timeouts[other]) - timer.cancel_event(trim_timeouts[other]); - trim_timeouts.erase(other); - trim_entities_states.erase(other); - } - - MMonSync *msg = new MMonSync(MMonSync::OP_START_REPLY); - - if ((!quorum.empty() && paxos->should_trim()) - || (trim_enable_timer != NULL)) { - msg->flags |= MMonSync::FLAG_RETRY; - } else { - trim_timeouts.insert(make_pair(other, new C_TrimTimeout(this, other))); - timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]); - - trim_entities_states[other] = SYNC_STATE_START; - sync_role |= SYNC_ROLE_LEADER; - - paxos->trim_disable(); - - // Is the one that contacted us in the quorum? - if (quorum.count(m->get_source().num())) { - // Was it forwarded by someone? - if (m->flags & MMonSync::FLAG_REPLY_TO) { - // Then they must not be synchronizing. What sense would that make, eh? - assert(trim_timeouts.count(m->get_source_inst()) == 0); - // Set the provider as the one that contacted us. He's in the - // quorum, so he's up to the job (i.e., he's not synchronizing) - msg->set_reply_to(m->get_source_inst()); - dout(10) << __func__ << " set provider to " << msg->reply_to << dendl; - } else { - // Then they must have gotten into the quorum, and boostrapped. - // We should tell them to abort and bootstrap ourselves. - msg->put(); - msg = new MMonSync(MMonSync::OP_ABORT); - messenger->send_message(msg, other); - m->put(); - bootstrap(); - return; - } - } else if (!quorum.empty()) { - // grab someone from the quorum and assign them as the sync provider - int n = _pick_random_quorum_mon(rank); - if (n >= 0) { - msg->set_reply_to(monmap->get_inst(n)); - dout(10) << __func__ << " set quorum-based provider to " - << msg->reply_to << dendl; - } else { - assert(0 == "We shouldn't get here!"); - } - } else { - // There is no quorum, so we must either be in a quorum-less cluster, - // or we must be mid-election. Either way, tell them it is okay to - // sync from us by not setting the reply-to field. - assert(!(msg->flags & MMonSync::FLAG_REPLY_TO)); - } - } - messenger->send_message(msg, other); - m->put(); - - assert(g_conf->mon_sync_leader_kill_at != 2); -} - -void Monitor::handle_sync_heartbeat(MMonSync *m) -{ - dout(10) << __func__ << " " << *m << dendl; - - entity_inst_t other = m->get_source_inst(); - if (!(sync_role & SYNC_ROLE_LEADER) - || !trim_entities_states.count(other) - || (trim_entities_states[other] != SYNC_STATE_START)) { - // stray message; ignore. - dout(1) << __func__ << " ignored stray message " << *m << dendl; - m->put(); - return; - } - - if (!is_leader() && !quorum.empty() - && (trim_timeouts.count(other) > 0)) { - // we must have been the leader before, but we lost leadership to - // someone else. - sync_finish_abort(other); - m->put(); - return; - } - - assert(trim_timeouts.count(other) > 0); - - if (trim_timeouts[other]) - timer.cancel_event(trim_timeouts[other]); - trim_timeouts[other] = new C_TrimTimeout(this, other); - timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]); - - assert(g_conf->mon_sync_leader_kill_at != 3); - sync_send_heartbeat(other, true); - assert(g_conf->mon_sync_leader_kill_at != 4); - - m->put(); -} - -void Monitor::sync_finish(entity_inst_t &entity, bool abort) -{ - dout(10) << __func__ << " entity(" << entity << ")" << dendl; - - Mutex::Locker l(trim_lock); - - if (!trim_timeouts.count(entity)) { - dout(1) << __func__ << " we know of no sync effort from " - << entity << " -- ignore it." << dendl; - return; - } - - if (trim_timeouts[entity] != NULL) - timer.cancel_event(trim_timeouts[entity]); - - trim_timeouts.erase(entity); - trim_entities_states.erase(entity); - - if (abort) { - MMonSync *m = new MMonSync(MMonSync::OP_ABORT); - assert(g_conf->mon_sync_leader_kill_at != 5); - messenger->send_message(m, entity); - assert(g_conf->mon_sync_leader_kill_at != 6); - } - - if (!trim_timeouts.empty()) - return; - - dout(10) << __func__ << " no longer a sync leader" << dendl; - sync_role &= ~SYNC_ROLE_LEADER; - - // we may have been the leader, but by now we may no longer be. - // this can happen when the we sync'ed a monitor that became the - // leader, or that same monitor simply came back to life and got - // elected as the new leader. - if (is_leader() && paxos->is_trim_disabled()) { - trim_enable_timer = new C_TrimEnable(this); - timer.add_event_after(30.0, trim_enable_timer); - } - - finish_contexts(g_ceph_context, maybe_wait_for_quorum); -} - -void Monitor::handle_sync_finish(MMonSync *m) -{ - dout(10) << __func__ << " " << *m << dendl; - - entity_inst_t other = m->get_source_inst(); - - if (!trim_timeouts.count(other) || !trim_entities_states.count(other) - || (trim_entities_states[other] != SYNC_STATE_START)) { - dout(1) << __func__ << " ignored stray message from " << other << dendl; - if (!trim_timeouts.count(other)) - dout(1) << __func__ << " not on trim_timeouts" << dendl; - if (!trim_entities_states.count(other)) - dout(1) << __func__ << " not on trim_entities_states" << dendl; - else if (trim_entities_states[other] != SYNC_STATE_START) - dout(1) << __func__ << " state " << trim_entities_states[other] << dendl; - m->put(); - return; - } - - // We may no longer the leader. In such case, we should just inform the - // other monitor that he should abort his sync. However, it appears that - // his sync has finished, so there is no use in scraping the whole thing - // now. Therefore, just go along and acknowledge. - if (!is_leader()) { - dout(10) << __func__ << " We are no longer the leader; reply nonetheless" - << dendl; - } - - MMonSync *msg = new MMonSync(MMonSync::OP_FINISH_REPLY); - assert(g_conf->mon_sync_leader_kill_at != 7); - messenger->send_message(msg, other); - assert(g_conf->mon_sync_leader_kill_at != 8); - - sync_finish(other); - m->put(); -} - -// end of leader - -// synchronization provider -int Monitor::_pick_random_mon(int other) -{ - assert(monmap->size() > 0); - if (monmap->size() == 1) - return 0; - - int max = monmap->size(); - if (other >= 0) - max--; - int n = sync_rng() % max; - if (other >= 0 && n >= other) - n++; - return n; -} - -int Monitor::_pick_random_quorum_mon(int other) -{ - assert(monmap->size() > 0); - if (quorum.empty()) - return -1; - set<int>::iterator p = quorum.begin(); - for (int n = sync_rng() % quorum.size(); p != quorum.end() && n; ++p, --n); - if (other >= 0 && p != quorum.end() && *p == other) - ++p; - - return (p == quorum.end() ? *(quorum.rbegin()) : *p); -} - -void Monitor::sync_timeout(entity_inst_t &entity) -{ - if (state == STATE_SYNCHRONIZING) { - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_CHUNKS); - - // we are a sync requester; our provider just timed out, so find another - // monitor to synchronize with. - dout(1) << __func__ << " " << sync_provider->entity << dendl; - - sync_provider->attempts++; - if ((sync_provider->attempts > g_conf->mon_sync_max_retries) - || (monmap->size() == 2)) { - // We either tried too many times to sync, or there's just us and the - // monitor we were attempting to sync with. - // Therefore, just abort the whole sync and start off fresh whenever he - // (or somebody else) comes back. - sync_requester_abort(); - return; - } - - unsigned int i = 0; - int entity_rank = monmap->get_rank(entity.addr); - int debug_mon = g_conf->mon_sync_debug_provider; - int debug_fallback = g_conf->mon_sync_debug_provider_fallback; - - dout(10) << __func__ << " entity " << entity << " rank " << entity_rank << dendl; - dout(10) << __func__ << " our-rank " << rank << dendl; - - while ((i++) < 2*monmap->size()) { - // we are trying to pick a random monitor, but we cannot do this forever. - // in case something goes awfully wrong, just stop doing it after a - // couple of attempts and try again later. - int new_mon = _pick_random_mon(rank); - - if (debug_fallback >= 0) { - if (entity_rank != debug_fallback) - new_mon = debug_fallback; - else if (debug_mon >= 0 && (entity_rank != debug_mon)) - new_mon = debug_mon; - } - - dout(10) << __func__ << " randomly picking mon rank " << new_mon << dendl; - - if ((new_mon != rank) && (new_mon != entity_rank)) { - sync_provider->entity = monmap->get_inst(new_mon); - dout(10) << __func__ << " randomly choosing " << sync_provider->entity << " rank " << new_mon << dendl; - sync_state = SYNC_STATE_START; - sync_start_chunks(sync_provider); - return; - } - } - - // well that sucks. Let's see if we can find a monitor to connect to - for (int i = 0; i < (int)monmap->size(); ++i) { - entity_inst_t i_inst = monmap->get_inst(i); - if (i != rank && i_inst != entity) { - sync_provider->entity = i_inst; - sync_state = SYNC_STATE_START; - sync_start_chunks(sync_provider); - return; - } - } - - assert(0 == "Unable to find a new monitor to connect to. Not cool."); - } else if (sync_role & SYNC_ROLE_PROVIDER) { - dout(10) << __func__ << " cleanup " << entity << dendl; - sync_provider_cleanup(entity); - return; - } else - assert(0 == "We should never reach this"); -} - -void Monitor::sync_provider_cleanup(entity_inst_t &entity) -{ - dout(10) << __func__ << " " << entity << dendl; - if (sync_entities.count(entity) > 0) { - sync_entities[entity]->cancel_timeout(); - sync_entities.erase(entity); - sync_entities_states.erase(entity); - } - - if (sync_entities.empty()) { - dout(1) << __func__ << " no longer a sync provider" << dendl; - sync_role &= ~SYNC_ROLE_PROVIDER; - } -} - -void Monitor::handle_sync_start_chunks(MMonSync *m) -{ - dout(10) << __func__ << " " << *m << dendl; - assert(!(sync_role & SYNC_ROLE_REQUESTER)); - - entity_inst_t other = m->get_source_inst(); - - // if we have a sync going on for this entity, just drop the message. If it - // was a stray message, we did the right thing. If it wasn't, then it means - // that we still have an old state of this entity, and that the said entity - // failed in the meantime and is now up again; therefore, just let the - // timeout timers fulfill their purpose and deal with state cleanup when - // they are triggered. Until then, no Sir, we won't accept your messages. - if (sync_entities.count(other) > 0) { - dout(1) << __func__ << " sync session already in progress for " << other - << " -- assumed as stray message." << dendl; - m->put(); - return; - } - - SyncEntity sync = get_sync_entity(other, this); - sync->version = paxos->get_version(); - - if (!m->last_key.first.empty() && !m->last_key.second.empty()) { - sync->last_received_key = m->last_key; - dout(10) << __func__ << " set last received key to (" - << sync->last_received_key.first << "," - << sync->last_received_key.second << ")" << dendl; - } - - sync->sync_init(); - - sync_entities.insert(make_pair(other, sync)); - sync_entities_states[other] = SYNC_STATE_START; - sync_role |= SYNC_ROLE_PROVIDER; - - sync_send_chunks(sync); - m->put(); -} - -void Monitor::handle_sync_chunk_reply(MMonSync *m) -{ - dout(10) << __func__ << " " << *m << dendl; - - entity_inst_t other = m->get_source_inst(); - - if (!(sync_role & SYNC_ROLE_PROVIDER) - || !sync_entities.count(other) - || (sync_entities_states[other] != SYNC_STATE_START)) { - dout(1) << __func__ << " ignored stray message from " << other << dendl; - m->put(); - return; - } - - if (m->flags & MMonSync::FLAG_LAST) { - // they acked the last chunk. Clean up. - sync_provider_cleanup(other); - m->put(); - return; - } - - sync_send_chunks(sync_entities[other]); - m->put(); -} - -void Monitor::sync_send_chunks(SyncEntity sync) -{ - dout(10) << __func__ << " entity(" << sync->entity << ")" << dendl; - - sync->cancel_timeout(); - - assert(sync->synchronizer.use_count() > 0); - assert(sync->synchronizer->has_next_chunk()); - - MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK); - - sync->synchronizer->get_chunk(msg->chunk_bl); - msg->last_key = sync->synchronizer->get_last_key(); - dout(10) << __func__ << " last key (" - << msg->last_key.first << "," - << msg->last_key.second << ")" << dendl; - - sync->sync_update(); - - if (sync->has_crc()) { - msg->flags |= MMonSync::FLAG_CRC; - msg->crc = sync->crc_get(); - sync->crc_clear(); - } - - if (!sync->synchronizer->has_next_chunk()) { - msg->flags |= MMonSync::FLAG_LAST; - msg->version = sync->get_version(); - sync->synchronizer.reset(); - } - - sync->set_timeout(new C_SyncTimeout(this, sync->entity), - g_conf->mon_sync_timeout); - assert(g_conf->mon_sync_provider_kill_at != 3); - messenger->send_message(msg, sync->entity); - assert(g_conf->mon_sync_provider_kill_at != 4); - - // kill the monitor as soon as we move into synchronizing the paxos versions. - // This is intended as debug. - if (sync->sync_state == SyncEntityImpl::STATE_PAXOS) - assert(g_conf->mon_sync_provider_kill_at != 5); - - -} -// end of synchronization provider - -// start of synchronization requester - -void Monitor::sync_requester_abort() +void Monitor::sync_timeout() { dout(10) << __func__ << dendl; assert(state == STATE_SYNCHRONIZING); - assert(sync_role == SYNC_ROLE_REQUESTER); - - if (sync_leader.get() != NULL) { - dout(10) << __func__ << " leader " << sync_leader->entity << dendl; - sync_leader->cancel_timeout(); - sync_leader.reset(); - } - - if (sync_provider.get() != NULL) { - dout(10) << __func__ << " provider " << sync_provider->entity << dendl; - sync_provider->cancel_timeout(); - - MMonSync *msg = new MMonSync(MMonSync::OP_ABORT); - messenger->send_message(msg, sync_provider->entity); - - sync_provider.reset(); - } - - // Given that we are explicitely aborting the whole sync process, we should - // play it safe and clear the store. - set<string> targets = get_sync_targets_names(); - store->clear(targets); - - dout(1) << __func__ << " no longer a sync requester" << dendl; - sync_role = SYNC_ROLE_NONE; - sync_state = SYNC_STATE_NONE; - - state = 0; - bootstrap(); } @@ -1400,473 +797,392 @@ void Monitor::sync_obtain_latest_monmap(bufferlist &bl) latest_monmap.encode(bl, CEPH_FEATURES_ALL); } -/** - * - */ -void Monitor::sync_store_init() +void Monitor::sync_reset() { - MonitorDBStore::Transaction t; - t.put("mon_sync", "in_sync", 1); - - bufferlist backup_monmap; - sync_obtain_latest_monmap(backup_monmap); - assert(backup_monmap.length() > 0); - - t.put("mon_sync", "latest_monmap", backup_monmap); - - store->apply_transaction(t); -} + if (sync_timeout_event) { + timer.cancel_event(sync_timeout_event); + sync_timeout_event = NULL; + } -void Monitor::sync_store_cleanup() -{ - MonitorDBStore::Transaction t; - t.erase("mon_sync", "in_sync"); - t.erase("mon_sync", "latest_monmap"); - store->apply_transaction(t); -} + // leader state + sync_providers.clear(); -bool Monitor::is_sync_on_going() -{ - return store->exists("mon_sync", "in_sync"); + // requester state + sync_provider = entity_inst_t(); + sync_cookie = 0; + sync_full = false; + sync_start_version = 0; } /** - * Start Sync process + * Start sync process * - * Create SyncEntity instances for the leader and the provider; - * Send OP_START message to the leader; - * Set trim timeout on the leader + * Start pulling committed state from another monitor. * * @param other Synchronization provider to-be. + * @param whether to do a full sync or just catch up on recent paxos */ -void Monitor::sync_start(entity_inst_t &other) +void Monitor::sync_start(entity_inst_t &other, bool full) { - cancel_probe_timeout(); + dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl; - dout(10) << __func__ << " entity( " << other << " )" << dendl; - if ((state == STATE_SYNCHRONIZING) && (sync_role == SYNC_ROLE_REQUESTER)) { - dout(1) << __func__ << " already synchronizing; drop it" << dendl; - return; - } + assert(state == STATE_PROBING || + state == STATE_SYNCHRONIZING); + state = STATE_SYNCHRONIZING; - // Looks like we are the acting leader for someone. Better force them to - // abort their endeavours. After all, if they are trying to sync from us, - // it means that we must have a higher paxos version than the one they - // have; however, if we are trying to sync as well, it must mean that - // someone has a higher version than the one we have. Everybody wins if - // we force them to cancel their sync and try again. - if (sync_role & SYNC_ROLE_LEADER) { - dout(10) << __func__ << " we are acting as a leader to someone; " - << "destroy their dreams" << dendl; + // make sure are not a provider for anyone! + sync_reset(); - assert(!trim_timeouts.empty()); - reset_sync(); - } + sync_full = full; - assert(sync_role == SYNC_ROLE_NONE); - assert(sync_state == SYNC_STATE_NONE); + if (sync_full) { + // mark that we are syncing + MonitorDBStore::Transaction t; - state = STATE_SYNCHRONIZING; - sync_role = SYNC_ROLE_REQUESTER; - sync_state = SYNC_STATE_START; + bufferlist backup_monmap; + sync_obtain_latest_monmap(backup_monmap); + assert(backup_monmap.length() > 0); - // First init the store (grab the monmap and all that) and only then - // clear the store (except for the mon_sync prefix). This avoids that - // we end up losing the monmaps from the store. - sync_store_init(); + sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version()); + dout(10) << __func__ << " marking sync in progress, storing sync_last_commited_floor " + << sync_last_committed_floor << dendl; - // clear the underlying store, since we are starting a whole - // sync process from the bare beginning. - set<string> targets = get_sync_targets_names(); - store->clear(targets); + t.put("mon_sync", "latest_monmap", backup_monmap); + t.put("mon_sync", "in_sync", 1); + t.put("mon_sync", "last_committed_floor", sync_last_committed_floor); + store->apply_transaction(t); + assert(g_conf->mon_sync_requester_kill_at != 1); - // assume 'other' as the leader. We will update the leader once we receive - // a reply to the sync start. - entity_inst_t leader = other; - entity_inst_t provider = other; + // clear the underlying store + set<string> targets = get_sync_targets_names(); + dout(10) << __func__ << " clearing prefixes " << targets << dendl; + store->clear(targets); - if (g_conf->mon_sync_debug_leader >= 0) { - leader = monmap->get_inst(g_conf->mon_sync_debug_leader); - dout(10) << __func__ << " assuming " << leader - << " as the leader for debug" << dendl; + assert(g_conf->mon_sync_requester_kill_at != 2); } - if (g_conf->mon_sync_debug_provider >= 0) { - provider = monmap->get_inst(g_conf->mon_sync_debug_provider); - dout(10) << __func__ << " assuming " << provider - << " as the provider for debug" << dendl; - } + // assume 'other' as the leader. We will update the leader once we receive + // a reply to the sync start. + sync_provider = other; - sync_leader = get_sync_entity(leader, this); - sync_provider = get_sync_entity(provider, this); + sync_reset_timeout(); - // this message may bounce through 'other' (if 'other' is not the leader) - // in order to reach the leader. Therefore, set a higher timeout to allow - // breathing room for the reply message to reach us. - sync_leader->set_timeout(new C_SyncStartTimeout(this), - g_conf->mon_sync_trim_timeout*2); + MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT); + if (!sync_full) + m->last_committed = paxos->get_version(); + messenger->send_message(m, sync_provider); +} - MMonSync *m = new MMonSync(MMonSync::OP_START); - messenger->send_message(m, other); - assert(g_conf->mon_sync_requester_kill_at != 1); +void Monitor::sync_reset_timeout() +{ + dout(10) << __func__ << dendl; + if (sync_timeout_event) + timer.cancel_event(sync_timeout_event); + sync_timeout_event = new C_SyncTimeout(this); + timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event); } -void Monitor::sync_start_chunks(SyncEntity provider) +void Monitor::sync_finish(version_t last_committed) { - dout(10) << __func__ << " provider(" << provider->entity << ")" << dendl; + dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl; - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_START); + assert(g_conf->mon_sync_requester_kill_at != 7); - sync_state = SYNC_STATE_CHUNKS; + if (sync_full) { + // finalize the paxos commits + MonitorDBStore::Transaction tx; + paxos->read_and_prepare_transactions(&tx, sync_start_version, last_committed); + tx.put(paxos->get_name(), "last_committed", last_committed); - provider->set_timeout(new C_SyncTimeout(this, provider->entity), - g_conf->mon_sync_timeout); - MMonSync *msg = new MMonSync(MMonSync::OP_START_CHUNKS); - pair<string,string> last_key = provider->last_received_key; - if (!last_key.first.empty() && !last_key.second.empty()) - msg->last_key = last_key; + dout(30) << __func__ << " final tx dump:\n"; + JSONFormatter f(true); + tx.dump(&f); + f.flush(*_dout); + *_dout << dendl; - assert(g_conf->mon_sync_requester_kill_at != 4); - messenger->send_message(msg, provider->entity); - assert(g_conf->mon_sync_requester_kill_at != 5); -} + store->apply_transaction(tx); + } -void Monitor::sync_start_reply_timeout() -{ - dout(10) << __func__ << dendl; + assert(g_conf->mon_sync_requester_kill_at != 8); - assert(state == STATE_SYNCHRONIZING); - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_START); - - // Restart the sync attempt. It's not as if we were going to lose a vast - // amount of work, and if we take into account that we are timing out while - // waiting for a reply from the Leader, it sure seems like the right path - // to take. - sync_requester_abort(); + MonitorDBStore::Transaction t; + t.erase("mon_sync", "in_sync"); + t.erase("mon_sync", "force_sync"); + t.erase("mon_sync", "last_committed_floor"); + store->apply_transaction(t); + + sync_reset(); + + assert(g_conf->mon_sync_requester_kill_at != 9); + + init_paxos(); + + assert(g_conf->mon_sync_requester_kill_at != 10); + + bootstrap(); } -void Monitor::handle_sync_start_reply(MMonSync *m) +void Monitor::handle_sync(MMonSync *m) { dout(10) << __func__ << " " << *m << dendl; + switch (m->op) { - entity_inst_t other = m->get_source_inst(); - - if ((sync_role != SYNC_ROLE_REQUESTER) - || (sync_state != SYNC_STATE_START)) { - // If the leader has sent this message before we failed, there is no point - // in replying to it, as he has no idea that we actually received it. On - // the other hand, if he received one of our stray messages (because it was - // delivered once he got back up after failing) and replied accordingly, - // there is a chance that he did stopped trimming on our behalf. However, - // we have no way to know it, and we really don't want to mess with his - // state if that is not the case. Therefore, just drop it and let the - // timeouts figure it out. Eventually. - dout(1) << __func__ << " stray message -- drop it." << dendl; - goto out; - } + // provider --------- - assert(state == STATE_SYNCHRONIZING); - assert(sync_leader.get() != NULL); - assert(sync_provider.get() != NULL); - - // We now know for sure who the leader is. - sync_leader->entity = other; - sync_leader->cancel_timeout(); - - if (m->flags & MMonSync::FLAG_RETRY) { - dout(10) << __func__ << " retrying sync at a later time" << dendl; - sync_role = SYNC_ROLE_NONE; - sync_state = SYNC_STATE_NONE; - sync_leader->set_timeout(new C_SyncStartRetry(this, sync_leader->entity), - g_conf->mon_sync_backoff_timeout); - goto out; - } + case MMonSync::OP_GET_COOKIE_FULL: + case MMonSync::OP_GET_COOKIE_RECENT: + handle_sync_get_cookie(m); + break; + case MMonSync::OP_GET_CHUNK: + handle_sync_get_chunk(m); + break; - if (m->flags & MMonSync::FLAG_REPLY_TO) { - dout(10) << __func__ << " leader told us to use " << m->reply_to - << " as sync provider" << dendl; - sync_provider->entity = m->reply_to; - } else { - dout(10) << __func__ << " synchronizing from leader at " << other << dendl; - sync_provider->entity = other; - } + // client ----------- - sync_leader->set_timeout(new C_HeartbeatTimeout(this), - g_conf->mon_sync_heartbeat_timeout); + case MMonSync::OP_COOKIE: + handle_sync_cookie(m); + break; - assert(g_conf->mon_sync_requester_kill_at != 2); - sync_send_heartbeat(sync_leader->entity); - assert(g_conf->mon_sync_requester_kill_at != 3); + case MMonSync::OP_CHUNK: + case MMonSync::OP_LAST_CHUNK: + handle_sync_chunk(m); + break; + case MMonSync::OP_NO_COOKIE: + handle_sync_no_cookie(m); + break; - sync_start_chunks(sync_provider); -out: + default: + dout(0) << __func__ << " unknown op " << m->op << dendl; + assert(0 == "unknown op"); + } m->put(); } -void Monitor::handle_sync_heartbeat_reply(MMonSync *m) +// leader + +void Monitor::_sync_reply_no_cookie(MMonSync *m) { - dout(10) << __func__ << " " << *m << dendl; + MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie); + messenger->send_message(reply, m->get_connection()); +} - entity_inst_t other = m->get_source_inst(); - if ((sync_role != SYNC_ROLE_REQUESTER) - || (sync_state == SYNC_STATE_NONE) - || (sync_leader.get() == NULL) - || (other != sync_leader->entity)) { - dout(1) << __func__ << " stray message -- drop it." << dendl; - m->put(); +void Monitor::handle_sync_get_cookie(MMonSync *m) +{ + if (is_synchronizing()) { + _sync_reply_no_cookie(m); return; } - assert(state == STATE_SYNCHRONIZING); - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state != SYNC_STATE_NONE); + assert(g_conf->mon_sync_provider_kill_at != 1); - assert(sync_leader.get() != NULL); - assert(sync_leader->entity == other); + // make up a unique cookie. include election epoch (which persists + // across restarts for the whole cluster) and a counter for this + // process instance. there is no need to be unique *across* + // monitors, though. + uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count; + assert(sync_providers.count(cookie) == 0); - sync_leader->cancel_timeout(); - sync_leader->set_timeout(new C_HeartbeatInterval(this, sync_leader->entity), - g_conf->mon_sync_heartbeat_interval); - m->put(); + dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl; + + SyncProvider& sp = sync_providers[cookie]; + sp.cookie = cookie; + sp.entity = m->get_source_inst(); + sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); + + set<string> sync_targets; + if (m->op == MMonSync::OP_GET_COOKIE_FULL) { + // full scan + sync_targets = get_sync_targets_names(); + sp.last_committed = paxos->get_version(); + sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets); + sp.full = true; + dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl; + } else { + // just catch up paxos + sp.last_committed = m->last_committed; + } + dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl; + + MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie); + reply->last_committed = sp.last_committed; + messenger->send_message(reply, m->get_connection()); } -void Monitor::handle_sync_chunk(MMonSync *m) +void Monitor::handle_sync_get_chunk(MMonSync *m) { dout(10) << __func__ << " " << *m << dendl; - entity_inst_t other = m->get_source_inst(); - - if ((sync_role != SYNC_ROLE_REQUESTER) - || (sync_state != SYNC_STATE_CHUNKS) - || (sync_provider.get() == NULL) - || (other != sync_provider->entity)) { - dout(1) << __func__ << " stray message -- drop it." << dendl; - m->put(); + if (sync_providers.count(m->cookie) == 0) { + dout(10) << __func__ << " no cookie " << m->cookie << dendl; + _sync_reply_no_cookie(m); return; } - assert(state == STATE_SYNCHRONIZING); - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_CHUNKS); - - assert(sync_leader.get() != NULL); + assert(g_conf->mon_sync_provider_kill_at != 2); - assert(sync_provider.get() != NULL); - assert(other == sync_provider->entity); + SyncProvider& sp = sync_providers[m->cookie]; + sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); - sync_provider->cancel_timeout(); + if (sp.last_committed < paxos->get_first_committed()) { + dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed + << " < our fc " << paxos->get_first_committed() << dendl; + sync_providers.erase(m->cookie); + _sync_reply_no_cookie(m); + return; + } + MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie); MonitorDBStore::Transaction tx; - tx.append_from_encoded(m->chunk_bl); - - sync_provider->set_timeout(new C_SyncTimeout(this, sync_provider->entity), - g_conf->mon_sync_timeout); - sync_provider->last_received_key = m->last_key; - MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK_REPLY); - - bool stop = false; - if (m->flags & MMonSync::FLAG_LAST) { - msg->flags |= MMonSync::FLAG_LAST; - assert(m->version > 0); - tx.put(paxos->get_name(), "last_committed", m->version); - stop = true; + int left = g_conf->mon_sync_max_payload_size; + while (sp.last_committed < paxos->get_version() && left > 0) { + bufferlist bl; + sp.last_committed++; + store->get(paxos->get_name(), sp.last_committed, bl); + tx.put(paxos->get_name(), sp.last_committed, bl); + left -= bl.length(); + dout(20) << __func__ << " including paxos state " << sp.last_committed << dendl; } - assert(g_conf->mon_sync_requester_kill_at != 8); - messenger->send_message(msg, sync_provider->entity); + reply->last_committed = sp.last_committed; - store->apply_transaction(tx); + if (sp.full && left > 0) { + sp.synchronizer->get_chunk_tx(tx, left); + sp.last_key = sp.synchronizer->get_last_key(); + reply->last_key = sp.last_key; + } - if (g_conf->mon_sync_debug && (m->flags & MMonSync::FLAG_CRC)) { - dout(10) << __func__ << " checking CRC" << dendl; - MonitorDBStore::Synchronizer sync; - if (m->flags & MMonSync::FLAG_LAST) { - dout(10) << __func__ << " checking CRC only for Paxos" << dendl; - string paxos_name("paxos"); - sync = store->get_synchronizer(paxos_name); - } else { - dout(10) << __func__ << " checking CRC for all prefixes" << dendl; - set<string> prefixes = get_sync_targets_names(); - pair<string,string> empty_key; - sync = store->get_synchronizer(empty_key, prefixes); - } + if ((sp.full && sp.synchronizer->has_next_chunk()) || + sp.last_committed < paxos->get_version()) { + dout(10) << __func__ << " chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl; + } else { + dout(10) << __func__ << " last chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl; + reply->op = MMonSync::OP_LAST_CHUNK; - while (sync->has_next_chunk()) { - bufferlist bl; - sync->get_chunk(bl); - } - __u32 got_crc = sync->crc(); - dout(10) << __func__ << " expected crc " << m->crc - << " got " << got_crc << dendl; + assert(g_conf->mon_sync_provider_kill_at != 3); - assert(m->crc == got_crc); - dout(10) << __func__ << " CRC matches" << dendl; + // clean up our local state + sync_providers.erase(sp.cookie); } - m->put(); - if (stop) - sync_stop(); -} - -void Monitor::sync_stop() -{ - dout(10) << __func__ << dendl; + ::encode(tx, reply->chunk_bl); - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_CHUNKS); - - sync_state = SYNC_STATE_STOP; + messenger->send_message(reply, m->get_connection()); +} - sync_leader->cancel_timeout(); - sync_provider->cancel_timeout(); - sync_provider.reset(); +// requester - entity_inst_t leader = sync_leader->entity; +void Monitor::handle_sync_cookie(MMonSync *m) +{ + dout(10) << __func__ << " " << *m << dendl; + if (sync_cookie) { + dout(10) << __func__ << " already have a cookie, ignoring" << dendl; + return; + } + if (m->get_source_inst() != sync_provider) { + dout(10) << __func__ << " source does not match, discarding" << dendl; + return; + } + sync_cookie = m->cookie; + sync_start_version = m->last_committed; - sync_leader->set_timeout(new C_SyncFinishReplyTimeout(this), - g_conf->mon_sync_timeout); + sync_reset_timeout(); + sync_get_next_chunk(); - MMonSync *msg = new MMonSync(MMonSync::OP_FINISH); - assert(g_conf->mon_sync_requester_kill_at != 9); - messenger->send_message(msg, leader); - assert(g_conf->mon_sync_requester_kill_at != 10); + assert(g_conf->mon_sync_requester_kill_at != 3); } -void Monitor::sync_finish_reply_timeout() +void Monitor::sync_get_next_chunk() { - dout(10) << __func__ << dendl; - assert(state == STATE_SYNCHRONIZING); - assert(sync_leader.get() != NULL); - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_STOP); + dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl; + if (g_conf->mon_inject_sync_get_chunk_delay > 0) { + dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl; + usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0)); + } + MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie); + messenger->send_message(r, sync_provider); - sync_requester_abort(); + assert(g_conf->mon_sync_requester_kill_at != 4); } -void Monitor::handle_sync_finish_reply(MMonSync *m) +void Monitor::handle_sync_chunk(MMonSync *m) { dout(10) << __func__ << " " << *m << dendl; - entity_inst_t other = m->get_source_inst(); - if ((sync_role != SYNC_ROLE_REQUESTER) - || (sync_state != SYNC_STATE_STOP) - || (sync_leader.get() == NULL) - || (sync_leader->entity != other)) { - dout(1) << __func__ << " stray message -- drop it." << dendl; - m->put(); + if (m->cookie != sync_cookie) { + dout(10) << __func__ << " cookie does not match, discarding" << dendl; + return; + } + if (m->get_source_inst() != sync_provider) { + dout(10) << __func__ << " source does not match, discarding" << dendl; return; } - assert(sync_role == SYNC_ROLE_REQUESTER); - assert(sync_state == SYNC_STATE_STOP); + assert(state == STATE_SYNCHRONIZING); + assert(g_conf->mon_sync_requester_kill_at != 5); - assert(sync_leader.get() != NULL); - assert(sync_leader->entity == other); + MonitorDBStore::Transaction tx; + tx.append_from_encoded(m->chunk_bl); - sync_role = SYNC_ROLE_NONE; - sync_state = SYNC_STATE_NONE; + dout(30) << __func__ << " tx dump:\n"; + JSONFormatter f(true); + tx.dump(&f); + f.flush(*_dout); + *_dout << dendl; - sync_leader->cancel_timeout(); - sync_leader.reset(); + store->apply_transaction(tx); - paxos->reapply_all_versions(); + assert(g_conf->mon_sync_requester_kill_at != 6); - sync_store_cleanup(); + if (!sync_full) { + dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl; + MonitorDBStore::Transaction tx; + paxos->read_and_prepare_transactions(&tx, paxos->get_version(), m->last_committed); + tx.put(paxos->get_name(), "last_committed", m->last_committed); - init_paxos(); + dout(30) << __func__ << " tx dump:\n"; + JSONFormatter f(true); + tx.dump(&f); + f.flush(*_dout); + *_dout << dendl; - assert(g_conf->mon_sync_requester_kill_at != 11); + store->apply_transaction(tx); + paxos->init(); // to refresh what we just wrote + } - m->put(); + if (m->op == MMonSync::OP_CHUNK) { + sync_reset_timeout(); + sync_get_next_chunk(); + } else if (m->op == MMonSync::OP_LAST_CHUNK) { + sync_finish(m->last_committed); + } +} +void Monitor::handle_sync_no_cookie(MMonSync *m) +{ + dout(10) << __func__ << dendl; + sync_reset(); bootstrap(); } -void Monitor::handle_sync_abort(MMonSync *m) +void Monitor::sync_trim_providers() { - dout(10) << __func__ << " " << *m << dendl; - /* This function's responsabilities are manifold, and they depend on - * who we (the monitor) are and what is our role in the sync. - * - * If we are the sync requester (i.e., if we are synchronizing), it - * means that we *must* abort the current sync and bootstrap. This may - * be required if there was a leader change and we are talking to the - * wrong leader, which makes continuing with the current sync way too - * risky, given that a Paxos trim may be underway and we certainly incur - * in the chance of ending up with an inconsistent store state. - * - * If we are the sync provider, it means that the requester wants to - * abort his sync, either because he lost connectivity to the leader - * (i.e., his heartbeat timeout was triggered) or he became aware of a - * leader change. - * - * As a leader, we should never receive such a message though, unless we - * have just won an election, in which case we should have been a sync - * provider before. In such a case, we should behave as if we were a sync - * provider and clean up the requester's state. - */ - entity_inst_t other = m->get_source_inst(); - - if ((sync_role == SYNC_ROLE_REQUESTER) - && (sync_leader.get() != NULL) - && (sync_leader->entity == other)) { - - sync_requester_abort(); - } else if ((sync_role & SYNC_ROLE_PROVIDER) - && (sync_entities.count(other) > 0) - && (sync_entities_states[other] == SYNC_STATE_START)) { + dout(20) << __func__ << dendl; - sync_provider_cleanup(other); - } else { - dout(1) << __func__ << " stray message -- drop it." << dendl; + utime_t now = ceph_clock_now(g_ceph_context); + map<uint64_t,SyncProvider>::iterator p = sync_providers.begin(); + while (p != sync_providers.end()) { + if (now > p->second.timeout) { + dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl; + sync_providers.erase(p++); + } else { + ++p; + } } - m->put(); } -void Monitor::handle_sync(MMonSync *m) -{ - dout(10) << __func__ << " " << *m << dendl; - switch (m->op) { - case MMonSync::OP_START: - handle_sync_start(m); - break; - case MMonSync::OP_START_REPLY: - handle_sync_start_reply(m); - break; - case MMonSync::OP_HEARTBEAT: - handle_sync_heartbeat(m); - break; - case MMonSync::OP_HEARTBEAT_REPLY: - handle_sync_heartbeat_reply(m); - break; - case MMonSync::OP_FINISH: - handle_sync_finish(m); - break; - case MMonSync::OP_START_CHUNKS: - handle_sync_start_chunks(m); - break; - case MMonSync::OP_CHUNK: - handle_sync_chunk(m); - break; - case MMonSync::OP_CHUNK_REPLY: - handle_sync_chunk_reply(m); - break; - case MMonSync::OP_FINISH_REPLY: - handle_sync_finish_reply(m); - break; - case MMonSync::OP_ABORT: - handle_sync_abort(m); - break; - default: - dout(0) << __func__ << " unknown op " << m->op << dendl; - m->put(); - assert(0 == "unknown op"); - break; - } -} +// --------------------------------------------------- +// probe void Monitor::cancel_probe_timeout() { @@ -2001,27 +1317,47 @@ void Monitor::handle_probe_reply(MMonProbe *m) assert(paxos != NULL); if (is_synchronizing()) { - dout(10) << " we are currently synchronizing, so that will continue." - << dendl; + dout(10) << " currently syncing" << dendl; m->put(); return; } entity_inst_t other = m->get_source_inst(); - // is there an existing quorum? - if (m->quorum.size()) { - dout(10) << " existing quorum " << m->quorum << dendl; - if (paxos->get_version() < m->paxos_first_version) { + if (m->paxos_last_version < sync_last_committed_floor) { + dout(10) << " peer paxos versions [" << m->paxos_first_version + << "," << m->paxos_last_version << "] < my sync_last_committed_floor " + << sync_last_committed_floor << ", ignoring" + << dendl; + } else { + if (paxos->get_version() < m->paxos_first_version && + m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1. dout(10) << " peer paxos versions [" << m->paxos_first_version - << "," << m->paxos_last_version << "]" + << "," << m->paxos_last_version << "]" << " vs my version " << paxos->get_version() << " (too far ahead)" << dendl; - sync_start(other); + cancel_probe_timeout(); + sync_start(other, true); m->put(); return; } + if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) { + dout(10) << " peer paxos version " << m->paxos_last_version + << " vs my version " << paxos->get_version() + << " (too far ahead)" + << dendl; + cancel_probe_timeout(); + sync_start(other, false); + m->put(); + return; + } + } + + // is there an existing quorum? + if (m->quorum.size()) { + dout(10) << " existing quorum " << m->quorum << dendl; + dout(10) << " peer paxos version " << m->paxos_last_version << " vs my version " << paxos->get_version() << " (ok)" @@ -2046,16 +1382,6 @@ void Monitor::handle_probe_reply(MMonProbe *m) return; } - if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) { - dout(10) << " peer paxos version " << m->paxos_last_version - << " vs my version " << paxos->get_version() - << " (too far ahead)" - << dendl; - sync_start(other); - m->put(); - return; - } - unsigned need = monmap->size() / 2 + 1; dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl; if (outside_quorum.size() >= need) { @@ -2066,7 +1392,7 @@ void Monitor::handle_probe_reply(MMonProbe *m) dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl; } } else { - dout(10) << " that's not yet enough for a new quorum, waiting" << dendl; + dout(10) << " that's not yet enough for a new quorum, waiting" << dendl; } } m->put(); @@ -2149,19 +1475,6 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader << " quorum is " << quorum << " features are " << quorum_features << dendl; - // let everyone currently syncing know that we are no longer the leader and - // that they should all abort their on-going syncs - for (map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin(); - iter != trim_timeouts.end(); - ++iter) { - timer.cancel_event((*iter).second); - entity_inst_t entity = (*iter).first; - MMonSync *msg = new MMonSync(MMonSync::OP_ABORT); - messenger->send_message(msg, entity); - } - trim_timeouts.clear(); - sync_role &= ~SYNC_ROLE_LEADER; - paxos->peon_init(); for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->election_finished(); @@ -2216,79 +1529,7 @@ bool Monitor::_allowed_command(MonSession *s, map<string, cmd_vartype>& cmd) return retval; } -void Monitor::_sync_status(ostream& ss) -{ - JSONFormatter jf(true); - jf.open_object_section("sync_status"); - jf.dump_string("state", get_state_name()); - jf.dump_unsigned("paxos_version", paxos->get_version()); - - if (is_leader() || (sync_role == SYNC_ROLE_LEADER)) { - Mutex::Locker l(trim_lock); - jf.open_object_section("trim"); - jf.dump_int("disabled", paxos->is_trim_disabled()); - jf.dump_int("should_trim", paxos->should_trim()); - if (!trim_timeouts.empty()) { - jf.open_array_section("mons"); - for (map<entity_inst_t,Context*>::iterator it = trim_timeouts.begin(); - it != trim_timeouts.end(); - ++it) { - entity_inst_t e = (*it).first; - jf.dump_stream("mon") << e; - int s = -1; - if (trim_entities_states.count(e)) - s = trim_entities_states[e]; - jf.dump_stream("sync_state") << get_sync_state_name(s); - } - } - jf.close_section(); - } - - if (!sync_entities.empty() || (sync_role == SYNC_ROLE_PROVIDER)) { - jf.open_array_section("on_going"); - for (map<entity_inst_t,SyncEntity>::iterator it = sync_entities.begin(); - it != sync_entities.end(); - ++it) { - entity_inst_t e = (*it).first; - jf.open_object_section("mon"); - jf.dump_stream("addr") << e; - jf.dump_string("state", (*it).second->get_state()); - int s = -1; - if (sync_entities_states.count(e)) - s = sync_entities_states[e]; - jf.dump_stream("sync_state") << get_sync_state_name(s); - - jf.close_section(); - } - jf.close_section(); - } - - if (is_synchronizing() || (sync_role == SYNC_ROLE_REQUESTER)) { - jf.open_object_section("leader"); - SyncEntity sync_entity = sync_leader; - if (sync_entity.get() != NULL) - jf.dump_stream("addr") << sync_entity->entity; - jf.close_section(); - - jf.open_object_section("provider"); - sync_entity = sync_provider; - if (sync_entity.get() != NULL) - jf.dump_stream("addr") << sync_entity->entity; - jf.close_section(); - } - - if (g_conf->mon_sync_leader_kill_at > 0) - jf.dump_int("leader_kill_at", g_conf->mon_sync_leader_kill_at); - if (g_conf->mon_sync_provider_kill_at > 0) - jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at); - if (g_conf->mon_sync_requester_kill_at > 0) - jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at); - - jf.close_section(); - jf.flush(ss); -} - -void Monitor::_sync_force(ostream& ss) +void Monitor::sync_force(ostream& ss) { MonitorDBStore::Transaction tx; tx.put("mon_sync", "force_sync", 1); @@ -2347,17 +1588,31 @@ void Monitor::_mon_status(ostream& ss) jf.dump_stream("peer") << *p; jf.close_section(); + jf.open_array_section("sync_provider"); + for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin(); + p != sync_providers.end(); + ++p) { + jf.dump_unsigned("cookie", p->second.cookie); + jf.dump_stream("entity") << p->second.entity; + jf.dump_stream("timeout") << p->second.timeout; + jf.dump_unsigned("last_committed", p->second.last_committed); + jf.dump_stream("last_key") << p->second.last_key; + } + jf.close_section(); + if (is_synchronizing()) { - if (sync_leader) - jf.dump_stream("sync_leader") << sync_leader->entity; - else - jf.dump_string("sync_leader", ""); - if (sync_provider) - jf.dump_stream("sync_provider") << sync_provider->entity; - else - jf.dump_string("sync_provider", ""); + jf.open_object_section("sync"); + jf.dump_stream("sync_provider") << sync_provider; + jf.dump_unsigned("sync_cookie", sync_cookie); + jf.dump_unsigned("sync_start_version", sync_start_version); + jf.close_section(); } + if (g_conf->mon_sync_provider_kill_at > 0) + jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at); + if (g_conf->mon_sync_requester_kill_at > 0) + jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at); + jf.open_object_section("monmap"); monmap->dump(&jf); jf.close_section(); @@ -2843,16 +2098,6 @@ void Monitor::handle_command(MMonCommand *m) rdata.append(ds); rs = ""; r = 0; - } else if (prefix == "sync status") { - if (!access_r) { - r = -EACCES; - rs = "access denied"; - goto out; - } - _sync_status(ds); - rdata.append(ds); - rs = ""; - r = 0; } else if (prefix == "sync force") { string validate1, validate2; cmd_getval(g_ceph_context, cmdmap, "validate1", validate1); @@ -2865,7 +2110,7 @@ void Monitor::handle_command(MMonCommand *m) "--i-know-what-i-am-doing' if you really do."; goto out; } - _sync_force(ds); + sync_force(ds); rs = ds.str(); r = 0; } else if (prefix == "heap") { @@ -4174,6 +3419,8 @@ void Monitor::tick() } } + sync_trim_providers(); + if (!maybe_wait_for_quorum.empty()) { finish_contexts(g_ceph_context, maybe_wait_for_quorum); } diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 004292b9778..cbc91529c21 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -54,7 +54,7 @@ #include <errno.h> -#define CEPH_MON_PROTOCOL 12 /* cluster internal */ +#define CEPH_MON_PROTOCOL 13 /* cluster internal */ enum { @@ -171,11 +171,8 @@ public: default: return "???"; } } - const string get_state_name() const { - string sn(get_state_name(state)); - string sync_name(get_sync_state_name()); - sn.append(sync_name); - return sn; + const char *get_state_name() const { + return get_state_name(state); } bool is_shutdown() const { return state == STATE_SHUTDOWN; } @@ -224,878 +221,165 @@ private: * @{ */ /** - * Obtain the synchronization target prefixes in set form. - * - * We consider a target prefix all those that are relevant when - * synchronizing two stores. That is, all those that hold paxos service's - * versions, as well as paxos versions, or any control keys such as the - * first or last committed version. - * - * Given the current design, this function should return the name of all and - * any available paxos service, plus the paxos name. - * - * @returns a set of strings referring to the prefixes being synchronized - */ - set<string> get_sync_targets_names(); - /** - * Handle a sync-related message - * - * This function will call the appropriate handling functions for each - * operation type. - * - * @param m A sync-related message (i.e., of type MMonSync) - */ - void handle_sync(MMonSync *m); - /** - * Handle a sync-related message of operation type OP_ABORT. - * - * @param m A sync-related message of type OP_ABORT - */ - void handle_sync_abort(MMonSync *m); - /** - * Reset the monitor's sync-related data structures and state. - */ - void reset_sync(bool abort = false); - - /** - * @defgroup Synchronization_Roles - * @{ - */ - /** - * The monitor has no role in any on-going synchronization. - */ - static const uint8_t SYNC_ROLE_NONE = 0x0; - /** - * The monitor is the Leader in at least one synchronization. - */ - static const uint8_t SYNC_ROLE_LEADER = 0x1; - /** - * The monitor is the Provider in at least one synchronization. - */ - static const uint8_t SYNC_ROLE_PROVIDER = 0x2; - /** - * The monitor is a requester in the on-going synchronization. - */ - static const uint8_t SYNC_ROLE_REQUESTER = 0x4; - - /** - * The monitor's current role in on-going synchronizations, if any. - * - * A monitor can either be part of no synchronization at all, in which case - * @p sync_role shall hold the value @p SYNC_ROLE_NONE, or it can be part of - * an on-going synchronization, in which case it may be playing either one or - * two roles at the same time: - * - * - If the monitor is the sync requester (i.e., be the one synchronizing - * against some other monitor), the @p sync_role field will hold only the - * @p SYNC_ROLE_REQUESTER value. - * - Otherwise, the monitor can be either a sync leader, or a sync provider, - * or both, in which case @p sync_role will hold a binary OR of both - * @p SYNC_ROLE_LEADER and @p SYNC_ROLE_PROVIDER. - */ - uint8_t sync_role; - /** - * @} - */ - /** - * @defgroup Leader-specific - * @{ - */ - /** - * Guarantee mutual exclusion access to the @p trim_timeouts map. - * - * We need this mutex specially when we have a monitor starting a sync with - * the leader and another one finishing or aborting an on-going sync, that - * happens to be the last on-going trim on the map. Given that we will - * enable the Paxos trim once we deplete the @p trim_timeouts map, we must - * then ensure that we either add the new sync start to the map before - * removing the one just finishing, or that we remove the finishing one - * first and enable the trim before we add the new one. If we fail to do - * this, nasty repercussions could follow. - */ - Mutex trim_lock; - /** - * Map holding all on-going syncs' timeouts. - * - * An on-going sync leads to the Paxos trim to be suspended, and this map - * will associate entities to the timeouts to be triggered if the monitor - * being synchronized fails to check-in with the leader, letting him know - * that the sync is still in effect and that in no circumstances should the - * Paxos trim be enabled. - */ - map<entity_inst_t, Context*> trim_timeouts; - map<entity_inst_t, uint8_t> trim_entities_states; - /** - * Map associating monitors to a sync state. - * - * This map is used by both the Leader and the Sync Provider, and has the - * sole objective of keeping track of the state each monitor's sync process - * is in. - */ - map<entity_inst_t, uint8_t> sync_entities_states; - /** - * Timer that will enable the Paxos trim. - * - * This timer is set after the @p trim_timeouts map is depleted, and once - * fired it will enable the Paxos trim (if still disabled). By setting - * this timer, we avoid a scenario in which a monitor has just finished - * synchronizing, but because the Paxos trim had been disabled for a long, - * long time and a lot of trims were proposed in the timespan of the monitor - * finishing its sync and actually joining the cluster, the monitor happens - * to be out-of-sync yet again. Backing off enabling the Paxos trim will - * allow the other monitor to join the cluster before actually trimming. - */ - Context *trim_enable_timer; - - /** - * Callback class responsible for finishing a monitor's sync session on the - * leader's side, because the said monitor failed to acknowledge its - * liveliness in a timely manner, thus being assumed as failed. + * @} // provider state */ - struct C_TrimTimeout : public Context { - Monitor *mon; - entity_inst_t entity; - - C_TrimTimeout(Monitor *m, entity_inst_t& entity) - : mon(m), entity(entity) { } - void finish(int r) { - mon->sync_finish(entity); - } - }; + struct SyncProvider { + entity_inst_t entity; ///< who + uint64_t cookie; ///< unique cookie for this sync attempt + utime_t timeout; ///< when we give up and expire this attempt + version_t last_committed; ///< last paxos version on peer + pair<string,string> last_key; ///< last key sent to (or on) peer + bool full; ///< full scan? + MonitorDBStore::Synchronizer synchronizer; ///< iterator - /** - * Callback class responsible for enabling the Paxos trim if there are no - * more on-going syncs. - */ - struct C_TrimEnable : public Context { - Monitor *mon; + SyncProvider() : cookie(0), last_committed(0), full(false) {} - C_TrimEnable(Monitor *m) : mon(m) { } - void finish(int r) { - Mutex::Locker(mon->trim_lock); - // even if we are no longer the leader, we should re-enable trim if - // we have disabled it in the past. It doesn't mean we are going to - // do anything about it, but if we happen to become the leader - // sometime down the future, we sure want to have the trim enabled. - if (mon->trim_timeouts.empty()) - mon->paxos->trim_enable(); - mon->trim_enable_timer = NULL; + void reset_timeout(CephContext *cct, int grace) { + timeout = ceph_clock_now(cct); + timeout += grace; } }; - void sync_obtain_latest_monmap(bufferlist &bl); - void sync_store_init(); - void sync_store_cleanup(); - bool is_sync_on_going(); + map<uint64_t, SyncProvider> sync_providers; ///< cookie -> SyncProvider for those syncing from us + uint64_t sync_provider_count; ///< counter for issued cookies to keep them unique /** - * Send a heartbeat message to another entity. - * - * The sent message may be a heartbeat reply if the @p reply parameter is - * set to true. - * - * This function is used both by the leader (always with @p reply = true), - * and by the sync requester (always with @p reply = false). - * - * @param other The target monitor's entity instance. - * @param reply Whether the message to be sent should be a heartbeat reply. - */ - void sync_send_heartbeat(entity_inst_t &other, bool reply = false); - /** - * Handle a Sync Start request. - * - * Monitors wanting to synchronize with the cluster will have to first ask - * the leader to do so. The only objective with this is so that the we can - * gurantee that the leader won't trim the paxos state. - * - * The leader may not be the only one receiving this request. A sync provider - * may also receive it when it is taken as the point of entry onto the - * cluster. In this scenario, the provider must then forward this request to - * the leader, if he know of one, or assume himself as the leader for this - * sync purpose (this may happen if there is no formed quorum). - * - * @param m Sync message with operation type MMonSync::OP_START - */ - void handle_sync_start(MMonSync *m); - /** - * Handle a Heartbeat sent by a sync requester. - * - * We use heartbeats as a way to guarantee that both the leader and the sync - * requester are still alive. Receiving this message means that the requester - * if still working on getting his store synchronized. - * - * @param m Sync message with operation type MMonSync::OP_HEARTBEAT - */ - void handle_sync_heartbeat(MMonSync *m); - /** - * Handle a Sync Finish. - * - * A MMonSync::OP_FINISH is the way the sync requester has to inform the - * leader that he finished synchronizing his store. - * - * @param m Sync message with operation type MMonSync::OP_FINISH + * @} // requester state */ - void handle_sync_finish(MMonSync *m); + entity_inst_t sync_provider; ///< who we are syncing from + uint64_t sync_cookie; ///< 0 if we are starting, non-zero otherwise + bool sync_full; ///< true if we are a full sync, false for recent catch-up + version_t sync_start_version; ///< last_committed at sync start + Context *sync_timeout_event; ///< timeout event + /** - * Finish a given monitor's sync process on the leader's side. + * floor for sync source * - * This means cleaning up the state referring to the monitor whose sync has - * finished (may it have been finished successfully, by receiving a message - * with type MMonSync::OP_FINISH, or due to the assumption that the said - * monitor failed). + * When we sync we forget about our old last_committed value which + * can be dangerous. For example, if we have a cluster of: * - * If we happen to know of no other monitor synchronizing, we may then enable - * the paxos trim. - * - * @param entity Entity instance of the monitor whose sync we are considering - * as finished. - * @param abort If true, we consider this sync has finished due to an abort. - */ - void sync_finish(entity_inst_t &entity, bool abort = false); - /** - * Abort a given monitor's sync process on the leader's side. + * mon.a: lc 100 + * mon.b: lc 80 + * mon.c: lc 100 (us) * - * This function is a wrapper for Monitor::sync_finish(). + * If something forces us to sync (say, corruption, or manual + * intervention, or bug), we forget last_committed, and might abort. + * If mon.a happens to be down when we come back, we will see: * - * @param entity Entity instance of the monitor whose sync we are aborting. - */ - void sync_finish_abort(entity_inst_t &entity) { - sync_finish(entity, true); - } - /** - * @} // Leader-specific - */ - /** - * @defgroup Synchronization Provider-specific - * @{ - */ - /** - * Represents a participant in a synchronization, along with its state. + * mon.b: lc 80 + * mon.c: lc 0 (us) * - * This class is used to track down all the sync requesters we are providing - * to. In such scenario, it won't be uncommon to have the @p synchronizer - * field set with a connection to the MonitorDBStore, the @p timeout field - * containing a timeout event and @p entity containing the entity instance - * of the monitor we are here representing. + * and sync from mon.b, at which point a+b will both have lc 80 and + * come online with a majority holding out of date commits. * - * The sync requester will also use this class to represent both the sync - * leader and the sync provider. + * Avoid this by preserving our old last_committed value prior to + * sync and never going backwards. */ - struct SyncEntityImpl { - - /** - * Store synchronization related Sync state. - */ - enum { - /** - * No state whatsoever. We are not providing any sync suppport. - */ - STATE_NONE = 0, - /** - * This entity's sync effort is currently focused on reading and sharing - * our whole store state with @p entity. This means all the entries in - * the key/value space. - */ - STATE_WHOLE = 1, - /** - * This entity's sync effor is currently focused on reading and sharing - * our Paxos state with @p entity. This means all the Paxos-related - * key/value entries, such as the Paxos versions. - */ - STATE_PAXOS = 2 - }; - - /** - * The entity instace of the monitor whose sync effort we are representing. - */ - entity_inst_t entity; - /** - * Our Monitor. - */ - Monitor *mon; - /** - * The Paxos version we are focusing on. - * - * @note This is not used at the moment. We are still assessing whether we - * need it. - */ - version_t version; - /** - * Timeout event. Its type and purpose varies depending on the situation. - */ - Context *timeout; - /** - * Last key received during a sync effort. - * - * This field is mainly used by the sync requester to track the last - * received key, in case he needs to switch providers due to failure. The - * sync provider will also use this field whenever the requester specifies - * a last received key when requesting the provider to start sending his - * store chunks. - */ - pair<string,string> last_received_key; - /** - * Hold the Store Synchronization related Sync State. - */ - int sync_state; - /** - * The MonitorDBStore's chunk iterator instance we are currently using - * to obtain the store's chunks and pack them to the sync requester. - */ - MonitorDBStore::Synchronizer synchronizer; - MonitorDBStore::Synchronizer paxos_synchronizer; - /* Should only be used for debugging purposes */ - /** - * crc of the contents read from the store. - * - * @note may not always be available, as it is used only on specific - * points in time during the sync process. - * @note depends on '--mon-sync-debug' being set. - */ - __u32 crc; - /** - * Should be true if @p crc has been set. - */ - bool crc_available; - /** - * Total synchronization attempts. - */ - int attempts; - - SyncEntityImpl(entity_inst_t &entity, Monitor *mon) - : entity(entity), - mon(mon), - version(0), - timeout(NULL), - sync_state(STATE_NONE), - crc(0), - crc_available(false), - attempts(0) - { } - - /** - * Obtain current Sync State name. - * - * @returns Name of current sync state. - */ - string get_state() { - switch (sync_state) { - case STATE_NONE: return "none"; - case STATE_WHOLE: return "whole"; - case STATE_PAXOS: return "paxos"; - default: return "unknown"; - } - } - /** - * Obtain the paxos version at which this sync started. - * - * @returns Paxos version at which this sync started - */ - version_t get_version() { - return version; - } - /** - * Set a timeout event for this sync entity. - * - * @param event Timeout class to be called after @p fire_after seconds. - * @param fire_after Number of seconds until we fire the @p event event. - */ - void set_timeout(Context *event, double fire_after) { - cancel_timeout(); - timeout = event; - mon->timer.add_event_after(fire_after, timeout); - } - /** - * Cancel the currently set timeout, if any. - */ - void cancel_timeout() { - if (timeout) - mon->timer.cancel_event(timeout); - timeout = NULL; - } - /** - * Initiate the required fields for obtaining chunks out of the - * MonitorDBStore. - * - * This function will initiate @p synchronizer with a chunk iterator whose - * scope is all the keys/values that belong to one of the sync targets - * (i.e., paxos services or paxos). - * - * Calling @p Monitor::sync_update() will be essential during the efforts - * of providing a correct store state to the requester, since we will need - * to eventually update the iterator in order to start packing the Paxos - * versions. - */ - void sync_init() { - sync_state = STATE_WHOLE; - set<string> sync_targets = mon->get_sync_targets_names(); - - string prefix("paxos"); - paxos_synchronizer = mon->store->get_synchronizer(prefix); - version = mon->paxos->get_version(); - generic_dout(10) << __func__ << " version " << version << dendl; - - synchronizer = mon->store->get_synchronizer(last_received_key, - sync_targets); - sync_update(); - assert(synchronizer->has_next_chunk()); - } - /** - * Update the @p synchronizer chunk iterator, if needed. - * - * Whenever we reach the end of the iterator during @p STATE_WHOLE, we - * must update the @p synchronizer to an iterator focused on reading only - * Paxos versions. This is an essential part of the sync store approach, - * and it will guarantee that we end up with a consistent store. - */ - void sync_update() { - assert(sync_state != STATE_NONE); - assert(synchronizer.use_count() != 0); - - if (!synchronizer->has_next_chunk()) { - crc_set(synchronizer->crc()); - if (sync_state == STATE_WHOLE) { - assert(paxos_synchronizer.use_count() != 0); - sync_state = STATE_PAXOS; - synchronizer = paxos_synchronizer; - } - } - } + version_t sync_last_committed_floor; - /* For debug purposes only */ - /** - * Check if we have a CRC available. - * - * @returns true if crc is available; false otherwise. - */ - bool has_crc() { - return (g_conf->mon_sync_debug && crc_available); - } - /** - * Set @p crc to @p to_set - * - * @param to_set a crc value to set. - */ - void crc_set(__u32 to_set) { - crc = to_set; - crc_available = true; - } - /** - * Get the current CRC value from @p crc - * - * @returns the currenct CRC value from @p crc - */ - __u32 crc_get() { - return crc; - } - /** - * Clear the current CRC. - */ - void crc_clear() { - crc_available = false; - } - }; - typedef std::tr1::shared_ptr< SyncEntityImpl > SyncEntity; - /** - * Get a Monitor::SyncEntity instance. - * - * @param entity The monitor's entity instance that we want to associate - * with this Monitor::SyncEntity. - * @param mon The Monitor. - * - * @returns A Monitor::SyncEntity - */ - SyncEntity get_sync_entity(entity_inst_t &entity, Monitor *mon) { - return std::tr1::shared_ptr<SyncEntityImpl>( - new SyncEntityImpl(entity, mon)); - } - /** - * Callback class responsible for dealing with the consequences of a sync - * process timing out. - */ struct C_SyncTimeout : public Context { Monitor *mon; - entity_inst_t entity; - - C_SyncTimeout(Monitor *mon, entity_inst_t &entity) - : mon(mon), entity(entity) - { } - + C_SyncTimeout(Monitor *m) : mon(m) {} void finish(int r) { - mon->sync_timeout(entity); + mon->sync_timeout(); } }; + /** - * Map containing all the monitor entities to whom we are acting as sync - * providers. - */ - map<entity_inst_t, SyncEntity> sync_entities; - /** - * RNG used for the sync (currently only used to pick random monitors) - */ - SimpleRNG sync_rng; - /** - * Obtain random monitor from the monmap. - * - * @param other Any monitor other than the one with rank @p other - * @returns The picked monitor's name. - */ - int _pick_random_mon(int other = -1); - int _pick_random_quorum_mon(int other = -1); - /** - * Deal with the consequences of @p entity's sync timing out. - * - * @note Both the sync provider and the sync requester make use of this - * function, since both use the @p Monitor::C_SyncTimeout callback. - * - * Being the sync provider, whenever a Monitor::C_SyncTimeout is triggered, - * we only have to clean up the sync requester's state we are maintaining. - * - * Being the sync requester, we will have to choose a new sync provider, and - * resume our sync from where it was left. - * - * @param entity Entity instance of the monitor whose sync has timed out. - */ - void sync_timeout(entity_inst_t &entity); - /** - * Cleanup the state we, the provider, are keeping during @p entity's sync. - * - * @param entity Entity instance of the monitor whose sync state we are - * cleaning up. - */ - void sync_provider_cleanup(entity_inst_t &entity); - /** - * Handle a Sync Start Chunks request from a sync requester. - * - * This request will create the necessary state our the provider's end, and - * the provider will then be able to send chunks of his own store to the - * requester. - * - * @param m Sync message with operation type MMonSync::OP_START_CHUNKS - */ - void handle_sync_start_chunks(MMonSync *m); - /** - * Handle a requester's reply to the last chunk we sent him. - * - * We will only send a new chunk to the sync requester once he has acked the - * reception of the last chunk we sent them. - * - * That's also how we will make sure that, on their end, they became aware - * that there are no more chunks to send (since we shall tag a message with - * MMonSync::FLAG_LAST when we are sending them the last chunk of all), - * allowing us to clean up the requester's state. - * - * @param m Sync message with operation type MMonSync::OP_CHUNK_REPLY - */ - void handle_sync_chunk_reply(MMonSync *m); - /** - * Send a chunk to the sync entity represented by @p sync. + * Obtain the synchronization target prefixes in set form. * - * This function will send the next chunk available on the synchronizer. If - * it happens to be the last chunk, then the message shall be marked as - * such using MMonSync::FLAG_LAST. + * We consider a target prefix all those that are relevant when + * synchronizing two stores. That is, all those that hold paxos service's + * versions, as well as paxos versions, or any control keys such as the + * first or last committed version. * - * @param sync A Monitor::SyncEntity representing a sync requester monitor. - */ - void sync_send_chunks(SyncEntity sync); - /** - * @} // Synchronization Provider-specific - */ - /** - * @defgroup Synchronization Requester-specific - * @{ - */ - /** - * The state in which we (the sync leader, provider or requester) are in - * regard to our sync process (if we are the requester) or any entity that - * we may be leading or providing to. - */ - enum { - /** - * We are not part of any synchronization effort, or it has not began yet. - */ - SYNC_STATE_NONE = 0, - /** - * We have started our role in the synchronization. - * - * This state may have multiple meanings, depending on which entity is - * employing it and within which context. - * - * For instance, the leader will consider a sync requester to enter - * SYNC_STATE_START whenever it receives a MMonSync::OP_START from the - * said requester. On the other hand, the provider will consider that the - * requester enters this state after receiving a MMonSync::OP_START_CHUNKS. - * The sync requester will enter this state as soon as it begins its sync - * efforts. - */ - SYNC_STATE_START = 1, - /** - * We are synchronizing chunks. - * - * This state is not used by the sync leader; only the sync requester and - * the sync provider will. - */ - SYNC_STATE_CHUNKS = 2, - /** - * We are stopping the sync effort. - */ - SYNC_STATE_STOP = 3 - }; - /** - * The current sync state. + * Given the current design, this function should return the name of all and + * any available paxos service, plus the paxos name. * - * This field is only used by the sync requester, being the only one that - * will take this state as part of its global state. The sync leader and the - * sync provider will only associate sync states to other entities (i.e., to - * sync requesters), and those shall be kept in the @p sync_entities_states - * map. - */ - int sync_state; - /** - * Callback class responsible for dealing with the consequences of the sync - * requester not receiving a MMonSync::OP_START_REPLY in a timely manner. - */ - struct C_SyncStartTimeout : public Context { - Monitor *mon; - - C_SyncStartTimeout(Monitor *mon) - : mon(mon) - { } - - void finish(int r) { - mon->sync_start_reply_timeout(); - } - }; - /** - * Callback class responsible for retrying a Sync Start after a given - * backoff period, whenever the Sync Leader flags a MMonSync::OP_START_REPLY - * with the MMonSync::FLAG_RETRY flag. + * @returns a set of strings referring to the prefixes being synchronized */ - struct C_SyncStartRetry : public Context { - Monitor *mon; - entity_inst_t entity; - - C_SyncStartRetry(Monitor *mon, entity_inst_t &entity) - : mon(mon), entity(entity) - { } + set<string> get_sync_targets_names(); - void finish(int r) { - mon->bootstrap(); - } - }; /** - * We use heartbeats to check if both the Leader and the Synchronization - * Requester are both still alive, so we can determine if we should continue - * with the synchronization process, granted that trim is disabled. + * Reset the monitor's sync-related data structures and state, both + * for the requester- and provider-side. */ - struct C_HeartbeatTimeout : public Context { - Monitor *mon; - - C_HeartbeatTimeout(Monitor *mon) - : mon(mon) - { } + void sync_reset(); - void finish(int r) { - mon->sync_requester_abort(); - } - }; /** - * Callback class responsible for sending a heartbeat message to the sync - * leader. We use this callback to keep an assynchronous heartbeat with - * the sync leader at predefined intervals. + * Caled when a sync attempt times out (requester-side) */ - struct C_HeartbeatInterval : public Context { - Monitor *mon; - entity_inst_t entity; + void sync_timeout(); - C_HeartbeatInterval(Monitor *mon, entity_inst_t &entity) - : mon(mon), entity(entity) - { } - - void finish(int r) { - mon->sync_leader->set_timeout(new C_HeartbeatTimeout(mon), - g_conf->mon_sync_heartbeat_timeout); - mon->sync_send_heartbeat(entity); - } - }; /** - * Callback class responsible for dealing with the consequences of never - * receiving a reply to a MMonSync::OP_FINISH sent to the sync leader. + * Get the latest monmap for backup purposes during sync */ - struct C_SyncFinishReplyTimeout : public Context { - Monitor *mon; - - C_SyncFinishReplyTimeout(Monitor *mon) - : mon(mon) - { } + void sync_obtain_latest_monmap(bufferlist &bl); - void finish(int r) { - mon->sync_finish_reply_timeout(); - } - }; - /** - * The entity we, the sync requester, consider to be our sync leader. If - * there is a formed quorum, the @p sync_leader should represent the actual - * cluster Leader; otherwise, it can be any monitor and will likely be the - * same as @p sync_provider. - */ - SyncEntity sync_leader; - /** - * The entity we, the sync requester, are synchronizing against. This entity - * will be our source of store chunks, and we will ultimately obtain a store - * state equal (or very similar, maybe off by a couple of versions) as their - * own. - */ - SyncEntity sync_provider; - /** - * Clean up the Sync Requester's state (both in-memory and in-store). - */ - void sync_requester_cleanup(); - /** - * Abort the current sync effort. - * - * This will be translated into a MMonSync::OP_ABORT sent to the sync leader - * and to the sync provider, and ultimately it will also involve calling - * @p Monitor::sync_requester_cleanup() to clean up our current sync state. - */ - void sync_requester_abort(); - /** - * Deal with a timeout while waiting for a MMonSync::OP_FINISH_REPLY. - * - * This will be assumed as a leader failure, and having been exposed to the - * side-effects of a new Leader being elected, we have no other choice but - * to abort our sync process and start fresh. - */ - void sync_finish_reply_timeout(); - /** - * Deal with a timeout while waiting for a MMonSync::OP_START_REPLY. - * - * This will be assumed as a leader failure. Since we didn't get to do - * much work (as we haven't even started our sync), we will simply bootstrap - * and start off fresh with a new sync leader. - */ - void sync_start_reply_timeout(); /** * Start the synchronization efforts. * * This function should be called whenever we find the need to synchronize * our store state with the remaining cluster. * - * Starting the sync process means that we will have to request the cluster - * Leader (if there is a formed quorum) to stop trimming the Paxos state and - * allow us to start synchronizing with the sync provider we picked. * * @param entity An entity instance referring to the sync provider we picked. + * @param whether to sycn the full store, or just pull recent paxos commits */ - void sync_start(entity_inst_t &entity); - /** - * Request the provider to start sending the chunks of his store, in order - * for us to obtain a consistent store state similar to the one shared by - * the cluster. - * - * @param provider The SyncEntity representing the Sync Provider. - */ - void sync_start_chunks(SyncEntity provider); + void sync_start(entity_inst_t &entity, bool full); + +public: /** - * Handle a MMonSync::OP_START_REPLY sent by the Sync Leader. - * - * Reception of this message may be twofold: if it was marked with the - * MMonSync::FLAG_RETRY flag, we must backoff for a while and retry starting - * the sync at a later time; otherwise, we have the green-light to request - * the Sync Provider to start sharing his chunks with us. - * - * @param m Sync message with operation type MMonSync::OP_START_REPLY + * force a sync on next mon restart */ - void handle_sync_start_reply(MMonSync *m); + void sync_force(ostream& ss); + +private: /** - * Handle a Heartbeat reply sent by the Sync Leader. + * reset the sync timeout * - * We use heartbeats to keep the Sync Leader aware that we are keeping our - * sync efforts alive. We also use them to make sure our Sync Leader is - * still alive. If the Sync Leader fails, we will have to abort our on-going - * sync, or we could incurr in an inconsistent store state due to a trim on - * the Paxos state of the monitor provinding us with his store chunks. - * - * @param m Sync message with operation type MMonSync::OP_HEARTBEAT_REPLY + * This is used on the client to restart if things aren't progressing */ - void handle_sync_heartbeat_reply(MMonSync *m); + void sync_reset_timeout(); + /** - * Handle a chunk sent by the Sync Provider. + * trim stale sync provider state * - * We will receive the Sync Provider's store in chunks. These are encoded - * in bufferlists containing a transaction that will be directly applied - * onto our MonitorDBStore. - * - * Whenever we receive such a message, we must reply to the Sync Provider, - * as a way of acknowledging the reception of its last chunk. If the message - * is tagged with a MMonSync::FLAG_LAST, we can then consider we have - * received all the chunks the Sync Provider had to offer, and finish our - * sync efforts with the Sync Leader. - * - * @param m Sync message with operation type MMonSync::OP_CHUNK + * If someone is syncing from us and hasn't talked to us recently, expire their state. */ - void handle_sync_chunk(MMonSync *m); + void sync_trim_providers(); + /** - * Handle a reply sent by the Sync Leader to a MMonSync::OP_FINISH. - * - * As soon as we receive this message, we know we finally have a store state - * consistent with the remaining cluster (give or take a couple of versions). - * We may then bootstrap and attempt to join the other monitors in the - * cluster. + * Complete a sync * - * @param m Sync message with operation type MMonSync::OP_FINISH_REPLY - */ - void handle_sync_finish_reply(MMonSync *m); - /** - * Stop our synchronization effort by sending a MMonSync::OP_FINISH to the - * Sync Leader. + * Finish up a sync after we've gotten all of the chunks. * - * Once we receive the last chunk from the Sync Provider, we are in - * conditions of officially finishing our sync efforts. With that purpose in - * mind, we must then send a MMonSync::OP_FINISH to the Leader, letting him - * know that we no longer require the Paxos state to be preserved. + * @param last_committed final last_committed value from provider */ - void sync_stop(); + void sync_finish(version_t last_committed); + /** - * @} // Synchronization Requester-specific + * request the next chunk from the provider */ - const string get_sync_state_name(int s) const { - switch (s) { - case SYNC_STATE_NONE: return "none"; - case SYNC_STATE_START: return "start"; - case SYNC_STATE_CHUNKS: return "chunks"; - case SYNC_STATE_STOP: return "stop"; - } - return "???"; - } + void sync_get_next_chunk(); + /** - * Obtain a string describing the current Sync State. + * handle sync message * - * @returns A string describing the current Sync State, if any, or an empty - * string if no sync (or sync effort we know of) is in progress. + * @param m Sync message with operation type MMonSync::OP_START_CHUNKS */ - const string get_sync_state_name() const { - string sn; - - if (sync_role == SYNC_ROLE_NONE) - return ""; - - sn.append(" sync("); - - if (sync_role & SYNC_ROLE_LEADER) - sn.append(" leader"); - if (sync_role & SYNC_ROLE_PROVIDER) - sn.append(" provider"); - if (sync_role & SYNC_ROLE_REQUESTER) - sn.append(" requester"); + void handle_sync(MMonSync *m); - sn.append(" state "); - sn.append(get_sync_state_name(sync_state)); + void _sync_reply_no_cookie(MMonSync *m); - sn.append(" )"); + void handle_sync_get_cookie(MMonSync *m); + void handle_sync_get_chunk(MMonSync *m); + void handle_sync_finish(MMonSync *m); - return sn; - } + void handle_sync_cookie(MMonSync *m); + void handle_sync_forward(MMonSync *m); + void handle_sync_chunk(MMonSync *m); + void handle_sync_no_cookie(MMonSync *m); /** * @} // Synchronization @@ -1288,8 +572,6 @@ public: bool _allowed_command(MonSession *s, map<std::string, cmd_vartype>& cmd); void _mon_status(ostream& ss); void _quorum_status(ostream& ss); - void _sync_status(ostream& ss); - void _sync_force(ostream& ss); void _add_bootstrap_peer_hint(string cmd, string args, ostream& ss); void handle_command(class MMonCommand *m); void handle_route(MRoute *m); diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index 8405dbed2af..60a648de19b 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -251,7 +251,8 @@ class MonitorDBStore bool add_chunk_entry(Transaction &tx, string &prefix, string &key, - bufferlist &value) { + bufferlist &value, + uint64_t max) { Transaction tmp; bufferlist tmp_bl; tmp.put(prefix, key, value); @@ -262,7 +263,7 @@ class MonitorDBStore size_t len = tx_bl.length() + tmp_bl.length(); - if (!tx.empty() && (len > g_conf->mon_sync_max_payload_size)) { + if (!tx.empty() && (len > max)) { return false; } @@ -279,7 +280,6 @@ class MonitorDBStore return true; } - virtual void _get_chunk(Transaction &tx) = 0; virtual bool _is_valid() = 0; public: @@ -294,12 +294,7 @@ class MonitorDBStore virtual bool has_next_chunk() { return !done && _is_valid(); } - virtual void get_chunk(bufferlist &bl) { - Transaction tx; - _get_chunk(tx); - if (!tx.empty()) - tx.encode(bl); - } + virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0; virtual pair<string,string> get_next_key() = 0; }; typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer; @@ -327,7 +322,7 @@ class MonitorDBStore * differ from the one passed on to the function) * @param last_key[out] Last key in the chunk */ - virtual void _get_chunk(Transaction &tx) { + virtual void get_chunk_tx(Transaction &tx, uint64_t max) { assert(done == false); assert(iter->valid() == true); @@ -336,7 +331,7 @@ class MonitorDBStore string key(iter->raw_key().second); if (sync_prefixes.count(prefix)) { bufferlist value = iter->value(); - if (!add_chunk_entry(tx, prefix, key, value)) + if (!add_chunk_entry(tx, prefix, key, value, max)) return; } iter->next(); @@ -359,49 +354,6 @@ class MonitorDBStore } }; - class SinglePrefixStoreIteratorImpl : public StoreIteratorImpl { - KeyValueDB::Iterator iter; - string prefix; - - public: - SinglePrefixStoreIteratorImpl(KeyValueDB::Iterator iter, string prefix) - : StoreIteratorImpl(), - iter(iter), - prefix(prefix) - { } - - virtual ~SinglePrefixStoreIteratorImpl() { } - - private: - virtual void _get_chunk(Transaction &tx) { - assert(done == false); - assert(iter->valid() == true); - - while (iter->valid()) { - string key(iter->key()); - bufferlist value = iter->value(); - if (!add_chunk_entry(tx, prefix, key, value)) - return; - iter->next(); - } - assert(iter->valid() == false); - done = true; - } - - virtual pair<string,string> get_next_key() { - // this method is only used by scrub on the whole store - // iterator. also, the single prefix iterator has been dropped - // in later code. we leave this here only for the benefit of - // backporting. - assert(0 == "this should not get called"); - return make_pair(string(), string()); - } - - virtual bool _is_valid() { - return iter->valid(); - } - }; - Synchronizer get_synchronizer(pair<string,string> &key, set<string> &prefixes) { KeyValueDB::WholeSpaceIterator iter; @@ -417,18 +369,6 @@ class MonitorDBStore ); } - Synchronizer get_synchronizer(string &prefix) { - assert(!prefix.empty()); - - KeyValueDB::Iterator iter; - iter = db->get_snapshot_iterator(prefix); - iter->seek_to_first(); - - return std::tr1::shared_ptr<StoreIteratorImpl>( - new SinglePrefixStoreIteratorImpl(iter, prefix) - ); - } - KeyValueDB::Iterator get_iterator(const string &prefix) { assert(!prefix.empty()); KeyValueDB::Iterator iter = db->get_snapshot_iterator(prefix); diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index efe41e1cb74..6c9effccd6e 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -316,7 +316,7 @@ void PGMonitor::read_pgmap_meta() { dout(10) << __func__ << dendl; - string prefix = "pgmap_meta"; + string prefix = pgmap_meta_prefix; version_t version = mon->store->get(prefix, "version"); epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch"); @@ -358,7 +358,7 @@ void PGMonitor::read_pgmap_full() { read_pgmap_meta(); - string prefix = "pgmap_pg"; + string prefix = pgmap_pg_prefix; for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) { string key = i->key(); pg_t pgid; @@ -371,7 +371,7 @@ void PGMonitor::read_pgmap_full() dout(20) << " got " << pgid << dendl; } - prefix = "pgmap_osd"; + prefix = pgmap_osd_prefix; for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) { string key = i->key(); int osd = atoi(key.c_str()); @@ -403,7 +403,7 @@ void PGMonitor::apply_pgmap_delta(bufferlist& bl) ::decode(pgid, p); dout(20) << " refreshing pg " << pgid << dendl; bufferlist bl; - int r = mon->store->get("pgmap_pg", stringify(pgid), bl); + int r = mon->store->get(pgmap_pg_prefix, stringify(pgid), bl); if (r >= 0) { pg_map.update_pg(pgid, bl); } else { @@ -418,7 +418,7 @@ void PGMonitor::apply_pgmap_delta(bufferlist& bl) ::decode(osd, p); dout(20) << " refreshing osd." << osd << dendl; bufferlist bl; - int r = mon->store->get("pgmap_osd", stringify(osd), bl); + int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl); if (r >= 0) { pg_map.update_osd(osd, bl); } else { @@ -442,7 +442,7 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) uint64_t features = mon->get_quorum_features(); - string prefix = "pgmap_meta"; + string prefix = pgmap_meta_prefix; t->put(prefix, "version", pending_inc.version); { @@ -470,7 +470,7 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) ::encode(pending_inc.stamp, incbl); { bufferlist dirty; - string prefix = "pgmap_pg"; + string prefix = pgmap_pg_prefix; for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin(); p != pending_inc.pg_stat_updates.end(); ++p) { @@ -487,7 +487,7 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) } { bufferlist dirty; - string prefix = "pgmap_osd"; + string prefix = pgmap_osd_prefix; for (map<int32_t,osd_stat_t>::const_iterator p = pending_inc.osd_stat_updates.begin(); p != pending_inc.osd_stat_updates.end(); ++p) { diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index e9b8f217255..271d0e1161d 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -53,6 +53,10 @@ public: private: PGMap::Incremental pending_inc; + const char *pgmap_meta_prefix; + const char *pgmap_pg_prefix; + const char *pgmap_osd_prefix; + void create_initial(); void update_from_paxos(bool *need_bootstrap); void upgrade_format(); @@ -146,10 +150,20 @@ public: PGMonitor(Monitor *mn, Paxos *p, const string& service_name) : PaxosService(mn, p, service_name), need_check_down_pgs(false), - last_map_pg_create_osd_epoch(0) + last_map_pg_create_osd_epoch(0), + pgmap_meta_prefix("pgmap_meta"), + pgmap_pg_prefix("pgmap_pg"), + pgmap_osd_prefix("pgmap_osd") { } ~PGMonitor() { } + virtual void get_store_prefixes(set<string>& s) { + s.insert(get_service_name()); + s.insert(pgmap_meta_prefix); + s.insert(pgmap_pg_prefix); + s.insert(pgmap_osd_prefix); + } + virtual void on_restart(); /* Courtesy function provided by PaxosService, called when an election diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index da00e400113..7779360ca33 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -43,36 +43,18 @@ MonitorDBStore *Paxos::get_store() return mon->store; } - -void Paxos::apply_version(MonitorDBStore::Transaction &tx, version_t v) -{ - bufferlist bl; - int err = get_store()->get(get_name(), v, bl); - assert(err == 0); - assert(bl.length()); - decode_append_transaction(tx, bl); -} - -void Paxos::reapply_all_versions() +void Paxos::read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t first, version_t last) { - version_t first = get_store()->get(get_name(), "first_committed"); - version_t last = get_store()->get(get_name(), "last_committed"); dout(10) << __func__ << " first " << first << " last " << last << dendl; - - MonitorDBStore::Transaction tx; for (version_t v = first; v <= last; ++v) { dout(30) << __func__ << " apply version " << v << dendl; - apply_version(tx, v); + bufferlist bl; + int err = get_store()->get(get_name(), v, bl); + assert(err == 0); + assert(bl.length()); + decode_append_transaction(*tx, bl); } dout(15) << __func__ << " total versions " << (last-first) << dendl; - - dout(30) << __func__ << " tx dump:\n"; - JSONFormatter f(true); - tx.dump(&f); - f.flush(*_dout); - *_dout << dendl; - - get_store()->apply_transaction(tx); } void Paxos::init() @@ -88,6 +70,7 @@ void Paxos::init() << " first_committed: " << first_committed << dendl; dout(10) << "init" << dendl; + assert(is_consistent()); } // --------------------------------- @@ -970,8 +953,8 @@ void Paxos::lease_renew_timeout() void Paxos::trim() { assert(should_trim()); - version_t end = MIN(get_version() - g_conf->paxos_max_join_drift, - get_first_committed() + g_conf->paxos_trim_max); + version_t end = MIN(get_version() - g_conf->paxos_min, + get_first_committed() + g_conf->paxos_trim_max); if (first_committed >= end) return; @@ -1003,17 +986,6 @@ void Paxos::trim() queue_proposal(bl, new C_Trimmed(this)); } -void Paxos::trim_enable() -{ - trim_disabled_version = 0; - // We may not be the leader when we reach this function. We sure must - // have been the leader at some point, but we may have been demoted and - // we really should reset 'trim_disabled_version' if that was the case. - // So, make sure we only trim() iff we are the leader. - if (mon->is_leader() && should_trim()) - trim(); -} - /* * return a globally unique, monotonically increasing proposal number */ @@ -1027,7 +999,7 @@ version_t Paxos::get_new_proposal_number(version_t gt) last_pn++; last_pn *= 100; last_pn += (version_t)mon->rank; - + // write MonitorDBStore::Transaction t; t.put(get_name(), "last_pn", last_pn); diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 9be4713a282..1cdad50e5bb 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -530,12 +530,6 @@ private: * trimming; false otherwise. */ bool trimming; - /** - * If we have disabled trimming our state, this variable should have a - * value greater than zero, corresponding to the version we had at the time - * we disabled the trim. - */ - version_t trim_disabled_version; /** * @defgroup Paxos_h_callbacks Callback classes. @@ -1013,8 +1007,7 @@ public: lease_timeout_event(0), accept_timeout_event(0), clock_drift_warned(0), - trimming(false), - trim_disabled_version(0) { } + trimming(false) { } const string get_name() const { return paxos_name; @@ -1022,8 +1015,7 @@ public: void dispatch(PaxosServiceMessage *m); - void reapply_all_versions(); - void apply_version(MonitorDBStore::Transaction &tx, version_t v); + void read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t from, version_t last); void init(); /** @@ -1137,26 +1129,6 @@ public: void trim(); /** - * Disable trimming - * - * This is required by the Monitor's store synchronization mechanisms - * to guarantee a consistent store state. - */ - void trim_disable() { - if (!trim_disabled_version) - trim_disabled_version = get_version(); - } - /** - * Enable trimming - */ - void trim_enable(); - /** - * Check if trimming has been disabled - * - * @returns true if trim has been disabled; false otherwise. - */ - bool is_trim_disabled() { return (trim_disabled_version > 0); } - /** * Check if we should trim. * * If trimming is disabled, we must take that into consideration and only @@ -1165,18 +1137,12 @@ public: * @returns true if we should trim; false otherwise. */ bool should_trim() { - int available_versions = (get_version() - get_first_committed()); - int maximum_versions = - (g_conf->paxos_max_join_drift + g_conf->paxos_trim_min); + int available_versions = get_version() - get_first_committed(); + int maximum_versions = g_conf->paxos_min + g_conf->paxos_trim_min; if (trimming || (available_versions <= maximum_versions)) return false; - if (trim_disabled_version > 0) { - int disabled_versions = (get_version() - trim_disabled_version); - if (disabled_versions < g_conf->paxos_trim_disabled_max_versions) - return false; - } return true; } diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 90b7e67c411..6c8d9c0bcc4 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -210,6 +210,13 @@ public: * @returns The service's name. */ string get_service_name() { return service_name; } + + /** + * Get the store prefixes we utilize + */ + virtual void get_store_prefixes(set<string>& s) { + s.insert(service_name); + } // i implement and you ignore /** |