diff options
author | Greg Farnum <gregory.farnum@dreamhost.com> | 2012-03-01 18:31:49 -0800 |
---|---|---|
committer | Greg Farnum <gregory.farnum@dreamhost.com> | 2012-03-02 12:32:36 -0800 |
commit | 26e48f4234c051edeeac6e7ba739f911d9454b9a (patch) | |
tree | eaefc0e98466cc291d0a130a3126417b8e2f9232 /src | |
parent | 29be52820d6b16cb606741bc69cc6baebd14bc5b (diff) | |
download | ceph-26e48f4234c051edeeac6e7ba739f911d9454b9a.tar.gz |
msgr: Require that init functions are called before bind() and start().
Fix up callers to handle these constraints.
Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/ceph_mds.cc | 8 | ||||
-rw-r--r-- | src/ceph_mon.cc | 31 | ||||
-rw-r--r-- | src/ceph_osd.cc | 30 | ||||
-rw-r--r-- | src/msg/Messenger.h | 35 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 38 |
5 files changed, 106 insertions, 36 deletions
diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 76aefd7609c..ed85338e1f5 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -236,10 +236,6 @@ int main(int argc, const char **argv) messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL); messenger->set_nonce(getpid()); - int r = messenger->bind(g_conf->public_addr); - if (r < 0) - exit(1); - cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr() << std::endl; uint64_t supported = @@ -260,6 +256,10 @@ int main(int argc, const char **argv) messenger->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(supported, 0)); + int r = messenger->bind(g_conf->public_addr); + if (r < 0) + exit(1); + if (shadow != MDSMap::STATE_ONESHOT_REPLAY) global_init_daemonize(g_ceph_context, 0); common_init_finish(g_ceph_context); diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 02d9ec45fc4..c9e6d37cc7a 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -370,6 +370,22 @@ int main(int argc, const char **argv) SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, entity_name_t::MON(rank)); messenger->set_cluster_protocol(CEPH_MON_PROTOCOL); + messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH); + + uint64_t supported = + CEPH_FEATURE_UID | + CEPH_FEATURE_NOSRCADDR | + CEPH_FEATURE_MONCLOCKCHECK | + CEPH_FEATURE_PGID64; + messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); + messenger->set_policy(entity_name_t::TYPE_MON, + Messenger::Policy::lossless_peer(supported, + CEPH_FEATURE_UID | + CEPH_FEATURE_PGID64)); + messenger->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::stateless_server(supported, + CEPH_FEATURE_PGID64 | + CEPH_FEATURE_OSDENC)); global_print_banner(); @@ -384,7 +400,6 @@ int main(int argc, const char **argv) return 1; // start monitor - messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH); mon = new Monitor(g_ceph_context, g_conf->name.get_id(), &store, messenger, &monmap); global_init_daemonize(g_ceph_context, 0); @@ -398,20 +413,6 @@ int main(int argc, const char **argv) register_async_signal_handler_oneshot(SIGINT, handle_mon_signal); register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal); - uint64_t supported = - CEPH_FEATURE_UID | - CEPH_FEATURE_NOSRCADDR | - CEPH_FEATURE_MONCLOCKCHECK | - CEPH_FEATURE_PGID64; - messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); - messenger->set_policy(entity_name_t::TYPE_MON, - Messenger::Policy::lossless_peer(supported, - CEPH_FEATURE_UID | - CEPH_FEATURE_PGID64)); - messenger->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::stateless_server(supported, - CEPH_FEATURE_PGID64 | - CEPH_FEATURE_OSDENC)); mon->init(); messenger->wait(); diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 98a998d906c..3bf1cbaf286 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -312,21 +312,6 @@ int main(int argc, const char **argv) messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL); messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL); - r = client_messenger->bind(g_conf->public_addr); - if (r < 0) - exit(1); - r = cluster_messenger->bind(g_conf->cluster_addr); - if (r < 0) - exit(1); - - // hb should bind to same ip as cluster_addr (if specified) - entity_addr_t hb_addr = g_conf->cluster_addr; - if (!hb_addr.is_blank_ip()) - hb_addr.set_port(0); - r = messenger_hbout->bind(hb_addr); - if (r < 0) - exit(1); - global_print_banner(); cout << "starting osd." << whoami @@ -366,6 +351,21 @@ int main(int argc, const char **argv) cluster_messenger->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateless_server(0, 0)); + r = client_messenger->bind(g_conf->public_addr); + if (r < 0) + exit(1); + r = cluster_messenger->bind(g_conf->cluster_addr); + if (r < 0) + exit(1); + + // hb should bind to same ip as cluster_addr (if specified) + entity_addr_t hb_addr = g_conf->cluster_addr; + if (!hb_addr.is_blank_ip()) + hb_addr.set_port(0); + r = messenger_hbout->bind(hb_addr); + if (r < 0) + exit(1); + // Set up crypto, daemonize, etc. // Leave stderr open in case we need to report errors. global_init_daemonize(g_ceph_context, CINIT_FLAG_NO_CLOSE_STDERR); diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index a35c7303e76..d5ce4d6ad19 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -73,6 +73,7 @@ private: list<Dispatcher*> dispatchers; protected: + /// the "name" of the local daemon. eg client.99 entity_name_t _my_name; int default_send_priority; /// set to true once the Messenger has started, and set to false on shutdown @@ -106,21 +107,51 @@ protected: virtual void set_ip(entity_addr_t &addr) = 0; entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); } + /** + * Set the name of the local entity. The name is reported to others and + * can be changed while the system is running, but doing so at incorrect + * times may have bad results. + * + * @param m The name to set. + */ void set_myname(const entity_name_t m) { _my_name = m; } - void set_default_send_priority(int p) { default_send_priority = p; } + /** + * Set the default send priority + * This is an init-time function and must be called *before* calling + * start(). + * + * @param p The cluster protocol to use. Defined externally. + */ + void set_default_send_priority(int p) { + assert(!started); + default_send_priority = p; + } int get_default_send_priority() { return default_send_priority; } // hrmpf. virtual int get_dispatch_queue_len() { return 0; }; - // setup + /** + * Add a new Dispatcher to the front of the list. If you add + * a Dispatcher which is already included, it will get a duplicate + * entry. This will reduce efficiency but not break anything. + * + * @param d The Dispatcher to insert into the list. + */ void add_dispatcher_head(Dispatcher *d) { bool first = dispatchers.empty(); dispatchers.push_front(d); if (first) ready(); } + /** + * Add a new Dispatcher to the end of the list. If you add + * a Dispatcher which is already included, it will get a duplicate + * entry. This will reduce efficiency but not break anything. + * + * @param d The Dispatcher to insert into the list. + */ void add_dispatcher_tail(Dispatcher *d) { bool first = dispatchers.empty(); dispatchers.push_back(d); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index bbdae103588..6aa218efad0 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -413,13 +413,43 @@ private: else return default_policy; } + /** + * Set a policy which is applied to all peers who do not have a type-specific + * Policy. + * This is an init-time function and must be called *before* calling + * start() or bind(). + * + * @param p The Policy to apply. + */ void set_default_policy(Policy p) { + assert(!started && !did_bind); default_policy = p; } + /** + * Set a policy which is applied to all peers of the given type. + * This is an init-time function and must be called *before* calling + * start() or bind(). + * + * @param type The peer type this policy applies to. + * @param p The policy to apply. + */ void set_policy(int type, Policy p) { + assert(!started && !did_bind); policy_map[type] = p; } + /** + * Set a Throttler which is applied to all Messages from the given + * type of peer. + * This is an init-time function and must be called *before* calling + * start() or bind(). + * + * @param type The peer type this Throttler will apply to. + * @param t The Throttler to apply. SimpleMessenger does not take + * ownership of this pointer, but you must not destroy it before + * you destroy SimpleMessenger. + */ void set_policy_throttler(int type, Throttle *t) { + assert (!started && !did_bind); get_policy(type).throttler = t; } @@ -532,7 +562,15 @@ public: virtual int start(); virtual void wait(); + /** + * Set the cluster protocol in use by this daemon. + * This is an init-time function and must be called *before* calling + * start() or bind(). + * + * @param p The cluster protocol to use. Defined externally. + */ void set_cluster_protocol(int p) { + assert(!started && !did_bind); cluster_protocol = p; } |