diff options
author | Greg Farnum <greg@inktank.com> | 2013-04-09 12:11:15 -0700 |
---|---|---|
committer | Greg Farnum <greg@inktank.com> | 2013-04-09 12:11:15 -0700 |
commit | cecbb4d88a9a2f0fc16829c23c0f1dd62d087b85 (patch) | |
tree | 2e69949d845982517b2a076b45655bc87f116410 | |
parent | a48739d9ab69b8372723139f652717709910e4d2 (diff) | |
parent | aca0aea1bfbafba9cab1b2c693760b824bd82d30 (diff) | |
download | ceph-cecbb4d88a9a2f0fc16829c23c0f1dd62d087b85.tar.gz |
Merge remote-tracking branch 'origin/wip-osd-throttle2' into next
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/ceph_mon.cc | 6 | ||||
-rw-r--r-- | src/ceph_osd.cc | 14 | ||||
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/msg/Message.h | 50 | ||||
-rw-r--r-- | src/msg/Messenger.h | 13 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 39 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 13 | ||||
-rw-r--r-- | src/test/mon/test_mon_workloadgen.cc | 4 |
8 files changed, 92 insertions, 48 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 4a4df8942e9..72354e18876 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -407,15 +407,15 @@ int main(int argc, const char **argv) // throttle client traffic Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes", g_conf->mon_client_bytes); - messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL); // throttle daemon traffic // NOTE: actual usage on the leader may multiply by the number of // monitors if they forward large update messages from daemons. Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes", g_conf->mon_daemon_bytes); - messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler); - messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL); + messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL); cout << "starting " << g_conf->name << " rank " << rank << " at " << ipaddr diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 5a90abd6125..33a107c1dc0 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -338,9 +338,12 @@ int main(int argc, const char **argv) "(no journal)" : g_conf->osd_journal) << std::endl; - boost::scoped_ptr<Throttle> client_throttler( + boost::scoped_ptr<Throttle> client_byte_throttler( new Throttle(g_ceph_context, "osd_client_bytes", g_conf->osd_client_message_size_cap)); + boost::scoped_ptr<Throttle> client_msg_throttler( + new Throttle(g_ceph_context, "osd_client_messages", + g_conf->osd_client_message_cap)); uint64_t supported = CEPH_FEATURE_UID | @@ -349,9 +352,9 @@ int main(int argc, const char **argv) CEPH_FEATURE_MSG_AUTH; client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); - client_messenger->set_policy_throttler( - entity_name_t::TYPE_CLIENT, - client_throttler.get()); // default, actually + client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + client_byte_throttler.get(), + client_msg_throttler.get()); client_messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | @@ -462,7 +465,8 @@ int main(int argc, const char **argv) delete messenger_hbclient; delete messenger_hbserver; delete cluster_messenger; - client_throttler.reset(); + client_byte_throttler.reset(); + client_msg_throttler.reset(); g_ceph_context->put(); // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. diff --git a/src/common/config_opts.h b/src/common/config_opts.h index cb2fd391fc9..17e6ec4c8c4 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -349,6 +349,7 @@ OPTION(osd_journal_size, OPT_INT, 5120) // in mb OPTION(osd_max_write_size, OPT_INT, 90) OPTION(osd_max_pgls, OPT_U64, 1024) // max number of pgls entries to return OPTION(osd_client_message_size_cap, OPT_U64, 500*1024L*1024L) // client data allowed in-memory (in bytes) +OPTION(osd_client_message_cap, OPT_U64, 100) // client messages allowed in-memory (in bytes) OPTION(osd_pg_bits, OPT_INT, 6) // bits per osd OPTION(osd_pgp_bits, OPT_INT, 6) // bits per osd OPTION(osd_crush_chooseleaf_type, OPT_INT, 1) // 1 = host diff --git a/src/msg/Message.h b/src/msg/Message.h index 1bf28e36f2d..33d26b2e7da 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -299,7 +299,10 @@ protected: // release our size in bytes back to this throttler when our payload // is adjusted or when we are destroyed. - Throttle *throttler; + Throttle *byte_throttler; + + // release a count back to this throttler when we are destroyed + Throttle *msg_throttler; // keep track of how big this message was when we reserved space in // the msgr dispatch_throttler, so that we can properly release it @@ -313,14 +316,16 @@ protected: public: Message() : connection(NULL), - throttler(NULL), + byte_throttler(NULL), + msg_throttler(NULL), dispatch_throttle_size(0) { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); }; Message(int t, int version=1, int compat_version=0) : connection(NULL), - throttler(NULL), + byte_throttler(NULL), + msg_throttler(NULL), dispatch_throttle_size(0) { memset(&header, 0, sizeof(header)); header.type = t; @@ -340,8 +345,10 @@ protected: assert(nref.read() == 0); if (connection) connection->put(); - if (throttler) - throttler->put(payload.length() + middle.length() + data.length()); + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length() + data.length()); + if (msg_throttler) + msg_throttler->put(); } public: Connection *get_connection() { return connection; } @@ -350,8 +357,10 @@ public: connection->put(); connection = c; } - void set_throttler(Throttle *t) { throttler = t; } - Throttle *get_throttler() { return throttler; } + void set_byte_throttler(Throttle *t) { byte_throttler = t; } + Throttle *get_byte_throttler() { return byte_throttler; } + void set_message_throttler(Throttle *t) { msg_throttler = t; } + Throttle *get_message_throttler() { return msg_throttler; } void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; } @@ -369,39 +378,48 @@ public: */ void clear_payload() { - if (throttler) throttler->put(payload.length() + middle.length()); + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length()); payload.clear(); middle.clear(); } void clear_data() { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); data.clear(); } bool empty_payload() { return payload.length() == 0; } bufferlist& get_payload() { return payload; } void set_payload(bufferlist& bl) { - if (throttler) throttler->put(payload.length()); + if (byte_throttler) + byte_throttler->put(payload.length()); payload.claim(bl); - if (throttler) throttler->take(payload.length()); + if (byte_throttler) + byte_throttler->take(payload.length()); } void set_middle(bufferlist& bl) { - if (throttler) throttler->put(payload.length()); + if (byte_throttler) + byte_throttler->put(payload.length()); middle.claim(bl); - if (throttler) throttler->take(payload.length()); + if (byte_throttler) + byte_throttler->take(payload.length()); } bufferlist& get_middle() { return middle; } void set_data(const bufferlist &d) { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); data = d; - if (throttler) throttler->take(data.length()); + if (byte_throttler) + byte_throttler->take(data.length()); } bufferlist& get_data() { return data; } void claim_data(bufferlist& bl) { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); bl.claim(data); } off_t get_data_len() { return data.length(); } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 7205940c118..b08fdaa7f30 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -74,7 +74,8 @@ public: * the associated Connection(s). When reading in a new Message, the Messenger * will call throttler->throttle() for the size of the new Message. */ - Throttle *throttler; + Throttle *throttler_bytes; + Throttle *throttler_messages; /// Specify features supported locally by the endpoint. uint64_t features_supported; @@ -82,12 +83,16 @@ public: uint64_t features_required; Policy() - : lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL), + : lossy(false), server(false), standby(false), resetcheck(true), + throttler_bytes(NULL), + throttler_messages(NULL), features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), features_required(0) {} private: Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req) - : lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL), + : lossy(l), server(s), standby(st), resetcheck(r), + throttler_bytes(NULL), + throttler_messages(NULL), features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT), features_required(req) {} @@ -266,7 +271,7 @@ public: * ownership of this pointer, but you must not destroy it before * you destroy the Messenger. */ - virtual void set_policy_throttler(int type, Throttle *t) = 0; + virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; /** * Set the default send priority * diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 75f7e551292..f4100bc483b 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -1664,14 +1664,20 @@ int Pipe::read_message(Message **pm) Message *message; utime_t recv_stamp = ceph_clock_now(msgr->cct); + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->get(); + } + uint64_t message_size = header.front_len + header.middle_len + header.data_len; if (message_size) { - bool waited_on_throttle = false; - if (policy.throttler) { - ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler " - << policy.throttler->get_current() << "/" - << policy.throttler->get_max() << dendl; - waited_on_throttle = policy.throttler->get(message_size); + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->get(message_size); } // throttle total bytes waiting for dispatch. do this _after_ the @@ -1681,7 +1687,7 @@ int Pipe::read_message(Message **pm) ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler " << msgr->dispatch_throttler.get_current() << "/" << msgr->dispatch_throttler.get_max() << dendl; - waited_on_throttle |= msgr->dispatch_throttler.get(message_size); + msgr->dispatch_throttler.get(message_size); } utime_t throttle_stamp = ceph_clock_now(msgr->cct); @@ -1810,7 +1816,8 @@ int Pipe::read_message(Message **pm) } } - message->set_throttler(policy.throttler); + message->set_byte_throttler(policy.throttler_bytes); + message->set_message_throttler(policy.throttler_messages); // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. @@ -1825,12 +1832,18 @@ int Pipe::read_message(Message **pm) out_dethrottle: // release bytes reserved from the throttlers on failure + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->put(); + } if (message_size) { - if (policy.throttler) { - ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler " - << policy.throttler->get_current() << "/" - << policy.throttler->get_max() << dendl; - policy.throttler->put(message_size); + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->put(message_size); } msgr->dispatch_throttle_release(message_size); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index cc946e3d25a..d837a4496ae 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -162,12 +162,15 @@ public: * ownership of this pointer, but you must not destroy it before * you destroy SimpleMessenger. */ - void set_policy_throttler(int type, Throttle *t) { + void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) { Mutex::Locker l(policy_lock); - if (policy_map.count(type)) - policy_map[type].throttler = t; - else - default_policy.throttler = t; + if (policy_map.count(type)) { + policy_map[type].throttler_bytes = byte_throttle; + policy_map[type].throttler_messages = msg_throttle; + } else { + default_policy.throttler_bytes = byte_throttle; + default_policy.throttler_messages = msg_throttle; + } } /** * Bind the SimpleMessenger to a specific address. If bind_addr diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index 216e6288b1f..07f999180a3 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -366,8 +366,8 @@ class OSDStub : public TestStub messenger->set_default_policy( Messenger::Policy::stateless_server(supported, 0)); - messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, - &throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + &throttler, NULL); messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | CEPH_FEATURE_PGID64 | |