summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <greg@inktank.com>2013-04-09 12:11:15 -0700
committerGreg Farnum <greg@inktank.com>2013-04-09 12:11:15 -0700
commitcecbb4d88a9a2f0fc16829c23c0f1dd62d087b85 (patch)
tree2e69949d845982517b2a076b45655bc87f116410
parenta48739d9ab69b8372723139f652717709910e4d2 (diff)
parentaca0aea1bfbafba9cab1b2c693760b824bd82d30 (diff)
downloadceph-cecbb4d88a9a2f0fc16829c23c0f1dd62d087b85.tar.gz
Merge remote-tracking branch 'origin/wip-osd-throttle2' into next
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/ceph_mon.cc6
-rw-r--r--src/ceph_osd.cc14
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/msg/Message.h50
-rw-r--r--src/msg/Messenger.h13
-rw-r--r--src/msg/Pipe.cc39
-rw-r--r--src/msg/SimpleMessenger.h13
-rw-r--r--src/test/mon/test_mon_workloadgen.cc4
8 files changed, 92 insertions, 48 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc
index 4a4df8942e9..72354e18876 100644
--- a/src/ceph_mon.cc
+++ b/src/ceph_mon.cc
@@ -407,15 +407,15 @@ int main(int argc, const char **argv)
// throttle client traffic
Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
g_conf->mon_client_bytes);
- messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL);
// throttle daemon traffic
// NOTE: actual usage on the leader may multiply by the number of
// monitors if they forward large update messages from daemons.
Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
g_conf->mon_daemon_bytes);
- messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler);
- messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL);
cout << "starting " << g_conf->name << " rank " << rank
<< " at " << ipaddr
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index 5a90abd6125..33a107c1dc0 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -338,9 +338,12 @@ int main(int argc, const char **argv)
"(no journal)" : g_conf->osd_journal)
<< std::endl;
- boost::scoped_ptr<Throttle> client_throttler(
+ boost::scoped_ptr<Throttle> client_byte_throttler(
new Throttle(g_ceph_context, "osd_client_bytes",
g_conf->osd_client_message_size_cap));
+ boost::scoped_ptr<Throttle> client_msg_throttler(
+ new Throttle(g_ceph_context, "osd_client_messages",
+ g_conf->osd_client_message_cap));
uint64_t supported =
CEPH_FEATURE_UID |
@@ -349,9 +352,9 @@ int main(int argc, const char **argv)
CEPH_FEATURE_MSG_AUTH;
client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
- client_messenger->set_policy_throttler(
- entity_name_t::TYPE_CLIENT,
- client_throttler.get()); // default, actually
+ client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ client_byte_throttler.get(),
+ client_msg_throttler.get());
client_messenger->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(supported,
CEPH_FEATURE_UID |
@@ -462,7 +465,8 @@ int main(int argc, const char **argv)
delete messenger_hbclient;
delete messenger_hbserver;
delete cluster_messenger;
- client_throttler.reset();
+ client_byte_throttler.reset();
+ client_msg_throttler.reset();
g_ceph_context->put();
// cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index cb2fd391fc9..17e6ec4c8c4 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -349,6 +349,7 @@ OPTION(osd_journal_size, OPT_INT, 5120) // in mb
OPTION(osd_max_write_size, OPT_INT, 90)
OPTION(osd_max_pgls, OPT_U64, 1024) // max number of pgls entries to return
OPTION(osd_client_message_size_cap, OPT_U64, 500*1024L*1024L) // client data allowed in-memory (in bytes)
+OPTION(osd_client_message_cap, OPT_U64, 100) // client messages allowed in-memory (in bytes)
OPTION(osd_pg_bits, OPT_INT, 6) // bits per osd
OPTION(osd_pgp_bits, OPT_INT, 6) // bits per osd
OPTION(osd_crush_chooseleaf_type, OPT_INT, 1) // 1 = host
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 1bf28e36f2d..33d26b2e7da 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -299,7 +299,10 @@ protected:
// release our size in bytes back to this throttler when our payload
// is adjusted or when we are destroyed.
- Throttle *throttler;
+ Throttle *byte_throttler;
+
+ // release a count back to this throttler when we are destroyed
+ Throttle *msg_throttler;
// keep track of how big this message was when we reserved space in
// the msgr dispatch_throttler, so that we can properly release it
@@ -313,14 +316,16 @@ protected:
public:
Message()
: connection(NULL),
- throttler(NULL),
+ byte_throttler(NULL),
+ msg_throttler(NULL),
dispatch_throttle_size(0) {
memset(&header, 0, sizeof(header));
memset(&footer, 0, sizeof(footer));
};
Message(int t, int version=1, int compat_version=0)
: connection(NULL),
- throttler(NULL),
+ byte_throttler(NULL),
+ msg_throttler(NULL),
dispatch_throttle_size(0) {
memset(&header, 0, sizeof(header));
header.type = t;
@@ -340,8 +345,10 @@ protected:
assert(nref.read() == 0);
if (connection)
connection->put();
- if (throttler)
- throttler->put(payload.length() + middle.length() + data.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length() + middle.length() + data.length());
+ if (msg_throttler)
+ msg_throttler->put();
}
public:
Connection *get_connection() { return connection; }
@@ -350,8 +357,10 @@ public:
connection->put();
connection = c;
}
- void set_throttler(Throttle *t) { throttler = t; }
- Throttle *get_throttler() { return throttler; }
+ void set_byte_throttler(Throttle *t) { byte_throttler = t; }
+ Throttle *get_byte_throttler() { return byte_throttler; }
+ void set_message_throttler(Throttle *t) { msg_throttler = t; }
+ Throttle *get_message_throttler() { return msg_throttler; }
void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; }
@@ -369,39 +378,48 @@ public:
*/
void clear_payload() {
- if (throttler) throttler->put(payload.length() + middle.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length() + middle.length());
payload.clear();
middle.clear();
}
void clear_data() {
- if (throttler) throttler->put(data.length());
+ if (byte_throttler)
+ byte_throttler->put(data.length());
data.clear();
}
bool empty_payload() { return payload.length() == 0; }
bufferlist& get_payload() { return payload; }
void set_payload(bufferlist& bl) {
- if (throttler) throttler->put(payload.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length());
payload.claim(bl);
- if (throttler) throttler->take(payload.length());
+ if (byte_throttler)
+ byte_throttler->take(payload.length());
}
void set_middle(bufferlist& bl) {
- if (throttler) throttler->put(payload.length());
+ if (byte_throttler)
+ byte_throttler->put(payload.length());
middle.claim(bl);
- if (throttler) throttler->take(payload.length());
+ if (byte_throttler)
+ byte_throttler->take(payload.length());
}
bufferlist& get_middle() { return middle; }
void set_data(const bufferlist &d) {
- if (throttler) throttler->put(data.length());
+ if (byte_throttler)
+ byte_throttler->put(data.length());
data = d;
- if (throttler) throttler->take(data.length());
+ if (byte_throttler)
+ byte_throttler->take(data.length());
}
bufferlist& get_data() { return data; }
void claim_data(bufferlist& bl) {
- if (throttler) throttler->put(data.length());
+ if (byte_throttler)
+ byte_throttler->put(data.length());
bl.claim(data);
}
off_t get_data_len() { return data.length(); }
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index 7205940c118..b08fdaa7f30 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -74,7 +74,8 @@ public:
* the associated Connection(s). When reading in a new Message, the Messenger
* will call throttler->throttle() for the size of the new Message.
*/
- Throttle *throttler;
+ Throttle *throttler_bytes;
+ Throttle *throttler_messages;
/// Specify features supported locally by the endpoint.
uint64_t features_supported;
@@ -82,12 +83,16 @@ public:
uint64_t features_required;
Policy()
- : lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL),
+ : lossy(false), server(false), standby(false), resetcheck(true),
+ throttler_bytes(NULL),
+ throttler_messages(NULL),
features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
features_required(0) {}
private:
Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req)
- : lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL),
+ : lossy(l), server(s), standby(st), resetcheck(r),
+ throttler_bytes(NULL),
+ throttler_messages(NULL),
features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT),
features_required(req) {}
@@ -266,7 +271,7 @@ public:
* ownership of this pointer, but you must not destroy it before
* you destroy the Messenger.
*/
- virtual void set_policy_throttler(int type, Throttle *t) = 0;
+ virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
/**
* Set the default send priority
*
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 75f7e551292..f4100bc483b 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -1664,14 +1664,20 @@ int Pipe::read_message(Message **pm)
Message *message;
utime_t recv_stamp = ceph_clock_now(msgr->cct);
+ if (policy.throttler_messages) {
+ ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ policy.throttler_messages->get();
+ }
+
uint64_t message_size = header.front_len + header.middle_len + header.data_len;
if (message_size) {
- bool waited_on_throttle = false;
- if (policy.throttler) {
- ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler "
- << policy.throttler->get_current() << "/"
- << policy.throttler->get_max() << dendl;
- waited_on_throttle = policy.throttler->get(message_size);
+ if (policy.throttler_bytes) {
+ ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ policy.throttler_bytes->get(message_size);
}
// throttle total bytes waiting for dispatch. do this _after_ the
@@ -1681,7 +1687,7 @@ int Pipe::read_message(Message **pm)
ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
<< msgr->dispatch_throttler.get_current() << "/"
<< msgr->dispatch_throttler.get_max() << dendl;
- waited_on_throttle |= msgr->dispatch_throttler.get(message_size);
+ msgr->dispatch_throttler.get(message_size);
}
utime_t throttle_stamp = ceph_clock_now(msgr->cct);
@@ -1810,7 +1816,8 @@ int Pipe::read_message(Message **pm)
}
}
- message->set_throttler(policy.throttler);
+ message->set_byte_throttler(policy.throttler_bytes);
+ message->set_message_throttler(policy.throttler_messages);
// store reservation size in message, so we don't get confused
// by messages entering the dispatch queue through other paths.
@@ -1825,12 +1832,18 @@ int Pipe::read_message(Message **pm)
out_dethrottle:
// release bytes reserved from the throttlers on failure
+ if (policy.throttler_messages) {
+ ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ policy.throttler_messages->put();
+ }
if (message_size) {
- if (policy.throttler) {
- ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler "
- << policy.throttler->get_current() << "/"
- << policy.throttler->get_max() << dendl;
- policy.throttler->put(message_size);
+ if (policy.throttler_bytes) {
+ ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ policy.throttler_bytes->put(message_size);
}
msgr->dispatch_throttle_release(message_size);
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index cc946e3d25a..d837a4496ae 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -162,12 +162,15 @@ public:
* ownership of this pointer, but you must not destroy it before
* you destroy SimpleMessenger.
*/
- void set_policy_throttler(int type, Throttle *t) {
+ void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) {
Mutex::Locker l(policy_lock);
- if (policy_map.count(type))
- policy_map[type].throttler = t;
- else
- default_policy.throttler = t;
+ if (policy_map.count(type)) {
+ policy_map[type].throttler_bytes = byte_throttle;
+ policy_map[type].throttler_messages = msg_throttle;
+ } else {
+ default_policy.throttler_bytes = byte_throttle;
+ default_policy.throttler_messages = msg_throttle;
+ }
}
/**
* Bind the SimpleMessenger to a specific address. If bind_addr
diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc
index 216e6288b1f..07f999180a3 100644
--- a/src/test/mon/test_mon_workloadgen.cc
+++ b/src/test/mon/test_mon_workloadgen.cc
@@ -366,8 +366,8 @@ class OSDStub : public TestStub
messenger->set_default_policy(
Messenger::Policy::stateless_server(supported, 0));
- messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT,
- &throttler);
+ messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ &throttler, NULL);
messenger->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64 |