summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-10-03 11:32:50 -0700
committerSage Weil <sage@inktank.com>2013-10-03 11:32:50 -0700
commitde788bd687adf041203d469b0fceb4b2e8cb5b4f (patch)
treea115c196f45971b9524a0c7b5e9f661b83ae7afe
parent58d0a1f0dfb72536a38e0efe269d1cfcc2338964 (diff)
parentf1e23937a6386c7d1e23af115098fa81a2ca4230 (diff)
downloadceph-de788bd687adf041203d469b0fceb4b2e8cb5b4f.tar.gz
Merge pull request #653 from ceph/wip-mon-auth
mon: Monitor: dissociate msg handling from session and connection logic Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/mon/Monitor.cc206
-rw-r--r--src/mon/Monitor.h2
2 files changed, 118 insertions, 90 deletions
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 10f5bfb149c..42cf3fa661d 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -2561,67 +2561,98 @@ bool Monitor::_ms_dispatch(Message *m)
EntityName entity_name;
bool src_is_mon;
- src_is_mon = !connection || (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);
-
- if (connection) {
- bool reuse_caps = false;
- dout(20) << "have connection" << dendl;
- s = static_cast<MonSession *>(connection->get_priv());
- if (s && s->closed) {
- caps = s->caps;
- reuse_caps = true;
- s->put();
- s = NULL;
+ // regardless of who we are or who the sender is, the message must
+ // have a connection associated. If it doesn't then something fishy
+ // is going on.
+ assert(connection);
+
+ src_is_mon = (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);
+
+ bool reuse_caps = false;
+ dout(20) << "have connection" << dendl;
+ s = static_cast<MonSession *>(connection->get_priv());
+ if (s && s->closed) {
+ caps = s->caps;
+ reuse_caps = true;
+ s->put();
+ s = NULL;
+ }
+ if (!s) {
+ // if the sender is not a monitor, make sure their first message for a
+ // session is an MAuth. If it is not, assume it's a stray message,
+ // and considering that we are creating a new session it is safe to
+ // assume that the sender hasn't authenticated yet, so we have no way
+ // of assessing whether we should handle it or not.
+ if (!src_is_mon && m->get_type() != CEPH_MSG_AUTH) {
+ dout(1) << __func__ << " dropping stray message " << *m
+ << " from " << m->get_source_inst() << dendl;
+ m->put();
+ return false;
}
- if (!s) {
- if (!exited_quorum.is_zero() && !src_is_mon) {
- waitlist_or_zap_client(m);
- return true;
- }
- dout(10) << "do not have session, making new one" << dendl;
- s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
- m->get_connection()->set_priv(s->get());
- dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
-
- if (m->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
- dout(10) << "setting timeout on session" << dendl;
- // set an initial timeout here, so we will trim this session even if they don't
- // do anything.
- s->until = ceph_clock_now(g_ceph_context);
- s->until += g_conf->mon_subscribe_interval;
- } else {
- //give it monitor caps; the peer type has been authenticated
- reuse_caps = false;
- dout(5) << "setting monitor caps on this connection" << dendl;
- if (!s->caps.is_allow_all()) //but no need to repeatedly copy
- s->caps = *mon_caps;
- }
- if (reuse_caps)
- s->caps = caps;
+
+ if (!exited_quorum.is_zero() && !src_is_mon) {
+ waitlist_or_zap_client(m);
+ return true;
+ }
+
+ dout(10) << "do not have session, making new one" << dendl;
+ s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
+ m->get_connection()->set_priv(s->get());
+ dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
+
+ if (!src_is_mon) {
+ dout(10) << "setting timeout on session" << dendl;
+ // set an initial timeout here, so we will trim this session even if they don't
+ // do anything.
+ s->until = ceph_clock_now(g_ceph_context);
+ s->until += g_conf->mon_subscribe_interval;
} else {
- dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
+ //give it monitor caps; the peer type has been authenticated
+ reuse_caps = false;
+ dout(5) << "setting monitor caps on this connection" << dendl;
+ if (!s->caps.is_allow_all()) //but no need to repeatedly copy
+ s->caps = *mon_caps;
}
+ if (reuse_caps)
+ s->caps = caps;
+ } else {
+ dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
+ }
+
+ if (s) {
if (s->auth_handler) {
entity_name = s->auth_handler->get_entity_name();
}
- }
-
- if (s)
dout(20) << " caps " << s->caps.get_str() << dendl;
+ }
if (is_synchronizing() && !src_is_mon) {
waitlist_or_zap_client(m);
return true;
}
- {
- switch (m->get_type()) {
-
+ ret = dispatch(s, m, src_is_mon);
+
+ if (s) {
+ s->put();
+ }
+
+ return ret;
+}
+
+bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
+{
+ bool ret = true;
+
+ assert(m != NULL);
+
+ switch (m->get_type()) {
+
case MSG_ROUTE:
handle_route(static_cast<MRoute*>(m));
break;
- // misc
+ // misc
case CEPH_MSG_MON_GET_MAP:
handle_mon_get_map(static_cast<MMonGetMap*>(m));
break;
@@ -2647,12 +2678,11 @@ bool Monitor::_ms_dispatch(Message *m)
case MSG_MON_SYNC:
handle_sync(static_cast<MMonSync*>(m));
break;
-
case MSG_MON_SCRUB:
handle_scrub(static_cast<MMonScrub*>(m));
break;
- // OSDs
+ // OSDs
case MSG_OSD_MARK_ME_DOWN:
case MSG_OSD_FAILURE:
case MSG_OSD_BOOT:
@@ -2665,20 +2695,20 @@ bool Monitor::_ms_dispatch(Message *m)
paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // MDSs
+ // MDSs
case MSG_MDS_BEACON:
case MSG_MDS_OFFLOAD_TARGETS:
paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // auth
+ // auth
case MSG_MON_GLOBAL_ID:
case CEPH_MSG_AUTH:
/* no need to check caps here */
paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m);
break;
- // pg
+ // pg
case CEPH_MSG_STATFS:
case MSG_PGSTATS:
case MSG_GETPOOLSTATS:
@@ -2689,7 +2719,7 @@ bool Monitor::_ms_dispatch(Message *m)
paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // log
+ // log
case MSG_LOG:
paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
break;
@@ -2698,60 +2728,60 @@ bool Monitor::_ms_dispatch(Message *m)
clog.handle_log_ack((MLogAck*)m);
break;
- // monmap
+ // monmap
case MSG_MON_JOIN:
paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
break;
- // paxos
+ // paxos
case MSG_MON_PAXOS:
{
- MMonPaxos *pm = static_cast<MMonPaxos*>(m);
- if (!src_is_mon &&
- !s->is_capable("mon", MON_CAP_X)) {
- //can't send these!
- pm->put();
- break;
- }
+ MMonPaxos *pm = static_cast<MMonPaxos*>(m);
+ if (!src_is_mon ||
+ !s->is_capable("mon", MON_CAP_X)) {
+ //can't send these!
+ pm->put();
+ break;
+ }
- if (state == STATE_SYNCHRONIZING) {
- // we are synchronizing. These messages would do us no
- // good, thus just drop them and ignore them.
- dout(10) << __func__ << " ignore paxos msg from "
- << pm->get_source_inst() << dendl;
- pm->put();
- break;
- }
+ if (state == STATE_SYNCHRONIZING) {
+ // we are synchronizing. These messages would do us no
+ // good, thus just drop them and ignore them.
+ dout(10) << __func__ << " ignore paxos msg from "
+ << pm->get_source_inst() << dendl;
+ pm->put();
+ break;
+ }
- // sanitize
- if (pm->epoch > get_epoch()) {
- bootstrap();
- pm->put();
- break;
- }
- if (pm->epoch != get_epoch()) {
- pm->put();
- break;
- }
+ // sanitize
+ if (pm->epoch > get_epoch()) {
+ bootstrap();
+ pm->put();
+ break;
+ }
+ if (pm->epoch != get_epoch()) {
+ pm->put();
+ break;
+ }
- paxos->dispatch((PaxosServiceMessage*)m);
+ paxos->dispatch((PaxosServiceMessage*)m);
}
break;
- // elector messages
+ // elector messages
case MSG_MON_ELECTION:
//check privileges here for simplicity
if (s &&
- !s->is_capable("mon", MON_CAP_X)) {
- dout(0) << "MMonElection received from entity without enough caps!"
- << s->caps << dendl;
- m->put();
- break;
+ !s->is_capable("mon", MON_CAP_X)) {
+ dout(0) << "MMonElection received from entity without enough caps!"
+ << s->caps << dendl;
+ m->put();
+ break;
}
if (!is_probing() && !is_synchronizing()) {
- elector.dispatch(m);
+ elector.dispatch(m);
} else {
- m->put();
+ m->put();
}
break;
@@ -2769,10 +2799,6 @@ bool Monitor::_ms_dispatch(Message *m)
default:
ret = false;
- }
- }
- if (s) {
- s->put();
}
return ret;
diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h
index df4a751361a..19adf20c474 100644
--- a/src/mon/Monitor.h
+++ b/src/mon/Monitor.h
@@ -700,6 +700,8 @@ public:
lock.Unlock();
return ret;
}
+ // dissociate message handling from session and connection logic
+ bool dispatch(MonSession *s, Message *m, const bool src_is_mon);
//mon_caps is used for un-connected messages from monitors
MonCap * mon_caps;
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);