summaryrefslogtreecommitdiff
path: root/bufferevent_ratelim.c
diff options
context:
space:
mode:
Diffstat (limited to 'bufferevent_ratelim.c')
-rw-r--r--bufferevent_ratelim.c237
1 files changed, 189 insertions, 48 deletions
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
index 78d0c430..44d18129 100644
--- a/bufferevent_ratelim.c
+++ b/bufferevent_ratelim.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2007-2010 Niels Provos and Nick Mathewson
+ * Copyright (c) 2007-2011 Niels Provos and Nick Mathewson
* Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
@@ -25,6 +25,7 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+#include "evconfig-private.h"
#include <sys/types.h>
#include <limits.h>
@@ -43,6 +44,7 @@
#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
+#include "event-internal.h"
int
ev_token_bucket_init(struct ev_token_bucket *bucket,
@@ -56,9 +58,9 @@ ev_token_bucket_init(struct ev_token_bucket *bucket,
leave "last_updated" as it is; the next update will add the
appropriate amount of bandwidth to the bucket.
*/
- if (bucket->read_limit > cfg->read_maximum)
+ if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
bucket->read_limit = cfg->read_maximum;
- if (bucket->write_limit > cfg->write_maximum)
+ if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
bucket->write_limit = cfg->write_maximum;
} else {
bucket->read_limit = cfg->read_rate;
@@ -138,8 +140,8 @@ ev_token_bucket_get_tick(const struct timeval *tv,
}
struct ev_token_bucket_cfg *
-ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst,
- ev_uint32_t write_rate, ev_uint32_t write_burst,
+ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
+ size_t write_rate, size_t write_burst,
const struct timeval *tick_len)
{
struct ev_token_bucket_cfg *r;
@@ -152,6 +154,11 @@ ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst,
if (read_rate > read_burst || write_rate > write_burst ||
read_rate < 1 || write_rate < 1)
return NULL;
+ if (read_rate > EV_RATE_LIMIT_MAX ||
+ write_rate > EV_RATE_LIMIT_MAX ||
+ read_burst > EV_RATE_LIMIT_MAX ||
+ write_burst > EV_RATE_LIMIT_MAX)
+ return NULL;
r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
if (!r)
return NULL;
@@ -160,7 +167,8 @@ ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst,
r->read_maximum = read_burst;
r->write_maximum = write_burst;
memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
- r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
+ r->msec_per_tick = (tick_len->tv_sec * 1000) +
+ (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
return r;
}
@@ -170,28 +178,27 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
mm_free(cfg);
}
-/* No matter how big our bucket gets, don't try to read more than this
- * much in a single read operation. */
-#define MAX_TO_READ_EVER 16384
-/* No matter how big our bucket gets, don't try to write more than this
- * much in a single write operation. */
-#define MAX_TO_WRITE_EVER 16384
+/* Default values for max_single_read & max_single_write variables. */
+#define MAX_SINGLE_READ_DEFAULT 16384
+#define MAX_SINGLE_WRITE_DEFAULT 16384
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
/** Helper: figure out the maximum amount we should write if is_write, or
the maximum amount we should read if is_read. Return that maximum, or
0 if our bucket is wholly exhausted.
*/
-static inline int
+static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
/* needs lock on bev. */
- int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
+ ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
#define LIM(x) \
(is_write ? (x).write_limit : (x).read_limit)
@@ -221,7 +228,7 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
if (bev->rate_limiting->group) {
struct bufferevent_rate_limit_group *g =
bev->rate_limiting->group;
- ev_uint32_t share;
+ ev_ssize_t share;
LOCK_GROUP(g);
if (GROUP_SUSPENDED(g)) {
/* We can get here if we failed to lock this
@@ -245,23 +252,25 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
CLAMPTO(share);
}
+ if (max_so_far < 0)
+ max_so_far = 0;
return max_so_far;
}
-int
+ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
return _bufferevent_get_rlim_max(bev, 0);
}
-int
+ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
return _bufferevent_get_rlim_max(bev, 1);
}
int
-_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
+_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
/* XXXXX Make sure all users of this function check its return value */
int r = 0;
@@ -276,6 +285,10 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
+ } else if (bev->read_suspended & BEV_SUSPEND_BW) {
+ if (!(bev->write_suspended & BEV_SUSPEND_BW))
+ event_del(&bev->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
}
}
@@ -285,6 +298,8 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
bev->rate_limiting->group->total_read += bytes;
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
_bev_group_suspend_reading(bev->rate_limiting->group);
+ } else if (bev->rate_limiting->group->read_suspended) {
+ _bev_group_unsuspend_reading(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
@@ -293,7 +308,7 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
}
int
-_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes)
+_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
/* XXXXX Make sure all users of this function check its return value */
int r = 0;
@@ -308,6 +323,10 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes)
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
+ } else if (bev->write_suspended & BEV_SUSPEND_BW) {
+ if (!(bev->read_suspended & BEV_SUSPEND_BW))
+ event_del(&bev->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
}
}
@@ -317,6 +336,8 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes)
bev->rate_limiting->group->total_written += bytes;
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
_bev_group_suspend_writing(bev->rate_limiting->group);
+ } else if (bev->rate_limiting->group->write_suspended) {
+ _bev_group_unsuspend_writing(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
@@ -511,6 +532,7 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
LOCK_GROUP(g);
+
tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
@@ -541,15 +563,19 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
struct bufferevent_rate_limit *rlim;
struct timeval now;
ev_uint32_t tick;
+ int reinit = 0, suspended = 0;
/* XXX reference-count cfg */
BEV_LOCK(bev);
if (cfg == NULL) {
if (bevp->rate_limiting) {
- bevp->rate_limiting->cfg = NULL;
+ rlim = bevp->rate_limiting;
+ rlim->cfg = NULL;
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+ if (event_initialized(&rlim->refill_bucket_event))
+ event_del(&rlim->refill_bucket_event);
}
r = 0;
goto done;
@@ -559,29 +585,48 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
tick = ev_token_bucket_get_tick(&now, cfg);
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
- ;
- } else if (bevp->rate_limiting) {
- bevp->rate_limiting->cfg = cfg;
- ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1);
- if (bevp->rate_limiting->limit.read_limit > 0)
- bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
- else
- bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
- if (bevp->rate_limiting->limit.write_limit > 0)
- bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
- else
- bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
- } else {
+ /* no-op */
+ r = 0;
+ goto done;
+ }
+ if (bevp->rate_limiting == NULL) {
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
if (!rlim)
goto done;
- rlim->cfg = cfg;
- ev_token_bucket_init(&rlim->limit, cfg, tick, 0);
- evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
- _bev_refill_callback, bevp);
bevp->rate_limiting = rlim;
+ } else {
+ rlim = bevp->rate_limiting;
+ }
+ reinit = rlim->cfg != NULL;
+
+ rlim->cfg = cfg;
+ ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
+
+ if (reinit) {
+ EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
+ event_del(&rlim->refill_bucket_event);
+ }
+ evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
+ _bev_refill_callback, bevp);
+
+ if (rlim->limit.read_limit > 0) {
+ bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+ } else {
+ bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+ suspended=1;
+ }
+ if (rlim->limit.write_limit > 0) {
+ bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+ } else {
+ bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+ suspended = 1;
}
+
+ if (suspended)
+ event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
+
r = 0;
+
done:
BEV_UNLOCK(bev);
return r;
@@ -606,13 +651,15 @@ bufferevent_rate_limit_group_new(struct event_base *base,
ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
- g->min_share = 64;
event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
_bev_group_refill_callback, g);
/*XXXX handle event_add failure */
event_add(&g->master_refill_event, &cfg->tick_timeout);
EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+
+ bufferevent_rate_limit_group_set_min_share(g, 64);
+
return g;
}
@@ -630,9 +677,9 @@ bufferevent_rate_limit_group_set_cfg(
&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
- if (g->rate_limit.read_limit > cfg->read_maximum)
+ if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
g->rate_limit.read_limit = cfg->read_maximum;
- if (g->rate_limit.write_limit > cfg->write_maximum)
+ if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
g->rate_limit.write_limit = cfg->write_maximum;
if (!same_tick) {
@@ -640,10 +687,33 @@ bufferevent_rate_limit_group_set_cfg(
event_add(&g->master_refill_event, &cfg->tick_timeout);
}
+ /* The new limits might force us to adjust min_share differently. */
+ bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
+
UNLOCK_GROUP(g);
return 0;
}
+int
+bufferevent_rate_limit_group_set_min_share(
+ struct bufferevent_rate_limit_group *g,
+ size_t share)
+{
+ if (share > EV_SSIZE_MAX)
+ return -1;
+
+ g->configured_min_share = share;
+
+ /* Can't set share to less than the one-tick maximum. IOW, at steady
+ * state, at least one connection can go per tick. */
+ if (share > g->rate_limit_cfg.read_rate)
+ share = g->rate_limit_cfg.read_rate;
+ if (share > g->rate_limit_cfg.write_rate)
+ share = g->rate_limit_cfg.write_rate;
+
+ g->min_share = share;
+ return 0;
+}
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
@@ -706,6 +776,13 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
+ return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
+}
+
+int
+bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
+ int unsuspend)
+{
struct bufferevent_private *bevp =
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
BEV_LOCK(bev);
@@ -718,8 +795,10 @@ bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
LIST_REMOVE(bevp, rate_limiting->next_in_group);
UNLOCK_GROUP(g);
}
- bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
- bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
+ if (unsuspend) {
+ bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
+ bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
+ }
BEV_UNLOCK(bev);
return 0;
}
@@ -769,6 +848,56 @@ bufferevent_get_write_limit(struct bufferevent *bev)
return r;
}
+int
+bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
+{
+ struct bufferevent_private *bevp;
+ BEV_LOCK(bev);
+ bevp = BEV_UPCAST(bev);
+ if (size == 0 || size > EV_SSIZE_MAX)
+ bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
+ else
+ bevp->max_single_read = size;
+ BEV_UNLOCK(bev);
+ return 0;
+}
+
+int
+bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
+{
+ struct bufferevent_private *bevp;
+ BEV_LOCK(bev);
+ bevp = BEV_UPCAST(bev);
+ if (size == 0 || size > EV_SSIZE_MAX)
+ bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
+ else
+ bevp->max_single_write = size;
+ BEV_UNLOCK(bev);
+ return 0;
+}
+
+ev_ssize_t
+bufferevent_get_max_single_read(struct bufferevent *bev)
+{
+ ev_ssize_t r;
+
+ BEV_LOCK(bev);
+ r = BEV_UPCAST(bev)->max_single_read;
+ BEV_UNLOCK(bev);
+ return r;
+}
+
+ev_ssize_t
+bufferevent_get_max_single_write(struct bufferevent *bev)
+{
+ ev_ssize_t r;
+
+ BEV_LOCK(bev);
+ r = BEV_UPCAST(bev)->max_single_write;
+ BEV_UNLOCK(bev);
+ return r;
+}
+
ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
@@ -820,7 +949,7 @@ int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
int r = 0;
- ev_int32_t old_limit, new_limit;
+ ev_ssize_t old_limit, new_limit;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
@@ -834,7 +963,8 @@ bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
&bevp->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
- event_del(&bevp->rate_limiting->refill_bucket_event);
+ if (!(bevp->write_suspended & BEV_SUSPEND_BW))
+ event_del(&bevp->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
}
@@ -848,7 +978,7 @@ bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
/* XXXX this is mostly copy-and-paste from
* bufferevent_decrement_read_limit */
int r = 0;
- ev_int32_t old_limit, new_limit;
+ ev_ssize_t old_limit, new_limit;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
@@ -862,7 +992,8 @@ bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
&bevp->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
- event_del(&bevp->rate_limiting->refill_bucket_event);
+ if (!(bevp->read_suspended & BEV_SUSPEND_BW))
+ event_del(&bevp->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
}
@@ -875,7 +1006,7 @@ bufferevent_rate_limit_group_decrement_read(
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
int r = 0;
- ev_int32_t old_limit, new_limit;
+ ev_ssize_t old_limit, new_limit;
LOCK_GROUP(grp);
old_limit = grp->rate_limit.read_limit;
new_limit = (grp->rate_limit.read_limit -= decr);
@@ -895,7 +1026,7 @@ bufferevent_rate_limit_group_decrement_write(
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
int r = 0;
- ev_int32_t old_limit, new_limit;
+ ev_ssize_t old_limit, new_limit;
LOCK_GROUP(grp);
old_limit = grp->rate_limit.write_limit;
new_limit = (grp->rate_limit.write_limit -= decr);
@@ -926,3 +1057,13 @@ bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *g
{
grp->total_read = grp->total_written = 0;
}
+
+int
+_bufferevent_ratelim_init(struct bufferevent_private *bev)
+{
+ bev->rate_limiting = NULL;
+ bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
+ bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
+
+ return 0;
+}