summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-10-03 21:23:05 -0700
committerSage Weil <sage@inktank.com>2013-10-03 21:23:05 -0700
commit6afb16e52b6a9d83fa4729aa2341ee3bbe1eb276 (patch)
tree33b4be716d2fba8cf7dd0902e17179edf6a598a3
parentaacd67e07174ca1e8687e78dcec1a2f68e3bdf5f (diff)
parente9e64545d5300364e4775ec68b4a6b9ca76b505b (diff)
downloadceph-6afb16e52b6a9d83fa4729aa2341ee3bbe1eb276.tar.gz
Merge remote-tracking branch 'gh/next'
Conflicts: src/Makefile.am
-rw-r--r--src/mon/Monitor.cc206
-rw-r--r--src/mon/Monitor.h2
-rw-r--r--src/mon/MonmapMonitor.cc39
-rw-r--r--src/mon/PGMap.cc3
-rw-r--r--src/test/ObjectMap/test_store_tool/test_store_tool.cc90
5 files changed, 234 insertions, 106 deletions
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 2c64a8f2ef2..3fe658d9623 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -2561,67 +2561,98 @@ bool Monitor::_ms_dispatch(Message *m)
EntityName entity_name;
bool src_is_mon;
- src_is_mon = !connection || (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);
-
- if (connection) {
- bool reuse_caps = false;
- dout(20) << "have connection" << dendl;
- s = static_cast<MonSession *>(connection->get_priv());
- if (s && s->closed) {
- caps = s->caps;
- reuse_caps = true;
- s->put();
- s = NULL;
+ // regardless of who we are or who the sender is, the message must
+ // have a connection associated. If it doesn't then something fishy
+ // is going on.
+ assert(connection);
+
+ src_is_mon = (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);
+
+ bool reuse_caps = false;
+ dout(20) << "have connection" << dendl;
+ s = static_cast<MonSession *>(connection->get_priv());
+ if (s && s->closed) {
+ caps = s->caps;
+ reuse_caps = true;
+ s->put();
+ s = NULL;
+ }
+ if (!s) {
+ // if the sender is not a monitor, make sure their first message for a
+ // session is an MAuth. If it is not, assume it's a stray message,
+ // and considering that we are creating a new session it is safe to
+ // assume that the sender hasn't authenticated yet, so we have no way
+ // of assessing whether we should handle it or not.
+ if (!src_is_mon && m->get_type() != CEPH_MSG_AUTH) {
+ dout(1) << __func__ << " dropping stray message " << *m
+ << " from " << m->get_source_inst() << dendl;
+ m->put();
+ return false;
}
- if (!s) {
- if (!exited_quorum.is_zero() && !src_is_mon) {
- waitlist_or_zap_client(m);
- return true;
- }
- dout(10) << "do not have session, making new one" << dendl;
- s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
- m->get_connection()->set_priv(s->get());
- dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
-
- if (m->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
- dout(10) << "setting timeout on session" << dendl;
- // set an initial timeout here, so we will trim this session even if they don't
- // do anything.
- s->until = ceph_clock_now(g_ceph_context);
- s->until += g_conf->mon_subscribe_interval;
- } else {
- //give it monitor caps; the peer type has been authenticated
- reuse_caps = false;
- dout(5) << "setting monitor caps on this connection" << dendl;
- if (!s->caps.is_allow_all()) //but no need to repeatedly copy
- s->caps = *mon_caps;
- }
- if (reuse_caps)
- s->caps = caps;
+
+ if (!exited_quorum.is_zero() && !src_is_mon) {
+ waitlist_or_zap_client(m);
+ return true;
+ }
+
+ dout(10) << "do not have session, making new one" << dendl;
+ s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
+ m->get_connection()->set_priv(s->get());
+ dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
+
+ if (!src_is_mon) {
+ dout(10) << "setting timeout on session" << dendl;
+ // set an initial timeout here, so we will trim this session even if they don't
+ // do anything.
+ s->until = ceph_clock_now(g_ceph_context);
+ s->until += g_conf->mon_subscribe_interval;
} else {
- dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
+ //give it monitor caps; the peer type has been authenticated
+ reuse_caps = false;
+ dout(5) << "setting monitor caps on this connection" << dendl;
+ if (!s->caps.is_allow_all()) //but no need to repeatedly copy
+ s->caps = *mon_caps;
}
+ if (reuse_caps)
+ s->caps = caps;
+ } else {
+ dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
+ }
+
+ if (s) {
if (s->auth_handler) {
entity_name = s->auth_handler->get_entity_name();
}
- }
-
- if (s)
dout(20) << " caps " << s->caps.get_str() << dendl;
+ }
if (is_synchronizing() && !src_is_mon) {
waitlist_or_zap_client(m);
return true;
}
- {
- switch (m->get_type()) {
-
+ ret = dispatch(s, m, src_is_mon);
+
+ if (s) {
+ s->put();
+ }
+
+ return ret;
+}
+
+bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
+{
+ bool ret = true;
+
+ assert(m != NULL);
+
+ switch (m->get_type()) {
+
case MSG_ROUTE:
handle_route(static_cast<MRoute*>(m));
break;
- // misc
+ // misc
case CEPH_MSG_MON_GET_MAP:
handle_mon_get_map(static_cast<MMonGetMap*>(m));
break;
@@ -2647,12 +2678,11 @@ bool Monitor::_ms_dispatch(Message *m)
case MSG_MON_SYNC:
handle_sync(static_cast<MMonSync*>(m));
break;
-
case MSG_MON_SCRUB:
handle_scrub(static_cast<MMonScrub*>(m));
break;
- // OSDs
+ // OSDs
case MSG_OSD_MARK_ME_DOWN:
case MSG_OSD_FAILURE:
case MSG_OSD_BOOT:
@@ -2665,20 +2695,20 @@ bool Monitor::_ms_dispatch(Message *m)
paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // MDSs
+ // MDSs
case MSG_MDS_BEACON:
case MSG_MDS_OFFLOAD_TARGETS:
paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // auth
+ // auth
case MSG_MON_GLOBAL_ID:
case CEPH_MSG_AUTH:
/* no need to check caps here */
paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m);
break;
- // pg
+ // pg
case CEPH_MSG_STATFS:
case MSG_PGSTATS:
case MSG_GETPOOLSTATS:
@@ -2689,7 +2719,7 @@ bool Monitor::_ms_dispatch(Message *m)
paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // log
+ // log
case MSG_LOG:
paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
break;
@@ -2698,60 +2728,60 @@ bool Monitor::_ms_dispatch(Message *m)
clog.handle_log_ack((MLogAck*)m);
break;
- // monmap
+ // monmap
case MSG_MON_JOIN:
paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // paxos
+ // paxos
case MSG_MON_PAXOS:
{
- MMonPaxos *pm = static_cast<MMonPaxos*>(m);
- if (!src_is_mon &&
- !s->is_capable("mon", MON_CAP_X)) {
- //can't send these!
- pm->put();
- break;
- }
+ MMonPaxos *pm = static_cast<MMonPaxos*>(m);
+ if (!src_is_mon ||
+ !s->is_capable("mon", MON_CAP_X)) {
+ //can't send these!
+ pm->put();
+ break;
+ }
- if (state == STATE_SYNCHRONIZING) {
- // we are synchronizing. These messages would do us no
- // good, thus just drop them and ignore them.
- dout(10) << __func__ << " ignore paxos msg from "
- << pm->get_source_inst() << dendl;
- pm->put();
- break;
- }
+ if (state == STATE_SYNCHRONIZING) {
+ // we are synchronizing. These messages would do us no
+ // good, thus just drop them and ignore them.
+ dout(10) << __func__ << " ignore paxos msg from "
+ << pm->get_source_inst() << dendl;
+ pm->put();
+ break;
+ }
- // sanitize
- if (pm->epoch > get_epoch()) {
- bootstrap();
- pm->put();
- break;
- }
- if (pm->epoch != get_epoch()) {
- pm->put();
- break;
- }
+ // sanitize
+ if (pm->epoch > get_epoch()) {
+ bootstrap();
+ pm->put();
+ break;
+ }
+ if (pm->epoch != get_epoch()) {
+ pm->put();
+ break;
+ }
- paxos->dispatch((PaxosServiceMessage*)m);
+ paxos->dispatch((PaxosServiceMessage*)m);
}
break;
- // elector messages
+ // elector messages
case MSG_MON_ELECTION:
//check privileges here for simplicity
if (s &&
- !s->is_capable("mon", MON_CAP_X)) {
- dout(0) << "MMonElection received from entity without enough caps!"
- << s->caps << dendl;
- m->put();
- break;
+ !s->is_capable("mon", MON_CAP_X)) {
+ dout(0) << "MMonElection received from entity without enough caps!"
+ << s->caps << dendl;
+ m->put();
+ break;
}
if (!is_probing() && !is_synchronizing()) {
- elector.dispatch(m);
+ elector.dispatch(m);
} else {
- m->put();
+ m->put();
}
break;
@@ -2769,10 +2799,6 @@ bool Monitor::_ms_dispatch(Message *m)
default:
ret = false;
- }
- }
- if (s) {
- s->put();
}
return ret;
diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h
index 9b304428732..2c1c2cdeb19 100644
--- a/src/mon/Monitor.h
+++ b/src/mon/Monitor.h
@@ -700,6 +700,8 @@ public:
lock.Unlock();
return ret;
}
+ // dissociate message handling from session and connection logic
+ bool dispatch(MonSession *s, Message *m, const bool src_is_mon);
//mon_caps is used for un-connected messages from monitors
MonCap * mon_caps;
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc
index 799f19df154..ca855592445 100644
--- a/src/mon/MonmapMonitor.cc
+++ b/src/mon/MonmapMonitor.cc
@@ -298,20 +298,45 @@ bool MonmapMonitor::prepare_command(MMonCommand *m)
addr.set_port(CEPH_MON_PORT);
}
- if (pending_map.contains(addr) ||
- pending_map.contains(name)) {
+ /**
+ * If we have a monitor with the same name and different addr, then EEXIST
+ * If we have a monitor with the same addr and different name, then EEXIST
+ * If we have a monitor with the same addr and same name, then return as if
+ * we had just added the monitor.
+ * If we don't have the monitor, add it.
+ */
+
+ err = 0;
+ if (!ss.str().empty())
+ ss << "; ";
+
+ do {
+ if (pending_map.contains(addr)) {
+ string n = pending_map.get_name(addr);
+ if (n == name)
+ break;
+ } else if (pending_map.contains(name)) {
+ entity_addr_t tmp_addr = pending_map.get_addr(name);
+ if (tmp_addr == addr)
+ break;
+ } else {
+ break;
+ }
err = -EEXIST;
- if (!ss.str().empty())
- ss << "; ";
- ss << "mon " << name << " " << addr << " already exists";
+ ss << "mon." << name << " at " << addr << " already exists";
+ goto out;
+ } while (false);
+
+ ss << "added mon." << name << " at " << addr;
+ if (pending_map.contains(name)) {
goto out;
}
pending_map.add(name, addr);
pending_map.last_changed = ceph_clock_now(g_ceph_context);
- ss << "added mon." << name << " at " << addr;
getline(ss, rs);
- wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, get_last_committed()));
+ wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs,
+ get_last_committed()));
return true;
} else if (prefix == "mon remove") {
diff --git a/src/mon/PGMap.cc b/src/mon/PGMap.cc
index e9a35c6b8ab..4be39aba902 100644
--- a/src/mon/PGMap.cc
+++ b/src/mon/PGMap.cc
@@ -701,7 +701,8 @@ void PGMap::dump_stuck_plain(ostream& ss, PGMap::StuckPG type, utime_t cutoff) c
{
hash_map<pg_t, pg_stat_t> stuck_pg_stats;
get_stuck_stats(type, cutoff, stuck_pg_stats);
- dump_pg_stats_plain(ss, stuck_pg_stats);
+ if (!stuck_pg_stats.empty())
+ dump_pg_stats_plain(ss, stuck_pg_stats);
}
void PGMap::dump_osd_perf_stats(Formatter *f) const
diff --git a/src/test/ObjectMap/test_store_tool/test_store_tool.cc b/src/test/ObjectMap/test_store_tool/test_store_tool.cc
index f81598ccfb8..8fcf3f30e82 100644
--- a/src/test/ObjectMap/test_store_tool/test_store_tool.cc
+++ b/src/test/ObjectMap/test_store_tool/test_store_tool.cc
@@ -24,6 +24,7 @@
#include "common/errno.h"
#include "common/safe_io.h"
#include "common/config.h"
+#include "common/strtol.h"
using namespace std;
@@ -38,7 +39,7 @@ class StoreTool
db.reset(db_ptr);
}
- void list(const string &prefix) {
+ void list(const string &prefix, const bool do_crc) {
KeyValueDB::WholeSpaceIterator iter = db->get_iterator();
if (prefix.empty())
@@ -51,7 +52,11 @@ class StoreTool
if (!prefix.empty() && (rk.first != prefix))
break;
- std::cout << rk.first << ":" << rk.second << std::endl;
+ std::cout << rk.first << ":" << rk.second;
+ if (do_crc) {
+ std::cout << " (" << iter->value().crc32c(0) << ")";
+ }
+ std::cout << std::endl;
iter->next();
}
}
@@ -79,7 +84,7 @@ class StoreTool
assert(!prefix.empty() && !key.empty());
map<string,bufferlist> result;
- set<string> keys;
+ std::set<std::string> keys;
keys.insert(key);
db->get(prefix, keys, &result);
@@ -101,6 +106,18 @@ class StoreTool
std::cout << "total: " << s << std::endl;
return s;
}
+
+ bool set(const string &prefix, const string &key, bufferlist &val) {
+ assert(!prefix.empty());
+ assert(!key.empty());
+ assert(val.length() > 0);
+
+ KeyValueDB::Transaction tx = db->get_transaction();
+ tx->set(prefix, key, val);
+ int ret = db->submit_transaction_sync(tx);
+
+ return (ret == 0);
+ }
};
void usage(const char *pname)
@@ -109,10 +126,12 @@ void usage(const char *pname)
<< "\n"
<< "Commands:\n"
<< " list [prefix]\n"
+ << " list-crc [prefix]\n"
<< " exists <prefix> [key]\n"
<< " get <prefix> <key>\n"
- << " verify <store path>\n"
+ << " crc <prefix> <key>\n"
<< " get-size\n"
+ << " set <prefix> <key> [ver <N>|in <file>]\n"
<< std::endl;
}
@@ -140,12 +159,14 @@ int main(int argc, const char *argv[])
StoreTool st(path);
- if (cmd == "list") {
+ if (cmd == "list" || cmd == "list-crc") {
string prefix;
if (argc > 3)
prefix = argv[3];
- st.list(prefix);
+ bool do_crc = (cmd == "list-crc");
+
+ st.list(prefix, do_crc);
} else if (cmd == "exists") {
string key;
@@ -183,10 +204,63 @@ int main(int argc, const char *argv[])
bl.hexdump(os);
std::cout << os.str() << std::endl;
- } else if (cmd == "verify") {
- assert(0);
+ } else if (cmd == "crc") {
+ if (argc < 5) {
+ usage(argv[0]);
+ return 1;
+ }
+ string prefix(argv[3]);
+ string key(argv[4]);
+
+ bool exists = false;
+ bufferlist bl = st.get(prefix, key, exists);
+ std::cout << "(" << prefix << ", " << key << ") ";
+ if (!exists) {
+ std::cout << " does not exist" << std::endl;
+ return 1;
+ }
+ std::cout << " crc " << bl.crc32c(0) << std::endl;
+
} else if (cmd == "get-size") {
std::cout << "estimated store size: " << st.get_size() << std::endl;
+
+ } else if (cmd == "set") {
+ if (argc < 7) {
+ usage(argv[0]);
+ return 1;
+ }
+ string prefix(argv[3]);
+ string key(argv[4]);
+ string subcmd(argv[5]);
+
+ bufferlist val;
+ string errstr;
+ if (subcmd == "ver") {
+ version_t v = (version_t) strict_strtoll(argv[6], 10, &errstr);
+ if (!errstr.empty()) {
+ std::cerr << "error reading version: " << errstr << std::endl;
+ return 1;
+ }
+ ::encode(v, val);
+ } else if (subcmd == "in") {
+ int ret = val.read_file(argv[6], &errstr);
+ if (ret < 0 || !errstr.empty()) {
+ std::cerr << "error reading file: " << errstr << std::endl;
+ return 1;
+ }
+ } else {
+ std::cerr << "unrecognized subcommand '" << subcmd << "'" << std::endl;
+ usage(argv[0]);
+ return 1;
+ }
+
+ bool ret = st.set(prefix, key, val);
+ if (!ret) {
+ std::cerr << "error setting ("
+ << prefix << "," << key << ")" << std::endl;
+ return 1;
+ }
+
} else {
std::cerr << "Unrecognized command: " << cmd << std::endl;
return 1;