summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGreg Farnum <gregory.farnum@dreamhost.com>2012-03-01 18:31:49 -0800
committerGreg Farnum <gregory.farnum@dreamhost.com>2012-03-02 12:32:36 -0800
commit26e48f4234c051edeeac6e7ba739f911d9454b9a (patch)
treeeaefc0e98466cc291d0a130a3126417b8e2f9232 /src
parent29be52820d6b16cb606741bc69cc6baebd14bc5b (diff)
downloadceph-26e48f4234c051edeeac6e7ba739f911d9454b9a.tar.gz
msgr: Require that init functions are called before bind() and start().
Fix up callers to handle these constraints. Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
Diffstat (limited to 'src')
-rw-r--r--src/ceph_mds.cc8
-rw-r--r--src/ceph_mon.cc31
-rw-r--r--src/ceph_osd.cc30
-rw-r--r--src/msg/Messenger.h35
-rw-r--r--src/msg/SimpleMessenger.h38
5 files changed, 106 insertions, 36 deletions
diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc
index 76aefd7609c..ed85338e1f5 100644
--- a/src/ceph_mds.cc
+++ b/src/ceph_mds.cc
@@ -236,10 +236,6 @@ int main(int argc, const char **argv)
messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL);
messenger->set_nonce(getpid());
- int r = messenger->bind(g_conf->public_addr);
- if (r < 0)
- exit(1);
-
cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr()
<< std::endl;
uint64_t supported =
@@ -260,6 +256,10 @@ int main(int argc, const char **argv)
messenger->set_policy(entity_name_t::TYPE_CLIENT,
Messenger::Policy::stateful_server(supported, 0));
+ int r = messenger->bind(g_conf->public_addr);
+ if (r < 0)
+ exit(1);
+
if (shadow != MDSMap::STATE_ONESHOT_REPLAY)
global_init_daemonize(g_ceph_context, 0);
common_init_finish(g_ceph_context);
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc
index 02d9ec45fc4..c9e6d37cc7a 100644
--- a/src/ceph_mon.cc
+++ b/src/ceph_mon.cc
@@ -370,6 +370,22 @@ int main(int argc, const char **argv)
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
entity_name_t::MON(rank));
messenger->set_cluster_protocol(CEPH_MON_PROTOCOL);
+ messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
+
+ uint64_t supported =
+ CEPH_FEATURE_UID |
+ CEPH_FEATURE_NOSRCADDR |
+ CEPH_FEATURE_MONCLOCKCHECK |
+ CEPH_FEATURE_PGID64;
+ messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
+ messenger->set_policy(entity_name_t::TYPE_MON,
+ Messenger::Policy::lossless_peer(supported,
+ CEPH_FEATURE_UID |
+ CEPH_FEATURE_PGID64));
+ messenger->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(supported,
+ CEPH_FEATURE_PGID64 |
+ CEPH_FEATURE_OSDENC));
global_print_banner();
@@ -384,7 +400,6 @@ int main(int argc, const char **argv)
return 1;
// start monitor
- messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
mon = new Monitor(g_ceph_context, g_conf->name.get_id(), &store, messenger, &monmap);
global_init_daemonize(g_ceph_context, 0);
@@ -398,20 +413,6 @@ int main(int argc, const char **argv)
register_async_signal_handler_oneshot(SIGINT, handle_mon_signal);
register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal);
- uint64_t supported =
- CEPH_FEATURE_UID |
- CEPH_FEATURE_NOSRCADDR |
- CEPH_FEATURE_MONCLOCKCHECK |
- CEPH_FEATURE_PGID64;
- messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
- messenger->set_policy(entity_name_t::TYPE_MON,
- Messenger::Policy::lossless_peer(supported,
- CEPH_FEATURE_UID |
- CEPH_FEATURE_PGID64));
- messenger->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(supported,
- CEPH_FEATURE_PGID64 |
- CEPH_FEATURE_OSDENC));
mon->init();
messenger->wait();
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index 98a998d906c..3bf1cbaf286 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -312,21 +312,6 @@ int main(int argc, const char **argv)
messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- r = client_messenger->bind(g_conf->public_addr);
- if (r < 0)
- exit(1);
- r = cluster_messenger->bind(g_conf->cluster_addr);
- if (r < 0)
- exit(1);
-
- // hb should bind to same ip as cluster_addr (if specified)
- entity_addr_t hb_addr = g_conf->cluster_addr;
- if (!hb_addr.is_blank_ip())
- hb_addr.set_port(0);
- r = messenger_hbout->bind(hb_addr);
- if (r < 0)
- exit(1);
-
global_print_banner();
cout << "starting osd." << whoami
@@ -366,6 +351,21 @@ int main(int argc, const char **argv)
cluster_messenger->set_policy(entity_name_t::TYPE_CLIENT,
Messenger::Policy::stateless_server(0, 0));
+ r = client_messenger->bind(g_conf->public_addr);
+ if (r < 0)
+ exit(1);
+ r = cluster_messenger->bind(g_conf->cluster_addr);
+ if (r < 0)
+ exit(1);
+
+ // hb should bind to same ip as cluster_addr (if specified)
+ entity_addr_t hb_addr = g_conf->cluster_addr;
+ if (!hb_addr.is_blank_ip())
+ hb_addr.set_port(0);
+ r = messenger_hbout->bind(hb_addr);
+ if (r < 0)
+ exit(1);
+
// Set up crypto, daemonize, etc.
// Leave stderr open in case we need to report errors.
global_init_daemonize(g_ceph_context, CINIT_FLAG_NO_CLOSE_STDERR);
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index a35c7303e76..d5ce4d6ad19 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -73,6 +73,7 @@ private:
list<Dispatcher*> dispatchers;
protected:
+ /// the "name" of the local daemon. eg client.99
entity_name_t _my_name;
int default_send_priority;
/// set to true once the Messenger has started, and set to false on shutdown
@@ -106,21 +107,51 @@ protected:
virtual void set_ip(entity_addr_t &addr) = 0;
entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); }
+ /**
+ * Set the name of the local entity. The name is reported to others and
+ * can be changed while the system is running, but doing so at incorrect
+ * times may have bad results.
+ *
+ * @param m The name to set.
+ */
void set_myname(const entity_name_t m) { _my_name = m; }
- void set_default_send_priority(int p) { default_send_priority = p; }
+ /**
+ * Set the default send priority
+ * This is an init-time function and must be called *before* calling
+ * start().
+ *
+ * @param p The cluster protocol to use. Defined externally.
+ */
+ void set_default_send_priority(int p) {
+ assert(!started);
+ default_send_priority = p;
+ }
int get_default_send_priority() { return default_send_priority; }
// hrmpf.
virtual int get_dispatch_queue_len() { return 0; };
- // setup
+ /**
+ * Add a new Dispatcher to the front of the list. If you add
+ * a Dispatcher which is already included, it will get a duplicate
+ * entry. This will reduce efficiency but not break anything.
+ *
+ * @param d The Dispatcher to insert into the list.
+ */
void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_front(d);
if (first)
ready();
}
+ /**
+ * Add a new Dispatcher to the end of the list. If you add
+ * a Dispatcher which is already included, it will get a duplicate
+ * entry. This will reduce efficiency but not break anything.
+ *
+ * @param d The Dispatcher to insert into the list.
+ */
void add_dispatcher_tail(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_back(d);
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index bbdae103588..6aa218efad0 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -413,13 +413,43 @@ private:
else
return default_policy;
}
+ /**
+ * Set a policy which is applied to all peers who do not have a type-specific
+ * Policy.
+ * This is an init-time function and must be called *before* calling
+ * start() or bind().
+ *
+ * @param p The Policy to apply.
+ */
void set_default_policy(Policy p) {
+ assert(!started && !did_bind);
default_policy = p;
}
+ /**
+ * Set a policy which is applied to all peers of the given type.
+ * This is an init-time function and must be called *before* calling
+ * start() or bind().
+ *
+ * @param type The peer type this policy applies to.
+ * @param p The policy to apply.
+ */
void set_policy(int type, Policy p) {
+ assert(!started && !did_bind);
policy_map[type] = p;
}
+ /**
+ * Set a Throttler which is applied to all Messages from the given
+ * type of peer.
+ * This is an init-time function and must be called *before* calling
+ * start() or bind().
+ *
+ * @param type The peer type this Throttler will apply to.
+ * @param t The Throttler to apply. SimpleMessenger does not take
+ * ownership of this pointer, but you must not destroy it before
+ * you destroy SimpleMessenger.
+ */
void set_policy_throttler(int type, Throttle *t) {
+ assert (!started && !did_bind);
get_policy(type).throttler = t;
}
@@ -532,7 +562,15 @@ public:
virtual int start();
virtual void wait();
+ /**
+ * Set the cluster protocol in use by this daemon.
+ * This is an init-time function and must be called *before* calling
+ * start() or bind().
+ *
+ * @param p The cluster protocol to use. Defined externally.
+ */
void set_cluster_protocol(int p) {
+ assert(!started && !did_bind);
cluster_protocol = p;
}