diff options
author | Sage Weil <sage@inktank.com> | 2013-10-03 21:23:05 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-10-03 21:23:05 -0700 |
commit | 6afb16e52b6a9d83fa4729aa2341ee3bbe1eb276 (patch) | |
tree | 33b4be716d2fba8cf7dd0902e17179edf6a598a3 | |
parent | aacd67e07174ca1e8687e78dcec1a2f68e3bdf5f (diff) | |
parent | e9e64545d5300364e4775ec68b4a6b9ca76b505b (diff) | |
download | ceph-6afb16e52b6a9d83fa4729aa2341ee3bbe1eb276.tar.gz |
Merge remote-tracking branch 'gh/next'
Conflicts:
src/Makefile.am
-rw-r--r-- | src/mon/Monitor.cc | 206 | ||||
-rw-r--r-- | src/mon/Monitor.h | 2 | ||||
-rw-r--r-- | src/mon/MonmapMonitor.cc | 39 | ||||
-rw-r--r-- | src/mon/PGMap.cc | 3 | ||||
-rw-r--r-- | src/test/ObjectMap/test_store_tool/test_store_tool.cc | 90 |
5 files changed, 234 insertions, 106 deletions
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 2c64a8f2ef2..3fe658d9623 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -2561,67 +2561,98 @@ bool Monitor::_ms_dispatch(Message *m) EntityName entity_name; bool src_is_mon; - src_is_mon = !connection || (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON); - - if (connection) { - bool reuse_caps = false; - dout(20) << "have connection" << dendl; - s = static_cast<MonSession *>(connection->get_priv()); - if (s && s->closed) { - caps = s->caps; - reuse_caps = true; - s->put(); - s = NULL; + // regardless of who we are or who the sender is, the message must + // have a connection associated. If it doesn't then something fishy + // is going on. + assert(connection); + + src_is_mon = (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON); + + bool reuse_caps = false; + dout(20) << "have connection" << dendl; + s = static_cast<MonSession *>(connection->get_priv()); + if (s && s->closed) { + caps = s->caps; + reuse_caps = true; + s->put(); + s = NULL; + } + if (!s) { + // if the sender is not a monitor, make sure their first message for a + // session is an MAuth. If it is not, assume it's a stray message, + // and considering that we are creating a new session it is safe to + // assume that the sender hasn't authenticated yet, so we have no way + // of assessing whether we should handle it or not. + if (!src_is_mon && m->get_type() != CEPH_MSG_AUTH) { + dout(1) << __func__ << " dropping stray message " << *m + << " from " << m->get_source_inst() << dendl; + m->put(); + return false; } - if (!s) { - if (!exited_quorum.is_zero() && !src_is_mon) { - waitlist_or_zap_client(m); - return true; - } - dout(10) << "do not have session, making new one" << dendl; - s = session_map.new_session(m->get_source_inst(), m->get_connection().get()); - m->get_connection()->set_priv(s->get()); - dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl; - - if (m->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_MON) { - dout(10) << "setting timeout on session" << dendl; - // set an initial timeout here, so we will trim this session even if they don't - // do anything. - s->until = ceph_clock_now(g_ceph_context); - s->until += g_conf->mon_subscribe_interval; - } else { - //give it monitor caps; the peer type has been authenticated - reuse_caps = false; - dout(5) << "setting monitor caps on this connection" << dendl; - if (!s->caps.is_allow_all()) //but no need to repeatedly copy - s->caps = *mon_caps; - } - if (reuse_caps) - s->caps = caps; + + if (!exited_quorum.is_zero() && !src_is_mon) { + waitlist_or_zap_client(m); + return true; + } + + dout(10) << "do not have session, making new one" << dendl; + s = session_map.new_session(m->get_source_inst(), m->get_connection().get()); + m->get_connection()->set_priv(s->get()); + dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl; + + if (!src_is_mon) { + dout(10) << "setting timeout on session" << dendl; + // set an initial timeout here, so we will trim this session even if they don't + // do anything. + s->until = ceph_clock_now(g_ceph_context); + s->until += g_conf->mon_subscribe_interval; } else { - dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl; + //give it monitor caps; the peer type has been authenticated + reuse_caps = false; + dout(5) << "setting monitor caps on this connection" << dendl; + if (!s->caps.is_allow_all()) //but no need to repeatedly copy + s->caps = *mon_caps; } + if (reuse_caps) + s->caps = caps; + } else { + dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl; + } + + if (s) { if (s->auth_handler) { entity_name = s->auth_handler->get_entity_name(); } - } - - if (s) dout(20) << " caps " << s->caps.get_str() << dendl; + } if (is_synchronizing() && !src_is_mon) { waitlist_or_zap_client(m); return true; } - { - switch (m->get_type()) { - + ret = dispatch(s, m, src_is_mon); + + if (s) { + s->put(); + } + + return ret; +} + +bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) +{ + bool ret = true; + + assert(m != NULL); + + switch (m->get_type()) { + case MSG_ROUTE: handle_route(static_cast<MRoute*>(m)); break; - // misc + // misc case CEPH_MSG_MON_GET_MAP: handle_mon_get_map(static_cast<MMonGetMap*>(m)); break; @@ -2647,12 +2678,11 @@ bool Monitor::_ms_dispatch(Message *m) case MSG_MON_SYNC: handle_sync(static_cast<MMonSync*>(m)); break; - case MSG_MON_SCRUB: handle_scrub(static_cast<MMonScrub*>(m)); break; - // OSDs + // OSDs case MSG_OSD_MARK_ME_DOWN: case MSG_OSD_FAILURE: case MSG_OSD_BOOT: @@ -2665,20 +2695,20 @@ bool Monitor::_ms_dispatch(Message *m) paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m); break; - // MDSs + // MDSs case MSG_MDS_BEACON: case MSG_MDS_OFFLOAD_TARGETS: paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m); break; - // auth + // auth case MSG_MON_GLOBAL_ID: case CEPH_MSG_AUTH: /* no need to check caps here */ paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m); break; - // pg + // pg case CEPH_MSG_STATFS: case MSG_PGSTATS: case MSG_GETPOOLSTATS: @@ -2689,7 +2719,7 @@ bool Monitor::_ms_dispatch(Message *m) paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m); break; - // log + // log case MSG_LOG: paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m); break; @@ -2698,60 +2728,60 @@ bool Monitor::_ms_dispatch(Message *m) clog.handle_log_ack((MLogAck*)m); break; - // monmap + // monmap case MSG_MON_JOIN: paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m); break; - // paxos + // paxos case MSG_MON_PAXOS: { - MMonPaxos *pm = static_cast<MMonPaxos*>(m); - if (!src_is_mon && - !s->is_capable("mon", MON_CAP_X)) { - //can't send these! - pm->put(); - break; - } + MMonPaxos *pm = static_cast<MMonPaxos*>(m); + if (!src_is_mon || + !s->is_capable("mon", MON_CAP_X)) { + //can't send these! + pm->put(); + break; + } - if (state == STATE_SYNCHRONIZING) { - // we are synchronizing. These messages would do us no - // good, thus just drop them and ignore them. - dout(10) << __func__ << " ignore paxos msg from " - << pm->get_source_inst() << dendl; - pm->put(); - break; - } + if (state == STATE_SYNCHRONIZING) { + // we are synchronizing. These messages would do us no + // good, thus just drop them and ignore them. + dout(10) << __func__ << " ignore paxos msg from " + << pm->get_source_inst() << dendl; + pm->put(); + break; + } - // sanitize - if (pm->epoch > get_epoch()) { - bootstrap(); - pm->put(); - break; - } - if (pm->epoch != get_epoch()) { - pm->put(); - break; - } + // sanitize + if (pm->epoch > get_epoch()) { + bootstrap(); + pm->put(); + break; + } + if (pm->epoch != get_epoch()) { + pm->put(); + break; + } - paxos->dispatch((PaxosServiceMessage*)m); + paxos->dispatch((PaxosServiceMessage*)m); } break; - // elector messages + // elector messages case MSG_MON_ELECTION: //check privileges here for simplicity if (s && - !s->is_capable("mon", MON_CAP_X)) { - dout(0) << "MMonElection received from entity without enough caps!" - << s->caps << dendl; - m->put(); - break; + !s->is_capable("mon", MON_CAP_X)) { + dout(0) << "MMonElection received from entity without enough caps!" + << s->caps << dendl; + m->put(); + break; } if (!is_probing() && !is_synchronizing()) { - elector.dispatch(m); + elector.dispatch(m); } else { - m->put(); + m->put(); } break; @@ -2769,10 +2799,6 @@ bool Monitor::_ms_dispatch(Message *m) default: ret = false; - } - } - if (s) { - s->put(); } return ret; diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 9b304428732..2c1c2cdeb19 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -700,6 +700,8 @@ public: lock.Unlock(); return ret; } + // dissociate message handling from session and connection logic + bool dispatch(MonSession *s, Message *m, const bool src_is_mon); //mon_caps is used for un-connected messages from monitors MonCap * mon_caps; bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc index 799f19df154..ca855592445 100644 --- a/src/mon/MonmapMonitor.cc +++ b/src/mon/MonmapMonitor.cc @@ -298,20 +298,45 @@ bool MonmapMonitor::prepare_command(MMonCommand *m) addr.set_port(CEPH_MON_PORT); } - if (pending_map.contains(addr) || - pending_map.contains(name)) { + /** + * If we have a monitor with the same name and different addr, then EEXIST + * If we have a monitor with the same addr and different name, then EEXIST + * If we have a monitor with the same addr and same name, then return as if + * we had just added the monitor. + * If we don't have the monitor, add it. + */ + + err = 0; + if (!ss.str().empty()) + ss << "; "; + + do { + if (pending_map.contains(addr)) { + string n = pending_map.get_name(addr); + if (n == name) + break; + } else if (pending_map.contains(name)) { + entity_addr_t tmp_addr = pending_map.get_addr(name); + if (tmp_addr == addr) + break; + } else { + break; + } err = -EEXIST; - if (!ss.str().empty()) - ss << "; "; - ss << "mon " << name << " " << addr << " already exists"; + ss << "mon." << name << " at " << addr << " already exists"; + goto out; + } while (false); + + ss << "added mon." << name << " at " << addr; + if (pending_map.contains(name)) { goto out; } pending_map.add(name, addr); pending_map.last_changed = ceph_clock_now(g_ceph_context); - ss << "added mon." << name << " at " << addr; getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, get_last_committed())); + wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, + get_last_committed())); return true; } else if (prefix == "mon remove") { diff --git a/src/mon/PGMap.cc b/src/mon/PGMap.cc index e9a35c6b8ab..4be39aba902 100644 --- a/src/mon/PGMap.cc +++ b/src/mon/PGMap.cc @@ -701,7 +701,8 @@ void PGMap::dump_stuck_plain(ostream& ss, PGMap::StuckPG type, utime_t cutoff) c { hash_map<pg_t, pg_stat_t> stuck_pg_stats; get_stuck_stats(type, cutoff, stuck_pg_stats); - dump_pg_stats_plain(ss, stuck_pg_stats); + if (!stuck_pg_stats.empty()) + dump_pg_stats_plain(ss, stuck_pg_stats); } void PGMap::dump_osd_perf_stats(Formatter *f) const diff --git a/src/test/ObjectMap/test_store_tool/test_store_tool.cc b/src/test/ObjectMap/test_store_tool/test_store_tool.cc index f81598ccfb8..8fcf3f30e82 100644 --- a/src/test/ObjectMap/test_store_tool/test_store_tool.cc +++ b/src/test/ObjectMap/test_store_tool/test_store_tool.cc @@ -24,6 +24,7 @@ #include "common/errno.h" #include "common/safe_io.h" #include "common/config.h" +#include "common/strtol.h" using namespace std; @@ -38,7 +39,7 @@ class StoreTool db.reset(db_ptr); } - void list(const string &prefix) { + void list(const string &prefix, const bool do_crc) { KeyValueDB::WholeSpaceIterator iter = db->get_iterator(); if (prefix.empty()) @@ -51,7 +52,11 @@ class StoreTool if (!prefix.empty() && (rk.first != prefix)) break; - std::cout << rk.first << ":" << rk.second << std::endl; + std::cout << rk.first << ":" << rk.second; + if (do_crc) { + std::cout << " (" << iter->value().crc32c(0) << ")"; + } + std::cout << std::endl; iter->next(); } } @@ -79,7 +84,7 @@ class StoreTool assert(!prefix.empty() && !key.empty()); map<string,bufferlist> result; - set<string> keys; + std::set<std::string> keys; keys.insert(key); db->get(prefix, keys, &result); @@ -101,6 +106,18 @@ class StoreTool std::cout << "total: " << s << std::endl; return s; } + + bool set(const string &prefix, const string &key, bufferlist &val) { + assert(!prefix.empty()); + assert(!key.empty()); + assert(val.length() > 0); + + KeyValueDB::Transaction tx = db->get_transaction(); + tx->set(prefix, key, val); + int ret = db->submit_transaction_sync(tx); + + return (ret == 0); + } }; void usage(const char *pname) @@ -109,10 +126,12 @@ void usage(const char *pname) << "\n" << "Commands:\n" << " list [prefix]\n" + << " list-crc [prefix]\n" << " exists <prefix> [key]\n" << " get <prefix> <key>\n" - << " verify <store path>\n" + << " crc <prefix> <key>\n" << " get-size\n" + << " set <prefix> <key> [ver <N>|in <file>]\n" << std::endl; } @@ -140,12 +159,14 @@ int main(int argc, const char *argv[]) StoreTool st(path); - if (cmd == "list") { + if (cmd == "list" || cmd == "list-crc") { string prefix; if (argc > 3) prefix = argv[3]; - st.list(prefix); + bool do_crc = (cmd == "list-crc"); + + st.list(prefix, do_crc); } else if (cmd == "exists") { string key; @@ -183,10 +204,63 @@ int main(int argc, const char *argv[]) bl.hexdump(os); std::cout << os.str() << std::endl; - } else if (cmd == "verify") { - assert(0); + } else if (cmd == "crc") { + if (argc < 5) { + usage(argv[0]); + return 1; + } + string prefix(argv[3]); + string key(argv[4]); + + bool exists = false; + bufferlist bl = st.get(prefix, key, exists); + std::cout << "(" << prefix << ", " << key << ") "; + if (!exists) { + std::cout << " does not exist" << std::endl; + return 1; + } + std::cout << " crc " << bl.crc32c(0) << std::endl; + } else if (cmd == "get-size") { std::cout << "estimated store size: " << st.get_size() << std::endl; + + } else if (cmd == "set") { + if (argc < 7) { + usage(argv[0]); + return 1; + } + string prefix(argv[3]); + string key(argv[4]); + string subcmd(argv[5]); + + bufferlist val; + string errstr; + if (subcmd == "ver") { + version_t v = (version_t) strict_strtoll(argv[6], 10, &errstr); + if (!errstr.empty()) { + std::cerr << "error reading version: " << errstr << std::endl; + return 1; + } + ::encode(v, val); + } else if (subcmd == "in") { + int ret = val.read_file(argv[6], &errstr); + if (ret < 0 || !errstr.empty()) { + std::cerr << "error reading file: " << errstr << std::endl; + return 1; + } + } else { + std::cerr << "unrecognized subcommand '" << subcmd << "'" << std::endl; + usage(argv[0]); + return 1; + } + + bool ret = st.set(prefix, key, val); + if (!ret) { + std::cerr << "error setting (" + << prefix << "," << key << ")" << std::endl; + return 1; + } + } else { std::cerr << "Unrecognized command: " << cmd << std::endl; return 1; |